Cover by Alan Gates

Safari, the world’s most comprehensive technology and business learning platform.

Find the exact information you need to solve a problem on the fly, or go deeper to master the technologies and skills you need to succeed

Start Free Trial

No credit card required

O'Reilly logo

Chapter 8. Making Pig Fly

Who says Pigs can’t fly? Knowing how to optimize your Pig Latin scripts can make a significant difference in how they perform. Pig is still a young project and does not have a sophisticated optimizer that can make the right choices. Instead, consistent with Pig’s philosophy of user choice, it relies on you to make these choices. Beyond just optimizing your scripts, Pig and MapReduce can be tuned to perform better based on your workload. And there are ways to optimize your data layout as well. This chapter covers a number of features you can use to help Pig fly.

Before diving into the details of how to optimize your Pig Latin, it is worth understanding what items tend to create bottlenecks in Pig jobs:

Input size

It does not seem that a massively parallel system should be I/O bound. Hadoop’s parallelism reduces I/O bound but does not entirely remove it. You can always add more map tasks. However, the law of diminishing returns comes into effect. Additional maps take more time to start up, and MapReduce has to find more slots in which to run them. If you have twice as many maps as you have slots to run them, it will take twice your average map time to run all of your maps. Adding one more map in that case will actually make it worse because the map time will increase to three times the average. Also, every record that is read might need to be decompressed and will need to be deserialized.

Shuffle size

By shuffle size I mean the data that is moved from your map tasks to your reduce tasks. All of this data has to be serialized, sorted, moved over the network, merged, and deserialized. Also, the number of maps and reduces matters. Every reducer has to go to every mapper, find the portion of the map’s output that belongs to it, and copy that. So if there are m maps and r reduces, the shuffle will have m x r network connections. And if reducers have too many map inputs to merge in one pass, they will have to do a multipass merge, reading the data from and writing it to disk multiple times (see Combiner Phase for details).

Output size

Every record written out by a MapReduce job has to be serialized, possibly compressed, and written to the store. When the store is HDFS, it must be written to three separate machines before it is considered written.

Intermediate results size

Pig moves data between MapReduce jobs by storing it in HDFS. Thus the size of these intermediate results is affected by the input size and output size factors mentioned previously.


Some calculations require your job to hold a lot of information in memory, for example, joins. If Pig cannot hold all of the values in memory simultaneously, it will need to spill some to disk. This causes a significant slowdown, as records must be written to and read from disk, possibly multiple times.

Writing Your Scripts to Perform Well

There are a number of things you can do when writing Pig Latin scripts to help reduce the bottlenecks discussed earlier. It may be helpful to review which operators force new MapReduce jobs in Chapters 5 and 6.

Filter Early and Often

Getting rid of data as quickly as possible will help your script perform better. Pushing filters higher in your script can reduce the amount of data you are shuffling or storing in HDFS between MapReduce jobs. Pig’s logical optimizer will push your filters up whenever it can. In cases where a filter has multiple predicates joined by and, and one or more of the predicates can be applied before the operator preceding the filter, Pig will split the filter at the and and push the eligible predicate(s). This allows Pig to push parts of the filter when it might not be able to push the filter as a whole. Table 8-1 describes when these filter predicates will and will not be pushed once they have been split.

Table 8-1. When Pig pushes filters

Preceding operatorFilter will be pushed before?Comments
cogroupSometimesThe filter will be pushed if it applies to only one input of the cogroup and does not contain a UDF.
crossSometimesThe filter will be pushed if it applies to only one input of the cross.
filterNoWill seek to merge them with and to avoid passing data through a second operator. This is done only after all filter pushing is complete.
foreachSometimesThe filter will be pushed if it references only fields that exist before and after the foreach, and foreach does not transform those fields.
groupSometimesThe filter will be pushed if it does not contain a UDF.
joinSometimesThe filter will be pushed if it applies to only one input of the join, and if the join is not outer for that input.
mapreduceNomapreduce is opaque to Pig, so it cannot know whether pushing will be safe.
streamNostream is opaque to Pig, so it cannot know whether pushing will be safe.

Also, consider adding filters that are implicit in your script. For example, all of the records with null values in the key will be thrown out by an inner join. If you know that more than a few hundred of your records have null key values, put a filter input by key is not null before the join. This will enhance the performance of your join.

Project Early and Often

For earlier versions of Pig, we told users to employ foreach to remove fields they were not using as soon as possible. As of version 0.8, Pig’s logical optimizer does a fair job of removing fields aggressively when it can tell that they will no longer be used:

