Nonlinear Data Flows

So far our examples have been linear data flows or trees. In a linear data flow, one input is loaded, processed, and stored. We have looked at operators that combine multiple data flows: join, cogroup, union, and cross. With these you can build tree structures where multiple inputs all flow to a single output. But in complex data-processing situations, you often also want to split your data flow. That is, one input will result in more than one output. You might also have diamonds, places where the data flow is split and eventually joined back together. Pig supports these directed acyclic graph (DAG) data flows.

Splits in your data flow can be either implicit or explicit. In an implicit split, no specific operator or syntax is required in your script. You simply refer to a given relation multiple times. Let’s consider the data from our baseball example. You might, for example, want to analyze players by position and by team at the same time:

--multiquery.pig
players    = load 'baseball' as (name:chararray, team:chararray,
                position:bag{t:(p:chararray)}, bat:map[]);
pwithba    = foreach players generate name, team, position,
                bat#'batting_average' as batavg;
byteam     = group pwithba by team;
avgbyteam  = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team,
                flatten(position) as position, batavg;
bypos      = group flattenpos by position;
avgbypos   = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';

The pwithba relation is referred to by the group operators for both the byteam and bypos relations. Pig builds a data flow that takes every record from pwithba and ships it to both group operators.

Splitting data flows can also be done explicitly via the split operator, which allows you to split your data flow as many ways as you like. Let’s take an example where you want to split data into different files depending on the date the record was created:

wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404' and timestamp > '20110402',
          apr02 if timestamp < '20110403' and timestamp > '20110401',
          apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';

At first glance, split looks like a switch or case statement, but it is not. A single record can go to multiple legs of the split since you use different filters for each if clause. And a record can go to no leg. In the preceding example, if a record were found with a date of 20110331, it would be dropped. And there is no default clause—no way to send any leftover records to a particular alias.
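
Since there is no default clause, any records that match none of the conditions but that you still want to keep must be captured with a separate filter alongside the split. A minimal sketch, continuing the preceding example (the otherdates alias and the output path are made up for illustration):

otherdates = filter wlogs by timestamp <= '20110331' or timestamp >= '20110404';
store otherdates into 'other_dates';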

split is semantically identical to an implicit split that uses filters. The preceding split example could be rewritten as:

wlogs = load 'weblogs' as (pageid, url, timestamp);
apr03 = filter wlogs by timestamp < '20110404' and timestamp > '20110402';
apr02 = filter wlogs by timestamp < '20110403' and timestamp > '20110401';
apr01 = filter wlogs by timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';

In fact, Pig will internally rewrite the original script that has split in exactly this way.

Let’s take a look at how Pig executes these nonlinear data flows. Whenever possible, it combines them into single MapReduce jobs. This is referred to as a multiquery. In cases where all operators will fit into a single map task, this is easy. Pig creates separate pipelines inside the map and sends the appropriate records to each pipeline. The example using split to store data by date will be executed in this way.
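
If you want to see how Pig has combined the pipelines into one job, you can run explain on the script (explain is covered in Chapter 7). A minimal sketch from Grunt, assuming the multiquery.pig example from the beginning of this section has been saved to a file of that name:

grunt> explain -script multiquery.pig;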

Pig can also combine multiple group operators together in many cases. In the example given at the beginning of this section, where the baseball data is grouped by both team and position, this entire Pig Latin script will be executed inside one MapReduce job. Pig accomplishes this by duplicating records on the map side and annotating each record with its pipeline number. When the data is partitioned during the shuffle, the appropriate key is used for each record. That is, records from the pipeline grouping by team will use team as their shuffle key, and records from the pipeline grouping by position will use position as their shuffle key. This is done by declaring the key type to be tuple and placing the correct values in the key tuple for each record. Once the data has been collected to reducers, the pipeline number is used as part of the sort key so that records from each pipeline and group are collected together. In the reduce task, Pig instantiates multiple pipelines, one for each group operator. It sends each record down the appropriate pipeline based on its annotated pipeline number. In this way, input data can be scanned once but grouped many different ways. An example of how one record flows through this pipeline is shown in Figure 6-1. Although this does not provide linear speedup, we find it often approaches it.

Figure 6-1. Multiquery illustration

There are cases where Pig will not combine multiple operators into a single MapReduce job. Pig does not use multiquery for any of the multiple-input operators: join, union, cross, or cogroup. It does not use multiquery for order statements either. Also, if a script has multiple group statements where some would use Hadoop’s combiner and some would not, Pig combines into a multiquery only the statements that use the combiner, because we have found that mixing combiner and non-combiner jobs in one multiquery does not perform well.
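
For example, if one leg of an implicit split ends in an order statement, that sort is planned as its own MapReduce jobs rather than being folded into the multiquery. A hypothetical sketch (the aliases and output paths are made up for illustration):

players   = load 'baseball' as (name:chararray, team:chararray,
               position:bag{t:(p:chararray)}, bat:map[]);
pwithba   = foreach players generate name, team,
               (double)(bat#'batting_average') as batavg;
byteam    = group pwithba by team;
avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
-- the sort below runs as separate MapReduce jobs, not as part of the multiquery
sorted    = order pwithba by batavg desc;
store sorted into 'players_by_batavg';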

Multiquery scripts tend to perform better than loading the same input multiple times, but this approach does have limits. Because it requires replicating records in the map, it does slow down the shuffle phase. Eventually the increased cost of the shuffle phase outweighs the reduced cost of rescanning the input data. Pig has no way to estimate when this will occur. Currently, the optimizer is optimistic and always combines jobs with multiquery whenever it can. If it combines too many jobs and becomes slower than splitting some of the jobs, you can turn off multiquery or you can rewrite your Pig Latin into separate scripts so Pig does not attempt to combine them all. To turn off multiquery, you can pass either -M or -no_multiquery on the command line or set the property opt.multiquery to false.
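
For example, assuming the multiquery.pig script from the beginning of this section has been saved to a file of that name, either of the following invocations will run it with multiquery turned off:

pig -M multiquery.pig
pig -no_multiquery multiquery.pig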

We must also consider what happens when one job in a multiquery fails but others succeed. If all jobs succeed, Pig will return 0, meaning success. If all of the jobs fail, Pig will return 2. If some jobs fail and some succeed, Pig will return 3. By default, if one of the jobs fails, Pig will continue processing the other jobs. However, if you want Pig to stop as soon as one of the jobs fails, you can pass -F or -stop_on_failure. In this case, any jobs that have not yet finished will be terminated, and any that have not started will not be started. Any jobs that have already finished will not be cleaned up.
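
A minimal sketch of checking these return codes from a Unix shell, again assuming the script has been saved as multiquery.pig:

pig multiquery.pig                    # default: keep going if one job fails
pig -stop_on_failure multiquery.pig   # or -F: stop at the first failed job
echo $?   # 0: all jobs succeeded, 2: all failed, 3: some failed, some succeeded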
