
Programming Pig

By Alan Gates. Published by O'Reilly Media, Inc.

Integrating Pig with Legacy Code and MapReduce

One tenet of Pig’s philosophy is that Pig allows users to integrate their own code with Pig wherever possible (see Pig Philosophy). The most obvious way Pig does that is through its UDFs. But it also allows you to directly integrate other executables and MapReduce jobs.


To specify an executable that you want to insert into your data flow, use stream. You may want to do this when you have a legacy program that you do not want to modify or are unable to change. You can also use stream when you have a program you use frequently, or one you have tested on small data sets and now want to apply to a large data set. Let’s look at an example where you have a Perl program that filters out all stocks with a dividend below $1.00:

-- streamsimple.pig
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through `` as (exchange, symbol, date, dividends);

Notice the as clause in the stream command. It is optional, but Pig has no way of knowing what the executable will return, so if you do not provide the as clause, the relation highdivs will have no schema.

The executable is invoked once on every map or reduce task. It is not invoked once per record. Pig instantiates the executable and keeps feeding data to it via stdin. It also keeps checking stdout, passing any results to the next operator in your data flow. The executable can choose whether to produce an output for every input, only every so many inputs, or only after all inputs have been received.
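To make the invocation model concrete, here is a minimal sketch of what a streamed executable might look like, written in Python rather than Perl. It assumes that Pig hands records to the executable as tab-separated lines, one record per line, and that the dividend is the fourth field; the function names and the choice to skip malformed records are illustrative assumptions, not part of the original example.

```python
import sys


def filter_high_dividends(lines, threshold=1.0):
    """Yield the records whose fourth field (dividends) is at least threshold.

    Assumes tab-separated fields, one record per line.
    """
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        try:
            dividend = float(fields[3])
        except (IndexError, ValueError):
            # Assumption: silently drop records that cannot be parsed.
            continue
        if dividend >= threshold:
            yield line if line.endswith("\n") else line + "\n"


def main(stdin=sys.stdin, stdout=sys.stdout):
    # One process handles every record of the task: read until stdin is
    # exhausted, writing each surviving record as soon as it is known.
    for record in filter_high_dividends(stdin):
        stdout.write(record)
```

A deployed script would call main() at the bottom of the file. Because the loop only ends when stdin is exhausted, a single process serves the whole task, which matches the once-per-task invocation described above.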

The preceding example assumes that the executable is already installed on your grid, and that it is runnable from the working directory on the task machines. If that is not the case, which it usually will not be, you can ship the executable to the grid. To do this, use a define statement:

define hd `` ship('');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);

This define does two things. First, it defines the executable that will be used. Now in stream we refer to the executable by the alias we gave it, hd, rather than referring to it directly. Second, it tells Pig to pick up the file named in the ship clause and ship it to Hadoop as part of this job. This file will be picked up from the specified location on the machine where you launch the job. It will be placed in the working directory of the task on the task machines. So, the command you pass to stream must refer to it relative to the current working directory, not via an absolute path. If your executable depends on other modules or files, they can be specified as part of the ship clause as well. For example, if the executable depends on a Perl module, you can send them both to the task machines:

define hd `` ship('', '');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);

Many scripting languages assume certain paths for modules based on their hierarchy. For example, Perl expects to find a module Acme::Financial in Acme/Financial.pm. However, the ship clause always puts files in your current working directory, and it does not take directories, so you could not ship Acme. The workaround for this is to create a TAR file and ship that, and then have a step in your executable that unbundles the TAR file. You then need to set your module include path (for Perl, -I or the PERLLIB environment variable) to contain . (dot).
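The same workaround applies when the streamed executable is a Python script. The sketch below shows one way the unbundling step might look; the function name and its arguments are hypothetical, and sys.path plays the role that -I or PERLLIB plays for Perl.

```python
import sys
import tarfile


def unbundle(archive_name, dest="."):
    """Extract a shipped TAR file into dest and make dest importable.

    Only use this on archives you built yourself: extracting an
    untrusted archive can write files outside dest.
    """
    with tarfile.open(archive_name) as archive:
        archive.extractall(dest)
    # Equivalent of adding "." to the include path for Perl.
    if dest not in sys.path:
        sys.path.insert(0, dest)
```

The executable would call unbundle once at startup, before importing anything from the archive, since the shipped TAR file lands in the task's working directory.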

ship moves files into the grid from the machine where you are launching your job. But sometimes the file you want is already in the grid. If you have a grid file that will be accessed by every map or reduce task in your job, the proper way to access it is via the distributed cache. The distributed cache is a mechanism Hadoop provides to share files. It reduces the load on HDFS by preloading the file to the local disk on the machine that will be executing the task. You can use the distributed cache for your executable by using the cache clause in define:

crawl      = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc `` cache('/data/shared/badurls#badurls');
goodurls   = stream normalized through blc as (url, pageid);

The string before the # is the path on HDFS, in this case, /data/shared/badurls. The string after the # is the name of the file as viewed by the executable. So, Hadoop will put a copy of /data/shared/badurls into the task’s working directory and call it badurls.
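From the executable's point of view, the cached file is just a local file named badurls in the working directory. The following sketch shows how a Python blacklist checker might use it. It assumes that badurls holds one URL per line and that each input record is a tab-separated line whose first field is the URL; the function names are illustrative.

```python
import sys


def load_blacklist(path="badurls"):
    """Read the blacklist from the task's working directory.

    Assumes one URL per line; blank lines are ignored.
    """
    with open(path) as handle:
        return {line.strip() for line in handle if line.strip()}


def filter_good_urls(lines, blacklist):
    """Yield the records whose first tab-separated field is not blacklisted."""
    for line in lines:
        record = line.rstrip("\n")
        url = record.split("\t", 1)[0]
        if url not in blacklist:
            yield record + "\n"


def main(stdin=sys.stdin, stdout=sys.stdout):
    # Load the blacklist once per task, then stream records through it.
    blacklist = load_blacklist()
    for record in filter_good_urls(stdin, blacklist):
        stdout.write(record)
```

Loading the file once at startup rather than once per record fits the once-per-task invocation model, and the relative path works because Hadoop places the cached copy in the working directory.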

So far we have assumed that your executable takes data on stdin and writes it to stdout. This might not work, depending on your executable. If your executable needs a file to read from, write to, or both, you can specify that with the input and output clauses in the define command. Continuing with our previous example, let’s say that the executable expects to read its input from a file specified by -i on its command line and write to a file specified by -o:

crawl      = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc ` -i urls -o good` input('urls') output('good');
goodurls   = stream normalized through blc as (url, pageid);

Again, file locations are specified from the working directory on the task machines. In this example, Pig will write out all the input for a given task to urls, then invoke the executable, and then read good to get the results. Because the executable is invoked only once per map or reduce task, Pig must first write out all of that task’s input to the file before invoking it.
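An executable that fits this calling convention might be structured as in the sketch below. Only the -i and -o flags come from the example; the record layout (tab-separated, URL first) and the hardcoded BAD_URLS set are placeholders for illustration, since a real checker would obtain its blacklist some other way.

```python
import argparse

# Placeholder blacklist for illustration only.
BAD_URLS = {"http://bad.example/"}


def main(argv):
    """Read records from the -i file and write the good ones to the -o file."""
    parser = argparse.ArgumentParser(description="Filter blacklisted URLs.")
    parser.add_argument("-i", dest="input_path", required=True)
    parser.add_argument("-o", dest="output_path", required=True)
    args = parser.parse_args(argv)

    with open(args.input_path) as source, open(args.output_path, "w") as sink:
        for line in source:
            record = line.rstrip("\n")
            url = record.split("\t", 1)[0]  # assumed: URL is the first field
            if url not in BAD_URLS:
                sink.write(record + "\n")
```

A deployed script would call main(sys.argv[1:]). With the define shown above, that corresponds to Pig filling urls before the process starts and reading good after it exits.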


Beginning in Pig 0.8, you can also include MapReduce jobs directly in your data flow with the mapreduce command. This is convenient if you have processing that is better done in MapReduce than Pig but must be integrated with the rest of your Pig data flow. It can also make it easier to incorporate legacy processing written in MapReduce with newer processing you want to write in Pig Latin.

MapReduce jobs expect to read their input from and write their output to a storage device (usually HDFS). So to integrate them with your data flow, Pig first has to store the data, then invoke the MapReduce job, and then read the data back. This is done via store and load clauses in the mapreduce statement that invoke regular load and store functions. You also provide Pig with the name of the JAR that contains the code for your MapReduce job.

As an example, let’s continue with the blacklisting of URLs that we considered in the previous section. Only now let’s assume that this is done by a MapReduce job instead of a Python script:

crawl      = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls   = mapreduce 'blacklistchecker.jar'
                store normalized into 'input'
                load 'output' as (url, pageid);

mapreduce takes as its first argument the JAR containing the code to run a MapReduce job. It uses load and store phrases to specify how data will be moved from Pig’s data pipeline to the MapReduce job. Notice that the input alias is contained in the store clause. As with stream, the output of mapreduce is opaque to Pig, so if we want the resulting relation goodurls to have a schema, we have to tell Pig what it is. This example also assumes that the Java code in blacklistchecker.jar knows which input and output files to look for and has a default class to run specified in its manifest. Often this will not be the case. Any arguments you wish to pass to the invocation of the Java command that will run the MapReduce task can be put in backquotes after the load clause:

crawl      = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls   = mapreduce 'blacklistchecker.jar'
                 store normalized into 'input'
                 load 'output' as (url, pageid)
                 ` -i input -o output`;

The string in the backquotes will be passed directly to your MapReduce job as is. So if you want to pass Java options or similar arguments, you can do that as well.

The load and store clauses of the mapreduce command have the same syntax as the load and store statements, so you can use different load and store functions, pass constructor arguments, and so on. See Load and Store for full details.