-- itemid does not need to be loaded, since it is not used in the script
txns        = load 'purchases' as (date, storeid, amount, itemid);
todays      = filter txns by date == '20110513'; -- date not needed after this
bystore     = group todays by storeid;
avgperstore = foreach bystore generate group, AVG(todays.amount);

However, you are still smarter than Pig’s optimizer, so there are situations where you can tell that a field is no longer needed but Pig cannot. If AVG(todays.amount) were changed to COUNT(todays) in the preceding example, Pig would not be able to determine that, after the filter, only storeid and amount were required. It cannot see that COUNT does not need all of the fields in the bag it is being passed. Whenever you pass a UDF the entire record (udf(*)) or an entire complex field, Pig cannot determine which fields are required. In this case, you will need to put in the foreach yourself to remove unneeded data as early as possible.

Set Up Your Joins Properly

Joins are one of the most common data operations, and also one of the costliest. Choosing the correct join implementation can improve your performance significantly. The flowchart in Figure 8-1 will help you make the correct selection.

Choosing a join implementation

Figure 8-1. Choosing a join implementation

Once you have selected your join implementation, make sure to arrange your inputs in the correct order as well. For replicated joins, the small table must be given as the last input. For skewed joins, the second input is the one that is sampled for large keys. For the default join, the rightmost input has its records streamed through, whereas the other input(s) have their records for a given key value materialized in memory. Thus if you have one join input that you know has more records per key value, you should place it in the rightmost position in the join. For merge join, the left input is taken as the input for the MapReduce job, and thus the number of maps started are based on this input. If one input is much larger than the other, you should place it on the left in order to get more map tasks dedicated to your jobs. This will also reduce the size of the sampling step that builds the index for the right side. For complete details on each of these join implementations, see the sections Join and Using Different Join Implementations.

Use Multiquery When Possible

Whenever you are doing operations that can be combined by multiquery, such as grouping and filtering, these should be written together in one Pig Latin script so that Pig can combine them. Although adding extra operations does increase the total processing time, it is still much faster than running jobs separately.

Choose the Right Data Type

As discussed elsewhere, Pig can run with or without data type information. In cases where the load function you are using creates data that is already typed, there is little you need to do to optimize the performance. However, if you are using the default PigStorage load function that reads tab-delimited files, then whether you use types will affect your performance.

On the one hand, converting fields from bytearray to the appropriate type has a cost. So, if you do not need type information, you should not declare it. For example, if you are just counting records, you can omit the type declaration without affecting the outcome of your script.

On the other hand, if you are doing integer calculations, types can help your script perform better. When Pig is asked to do a numeric calculation on a bytearray, it treats that bytearray as a double because this is the safest assumption. But floating-point arithmetic is much slower than integer arithmetic on most machines. For example, if you are doing a SUM over integer values, you will get better performance by declaring them to be of type integer.

Select the Right Level of Parallelism

Setting your parallelism properly can be difficult, as there are a number of factors. Before we discuss the factors, a little background will be helpful. It would be natural to think more parallelism is always better; however, that is not the case. Like any other resource, parallelism has a network cost, as discussed under the shuffle size performance bottleneck.

Second, increasing parallelism adds latency to your script because there is a limited number of reduce slots in your cluster, or a limited number that your scheduler will assign to you. If 100 reduce slots are available to you and you specify parallel 200, you still will be able to run only 100 reduces at a time. Your reducers will run in two separate waves. Because there is overhead in starting and stopping reduce tasks, and the shuffle gets less efficient as parallelism increases, it is often not efficient to select more reducers than you have slots to run them. In fact, it is best to specify slightly fewer reducers than the number of slots that you can access. This leaves room for MapReduce to restart a few failed reducers and use speculative execution without doubling your reduce time. See Handling Failure for information on speculative execution.

Also, it is important to keep in mind the effects of skew on parallelism. MapReduce generally does a good job partitioning keys equally to the reducers, but the number of records per key often varies radically. Thus a few reducers that get keys with a large number of records will significantly lag the other reducers. Pig cannot start the next MapReduce job until all of the reducers have finished in the previous job. So the slowest reducer defines the length of the job. If you have 10G of input to your reducers and you set parallel to 10, but one key accounts for 50% of the data (not an uncommon case), nine of your reducers will finish quite quickly while the last lags. Increasing your parallelism will not help; it will just waste more cluster resources. Instead, you need to use Pig’s mechanisms to handle skew.

Find the exact information you need to solve a problem on the fly, or go deeper to master the technologies and skills you need to succeed

Start Free Trial

No credit card required