Chapter 6. Advanced Pig Latin

In the previous chapter we worked through the basics of Pig Latin. In this chapter we will plumb its depths, and we will also discuss how Pig handles more complex data flows. Finally, we will look at how to use macros and modules to modularize your scripts.

Advanced Relational Operations

We will now discuss the more advanced Pig Latin operators, as well as additional options for operators that were introduced in the previous chapter.

Advanced Features of foreach

In our introduction to foreach (see foreach), we discussed how it could take a list of expressions to output for every record in your data pipeline. Now we will look at ways it can explode the number of records in your pipeline, and also how it can be used to apply a set of operations to each record.

flatten

Sometimes you have data in a bag or a tuple and you want to remove that level of nesting. The baseball data available on GitHub (see Code Examples in This Book) can be used as an example. Because a player can play more than one position, position is stored in a bag. This allows us to still have one entry per player in the baseball file.[15] But when you want to switch around your data on the fly and group by a particular position, you need a way to pull those entries out of the bag. To do this, Pig provides the flatten modifier in foreach:

--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
pos     = foreach players generate name, flatten(position) as position;
bypos   = group pos by position;

A foreach with a flatten produces a cross product of every record in the bag with all of the other expressions in the generate statement. Looking at the first record in baseball, we see it is the following (replacing tabs with commas for clarity):

Jorge Posada,New York Yankees,{(Catcher),(Designated_hitter)},...

Once this has passed through the flatten statement, it will be two records:

Jorge Posada,Catcher
Jorge Posada,Designated_hitter

If there is more than one bag and both are flattened, this cross product will be done with members of each bag as well as other expressions in the generate statement. So rather than getting n rows (where n is the number of records in one bag), you will get n * m rows.
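
As a minimal sketch of this, suppose we had a second bag alongside position (the input 'players_awards' and its schema here are hypothetical):

--flatten_two_bags.pig (sketch)
pa     = load 'players_awards' as (name:chararray,
            position:bag{t:(p:chararray)}, awards:bag{t:(a:chararray)});
-- a player with two positions and three awards produces 2 * 3 = 6 records
combos = foreach pa generate name, flatten(position), flatten(awards);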

One side effect that surprises many users is that if the bag is empty, no records are produced. So if there had been an entry in baseball with no position, either because the bag was null or empty, that record would not be contained in the output of flatten.pig. The record with the empty bag would be swallowed by foreach. There are a couple of reasons for this behavior. One, since Pig may or may not have the schema of the data in the bag, it might have no idea how to fill in nulls for the missing fields. Two, from a mathematical perspective, this is what you would expect. Crossing a set S with the empty set results in the empty set. If you wish to avoid this, use a bincond to replace empty bags with a constant bag:

--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name,
            ((position is null or IsEmpty(position)) ? {('unknown')} : position)
            as position;
pos     = foreach noempty generate name, flatten(position) as position;
bypos   = group pos by position;

flatten can also be applied to a tuple. In this case, it does not produce a cross product; instead, it elevates each field in the tuple to a top-level field. Again, empty tuples will remove the entire record.

If the fields in a bag or tuple that is being flattened have names, Pig will carry those names along. As with join, to avoid ambiguity, the field name will have the bag’s name and :: prepended to it. As long as the field name is not ambiguous, you are not required to use the bagname:: prefix.

If you wish to change the names of the fields, or if the fields initially did not have names, you can attach an as clause to your flatten, as in the preceding example. If there is more than one field in the bag or tuple that you are assigning names to, you must surround the set of field names with parentheses.
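
As a minimal sketch of flattening a tuple and renaming the result (the input 'player_hometowns' and its schema are hypothetical):

--flatten_tuple.pig (sketch)
ph   = load 'player_hometowns' as (name:chararray,
            hometown:tuple(city:chararray, state:chararray));
-- the flatten produces more than one field, so the new names
-- must be surrounded by parentheses
flat = foreach ph generate name, flatten(hometown) as (city, state);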

Finally, if you flatten a bag or tuple without a schema and do not provide an as clause, the resulting records coming out of your foreach will have a null schema. This is because Pig will not know how many fields the flatten will result in.[16]

Nested foreach

So far, all of the examples of foreach that we have seen immediately generate one or more lines of output. But foreach is more powerful than this. It can also apply a set of relational operations to each record in your pipeline. This is referred to as a nested foreach, or inner foreach. One example of how this can be used is to find the number of unique entries in a group. For example, to find the number of unique stock symbols for each exchange in the NYSE_daily data:

--distinct_symbols.pig
daily    = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd     = group daily by exchange;
uniqcnt  = foreach grpd {
           sym      = daily.symbol;
           uniq_sym = distinct sym;
           generate group, COUNT(uniq_sym);
};

There are several new things here to unpack; we will walk through each. In this example, rather than generate immediately following foreach, a { (open brace) signals that we will be nesting operators inside this foreach. In this nested code, each record passed to foreach is handled one at a time.

In the first line we see a syntax that we have not seen outside of foreach. In fact, sym = daily.symbol would not be legal outside of foreach. It is roughly equivalent to the top-level statement sym = foreach grpd generate daily.symbol, but it is not stated that way inside the foreach because it is not really another foreach. There is no relation for it to be associated with (that is, grpd is not defined here). This line takes the bag daily and produces a new relation sym, which is a bag with tuples that have only the field symbol.

The second line applies the distinct operator to the relation sym. Note that even inside foreach, relational operators can be applied only to relations; they cannot be applied to expressions. For example, the statement uniq_sym = distinct daily.symbol will produce a syntax error because daily.symbol is an expression, not a relation. sym is a relation. This distinction may seem arbitrary, but it results in Pig Latin having a coherent definition as a language. Without this, strange statements such as C = distinct 1 + 2 would be legal. One way to think about this is that the assignment operator inside foreach can be used to take an expression and create a relation, as happens in this example.

The last line in a nested foreach must always be generate. This tells Pig how to take the results of the nested operations and produce a record to be put in the outer relation (in this case, uniqcnt). So, generate is the operator that takes the inner relations and turns them back into expressions for inclusion in the outer relation. That is, if the script read generate group, uniq_sym, uniq_sym would be treated as a bag for the purpose of the generate statement.

Theoretically, any Pig Latin relational operator should be legal inside foreach. However, at the moment, only distinct, filter, limit, and order are supported.
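
For example, here is a minimal sketch of filter inside a nested foreach; the 0.1 cutoff is arbitrary, chosen only for illustration:

--filter_in_foreach.pig (sketch)
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd = group divs by symbol;
big  = foreach grpd {
            high = filter divs by dividends > 0.1;
            generate group, COUNT(high);
};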

Let’s look at a few more examples of how this feature can be useful, such as to sort the contents of a bag before the bag is passed to a UDF. This is convenient for UDFs that require all of their input to come in a certain order. Consider a stock-analysis UDF that wants to track information about a particular stock over time. The UDF will want input sorted by timestamp:

--analyze_stock.pig
register 'acme.jar';
define analyze com.acme.financial.AnalyzeStock();
daily    = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
               date:chararray, open:float, high:float, low:float,
               close:float, volume:int, adj_close:float);
grpd     = group daily by symbol;
analyzed = foreach grpd {
            sorted = order daily by date;
            generate group, analyze(sorted);
};

Doing the sorting in Pig Latin, rather than in your UDF, is important for a couple of reasons. One, it means Pig can offload the sorting to MapReduce. MapReduce has the ability to sort data by a secondary key while grouping it. So, the order statement in this case does not require a separate sorting operation. Two, it means that your UDF does not need to wait for all data to be available before it starts processing. Instead, it can use the Accumulator interface (see Accumulator Interface), which is much more memory efficient.

This feature can be used to find the top k elements in a group. The following example will find the top three dividends paid for each stock:

--highest_dividend.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd = group divs by symbol;
top3 = foreach grpd {
            sorted = order divs by dividends desc;
            top    = limit sorted 3;
            generate group, flatten(top);
};

Currently, these nested portions of code are always run serially for each record handed to them. Of course the foreach itself will be running in multiple map or reduce tasks, but each instance of the foreach will not spawn subtasks to do the nested operations in parallel. So if we added a parallel 10 clause to the grpd = group divs by symbol statement in the previous example, this ordering and limiting would take place in 10 reducers. But each group of stocks would be sorted and the top three records taken serially within one of those 10 reducers.
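
For reference, the modified group statement would read:

grpd = group divs by symbol parallel 10;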

There is, of course, no requirement that the pipeline inside the foreach be a simple linear pipeline. For example, if you wanted to calculate two distinct counts together, you could do the following:

--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
            exchanges      = divs.exchange;
            uniq_exchanges = distinct exchanges;
            symbols        = divs.symbol;
            uniq_symbols   = distinct symbols;
            generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};

For simplicity, Pig actually runs this pipeline once for each expression in generate. Here this has no side effects because the two data flows are completely disjoint. However, if you constructed a pipeline where there was a split in the flow, and you put a UDF in the shared portion, you would find that it was invoked more often than you expected.
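
Here is a minimal sketch of such a split; com.acme.financial.IsRecent is a hypothetical filter UDF, and the point is only that it sits in the portion of the flow shared by both branches, so it may be evaluated once per generate expression:

--udf_in_shared_flow.pig (sketch; IsRecent is hypothetical)
register 'acme.jar';
define recent com.acme.financial.IsRecent();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd = group divs by symbol;
tops = foreach grpd {
            current = filter divs by recent(date); -- shared portion of the flow
            by_div  = order current by dividends desc;
            top_div = limit by_div 1;
            by_date = order current by date desc;
            latest  = limit by_date 1;
            generate group, flatten(top_div), flatten(latest);
};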

Using Different Join Implementations

When we covered join in the previous chapter (see Join), we discussed only the default join behavior. However, Pig offers multiple join implementations, which we will discuss here.

In RDBMS systems, traditionally the SQL optimizer chooses a join implementation for the user. This is nice as long as the optimizer chooses well, which it does in most cases. But Pig has taken a different approach. In the Pig team we like to say that our optimizer is located between the user’s chair and keyboard. We empower the user to make these choices rather than having Pig make them. So for operators such as join where there are multiple implementations, Pig lets the user indicate his choice via a using clause.

This approach fits well with our philosophy that Pigs are domestic animals (i.e., Pig does what you tell it; see Pig Philosophy). Also, as a relatively new product, Pig has a lot of functionality to add. It makes more sense to focus on adding implementation choices and letting the user choose which ones to use, rather than focusing on building an optimizer capable of choosing well.

Joining small to large data

A common type of join is doing a lookup in a smaller input. For example, suppose you were processing data where you needed to translate a US ZIP code (postal code) to the state and city it referred to. As there are at most 100,000 zip codes in the US, this translation table should easily fit in memory. Rather than forcing a reduce phase that will sort your big file plus this tiny zip code translation file, it makes sense instead to send the zip code file to every machine, load it into memory, and then do the join by streaming through the large file and looking up each record in the zip code file. This is called a fragment-replicate join (because you fragment one file and replicate the other):

--repljoin.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs  = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
jnd   = join daily by (exchange, symbol), divs by (exchange, symbol)
            using 'replicated';

The using 'replicated' tells Pig to use the fragment-replicate algorithm to execute this join. Because no reduce phase is necessary, all of this can be done in the map task.

The second input listed in the join (in this case, divs) is always the input that is loaded into memory. Pig does not check beforehand that the specified input will fit into memory. If Pig cannot fit the replicated input into memory, it will issue an error and fail.

Warning

Due to the way Java stores objects in memory, the size of the data on disk will not be the size of the data in memory. See Memory Requirements of Pig Data Types for a discussion of how data expands in memory in Pig. You will need more memory for a replicated join than you need space on disk to store the replicated input.

Fragment-replicate join supports only inner and left outer joins. It cannot do a right outer join, because when a given map task sees a record in the replicated input that does not match any record in the fragmented input, it has no idea whether it would match a record in a different fragment. So, it does not know whether to emit a record. If you want a right or full outer join, you will need to use the default join operation.
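
For example, a left outer fragment-replicate join of the same inputs would look like this:

jnd = join daily by (exchange, symbol) left outer, divs by (exchange, symbol)
          using 'replicated';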

Fragment-replicate join can be used with more than two tables. In this case, all but the first (left-most) table are read into memory.
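
Here is a sketch with three inputs; 'symbol_names' is a hypothetical third input, and daily, as the left-most input, is the only one not loaded into memory:

daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray); -- other fields omitted
divs  = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
names = load 'symbol_names' as (symbol:chararray, company:chararray);
jnd   = join daily by symbol, divs by symbol, names by symbol using 'replicated';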

Pig implements the fragment-replicate join by loading the replicated input into Hadoop’s distributed cache. The distributed cache is a tool provided by Hadoop that preloads a file onto the local disk of nodes that will be executing the maps or reduces for that job. This has two important benefits. First, if you have a fragment-replicate join that is going to run on 1,000 maps, opening one file in HDFS from 1,000 different machines all at once puts a serious strain on the NameNode and the three data nodes that contain the block for that file. The distributed cache is built specifically to manage these kinds of issues without straining HDFS. Second, if multiple map tasks are located on the same physical machine, the files in the distributed cache are shared between those instances, thus reducing the number of times the file has to be copied.

Pig runs a map-only MapReduce job to preprocess the file and get it ready for loading into the distributed cache. If there is a filter or foreach between the load and join, these will be done as part of this initial job so that the file to be stored in the distributed cache is as small as possible. The join itself will be done in a second map-only job.

Joining skewed data

As we have seen elsewhere, much of the data you will be processing with Pig has significant skew in the number of records per key. For example, if you were building a map of the Web and joining by the domain of the URL (your key), you would expect to see significant skew for values such as yahoo.com. Pig’s default join algorithm is very sensitive to skew, because it collects all of the records for a given key together on a single reducer. In many data sets, there are a few keys that have three or more orders of magnitude more records than other keys. This results in one or two reducers that will take much longer than the rest. To deal with this, Pig provides skew join.

Skew join works by first sampling one input for the join. In that input it identifies any keys that have so many records that skew join estimates it will not be able to fit them all into memory. Then, in a second MapReduce job, it does the join. For all records except those identified in the sample, it does a standard join, collecting records with the same key onto the same reducer. Those keys identified as too large are treated differently. Based on how many records were seen for a given key, those records are split across the appropriate number of reducers. The number of reducers is chosen based on Pig’s estimate of how wide the data must be split such that each reducer can fit its split into memory. For the input to the join that is not split, those keys that were split are then replicated to each reducer that contains that key.[17]

For example, let’s look at how the following Pig Latin script would work:

users = load 'users' as (name:chararray, city:chararray);
cinfo = load 'cityinfo' as (city:chararray, population:int);
jnd   = join cinfo by city, users by city using 'skewed';

Assume that the cities in users are distributed such that 20 users live in Barcelona, 100,000 in New York, and 350 in Portland. Let’s further assume that Pig determined that it could fit 75,000 records into memory on each reducer. When this data was joined, New York would be identified as a key that needed to be split across reducers. During the join phase, all records with keys other than New York would be treated as in a default join. Records from users with New York as the key would be split between two separate reducers. Records from cityinfo with New York as a key would be duplicated and sent to both of those reducers.

The second input in the join, in this case users, is the one that will be sampled and have its keys with a large number of records split across reducers. The first input will have records with those key values replicated across reducers.

This algorithm addresses skew in only one input. If both inputs have skew, this algorithm will still work, but it will be slow. Much of the motivation behind this approach was that it guarantees the join will still finish, given time. Before Pig introduced skew join in version 0.4, data that was skewed on both sides could not be joined in Pig because it was not possible to fit all the records for the heavily skewed keys in memory for either side.

Skew join can be done on inner or outer joins. However, it can take only two join inputs. Multiway joins must be broken into a series of joins if they need to use skew join.

Since data often has skew, why not use skew join all of the time? There is a small performance penalty for using skew join, because one of the inputs must be sampled first to find any key values with a large number of records. This usually adds about 5% to the time it takes to calculate the join. If your data frequently has skew, it might be worth it to always use skew join and pay the 5% tax in order to avoid failing or running very slowly with the default join and then needing to rerun using skew join.

As stated earlier, Pig estimates how much data it can fit into memory when deciding which key values to split and how wide to split them. For the purposes of this calculation, Pig looks at the record sizes in the sample and assumes it can use 30% of the JVM’s heap to materialize records that will be joined. In your particular case you might find you need to increase or decrease this size. You should decrease the value if your join is still failing with out-of-memory errors even when using skew join. This indicates that Pig is estimating memory usage improperly, so you should tell it to use less. If profiling indicates that Pig is not utilizing all of your heap, you might want to increase the value in order to do the join more efficiently; the fewer ways the key values are split, the more efficient the join will be. You can do that by setting the property pig.skewedjoin.reduce.memusage to a value between 0 and 1. For example, if you wanted it to use 25% instead of 30%, you could add -Dpig.skewedjoin.reduce.memusage=0.25 to your Pig command line or define the value in your properties file.

Warning

Like order, skew join breaks the MapReduce convention that all records with the same key will be processed by the same reducer. This means records with the same key might be placed in separate part files. If you plan to process the data in a way that depends on all records with the same key being in the same part file, you cannot use skew join.

Joining sorted data

A common database join strategy is to first sort both inputs on the join key and then walk through both inputs together, doing the join. This is referred to as a sort-merge join. In MapReduce, because a sort requires a full MapReduce job, as does Pig’s default join, this technique is not more efficient than the default. However, if your inputs are already sorted on the join key, this approach makes sense. The join can be done in the map phase by opening both files and walking through them. Pig refers to this as a merge join because it is a sort-merge join, but the sort has already been done:

--mergejoin.pig
-- use sort_for_mergejoin.pig to build NYSE_daily_sorted and NYSE_dividends_sorted
daily = load 'NYSE_daily_sorted' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs  = load 'NYSE_dividends_sorted' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
jnd   = join daily by symbol, divs by symbol using 'merge';

To execute this join, Pig will first run a MapReduce job that samples the second input, NYSE_dividends_sorted. This sample builds an index that tells Pig the value of the join key, symbol, in the first record in every input split (usually each HDFS block). Because this sample reads only one record per split, it runs very quickly. Pig will then run a second MapReduce job that takes the first input, NYSE_daily_sorted, as its input. When each map reads the first record in its split of NYSE_daily_sorted, it takes the value of symbol and looks it up in the index built by the previous job. It looks for the last entry that is less than its value of symbol. It then opens NYSE_dividends_sorted at the corresponding block for that entry. For example, if the index contained entries (CA, 1), (CHY, 2), (CP, 3), and the first symbol in a given map’s input split of NYSE_daily_sorted was CJA, that map would open block 2 of NYSE_dividends_sorted. (Even if CP were the first symbol in NYSE_daily_sorted’s split, block 2 of NYSE_dividends_sorted would be opened, as there could be records with a key of CP in that block.) Once NYSE_dividends_sorted is opened, Pig throws away records until it reaches a record with a symbol of CJA. Once it finds a match, it collects all the records with that value into memory and then does the join. It then advances the first input, NYSE_daily_sorted. If the key is the same, it again does the join. If not, it advances the second input, NYSE_dividends_sorted, again until it finds a value greater than or equal to the next value in the first input, NYSE_daily_sorted. If the value is greater, it advances the first input and continues. Because both inputs are sorted, it never needs to look in the index after the initial lookup.

All of this can be done without a reduce phase, and so it is more efficient than a default join. This algorithm, which was introduced in version 0.4, currently supports only two-way inner joins.

cogroup

cogroup is a generalization of group. Instead of collecting records of one input based on a key, it collects records of n inputs based on a key. The result is a record with a key and one bag for each input. Each bag contains all records from that input that have the given value for the key:

A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;

C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}

Another way to think of cogroup is as the first half of a join. The keys are collected together, but the cross product is not done. In fact, cogroup plus foreach, where each bag is flattened, is equivalent to a join—as long as there are no null values in the keys.
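
Continuing the preceding example, a minimal sketch of that equivalence (again assuming id contains no nulls):

jnd1 = join A by id, B by id;
jnd2 = foreach C generate flatten(A), flatten(B);
-- jnd1 and jnd2 contain the same records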

cogroup handles null values in the keys similarly to group and unlike join. That is, all records with a null value in the key will be collected together.

cogroup is useful when you want to do join-like things but not a full join. For example, Pig Latin does not have a semi-join operator, but you can do a semi-join:

--semijoin.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs  = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
grpd  = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
sjnd  = filter grpd by not IsEmpty(divs);
final = foreach sjnd generate flatten(daily);
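
Inverting the filter condition gives the complementary result, a sketch of an anti-join that keeps only the daily records with no matching dividend:

--antijoin.pig (sketch, same inputs as above)
grpd  = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
njnd  = filter grpd by IsEmpty(divs);
final = foreach njnd generate flatten(daily);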

Because cogroup needs to collect records with like keys together, it requires a reduce phase.

union

Sometimes you want to put two data sets together by concatenating them instead of joining them. Pig Latin provides union for this purpose. If you had two files you wanted to use for input and there was no glob that could describe them, you could do the following:

A = load '/user/me/data/files/input1';
B = load '/user/someoneelse/info/input2';
C = union A, B;

Note

Unlike union in SQL, Pig does not require that both inputs share the same schema. If both do share the same schema, the output of the union will have that schema. If one schema can be produced from another by a set of implicit casts, the union will have that resulting schema. If neither of these conditions hold, the output will have no schema (that is, different records will have different fields). This schema comparison includes names, so even different field names will result in the output having no schema. You can get around this by placing a foreach before the union that renames fields.

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;

C: {x: int,y: float}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;

C: {x: int,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;

Schema for C unknown.
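
And here is a sketch of the workaround mentioned in the note, using a foreach to rename fields before the union (the input paths are hypothetical):

A  = load 'input1' as (x:int, y:float);
B  = load 'input2' as (u:int, v:float);
B1 = foreach B generate u as x, v as y;
C  = union A, B1;
describe C;

C: {x: int,y: float}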

union does not perform a mathematical set union. That is, duplicate records are not eliminated. In this manner it is like SQL’s union all. Also, union does not require a separate reduce phase.

Sometimes your data changes over time. If you have data you collect every month, you might add a new column this month. Now you are prevented from using union because your schemas do not match. If you want to union this data and force your data into a common schema, you can add the keyword onschema to your union statement:

A = load 'input1' as (w:chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;

C: {w: chararray,x: int,y: double,z: chararray}

union onschema requires that all inputs have schemas. It also requires that a shared schema for all inputs can be produced by adding fields and implicit casts. Matching of fields is done by name, not position. So, in the preceding example, w:chararray is added from input1 and z:chararray is added from input2. Also, a cast from float to double is added for input1 so that field y is a double. If a shared schema cannot be produced by this method, an error is returned. When the data is read, nulls are inserted for fields not present in a given input.

cross

cross matches the mathematical operation of the same name, the cross (or Cartesian) product. In the following Pig Latin, cross takes every record in NYSE_daily and combines it with every record in NYSE_dividends:

--cross.pig
-- you may want to run this in a cluster, it produces about 3G of data
daily     = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                date:chararray, open:float, high:float, low:float,
                close:float, volume:int, adj_close:float);
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;

cross tends to produce a lot of data. Given inputs with n and m records respectively, cross will produce output with n x m records.

Pig does implement cross in a parallel fashion. It does this by generating a synthetic join key, replicating rows, and then doing the cross as a join. The previous script is rewritten to:

daily     = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                date:chararray, open:float, high:float, low:float,
                close:float, volume:int, adj_close:float);
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
A         = foreach daily generate flatten(GFCross(0, 2)), flatten(*);
B         = foreach divs generate flatten(GFCross(1, 2)), flatten(*);
C         = cogroup A by ($0, $1), B by ($0, $1) parallel 10;
tonsodata = foreach C generate flatten(A), flatten(B);

GFCross is an internal UDF. The first argument is the input number, and the second argument is the total number of inputs. In this example, the output is a bag that contains four records.[18] These records have a schema of (int, int). The field that is the same number as the first argument to GFCross contains a random number between zero and three. The other field counts from zero to three. So, if we assume for a given two records, one in each input, that the random number for the first input is 3 and for the second is 2, then the outputs of GFCross would look like:

A {(3, 0), (3, 1), (3, 2), (3, 3)}
B {(0, 2), (1, 2), (2, 2), (3, 2)}

When these records are flattened, four copies of each input record will be created in the map. They are then joined on the artificial keys. For every pair of records, one from each input, it is guaranteed that exactly one of the artificial key values matches, so each pair produces exactly one record. Because the random numbers are chosen independently for each record, the resulting join work is spread evenly across the reducers.

This algorithm does enable crossing of data in parallel. However, it creates a burden on the shuffle phase by increasing the number of records in each input being shuffled. Also, no matter what you do, cross outputs a lot of data. Writing all of this data to disk is expensive, even when done in parallel.

This is not to say you should not use cross. There are instances when it is indispensable. Pig’s join operator supports only equi-joins, that is, joins on an equality condition. Because general join implementations (ones that do not depend on the data being sorted or small enough to fit in memory) in MapReduce depend on collecting records with the same join key values onto the same reducer, non-equi-joins (also called theta joins) are difficult to do. They can be done in Pig using cross followed by filter:

--thetajoin.pig
--I recommend running this one on a cluster too
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd    = filter crossed by daily::date < divs::date;

Fuzzy joins could also be done in this manner, where the fuzzy comparison is done after the cross. However, whenever possible, it is better to use a UDF to conform fuzzy values to a standard value and then do a regular join. For example, if you wanted to join two inputs on city but wanted to join any time two cities were in the same metropolitan area (e.g., you wanted Los Angeles and Pasadena to be viewed as equal), you could first run your records through a UDF that generated a single join key for all cities in a metropolitan area and then do the join.
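
A sketch of that approach follows; the jar, the MetroArea UDF, and both inputs are hypothetical:

--metro_join.pig (sketch)
register 'acme.jar';
define metro com.acme.marketing.MetroArea();
users  = load 'users' as (name:chararray, city:chararray);
stores = load 'stores' as (store:chararray, city:chararray);
u      = foreach users generate name, metro(city) as metro_key;
s      = foreach stores generate store, metro(city) as metro_key;
jnd    = join u by metro_key, s by metro_key;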



[15] Those with database experience will notice that this is a violation of the first normal form as defined by E. F. Codd. This intentional denormalization of data is very common in OLAP systems in general, and in large data-processing systems such as Hadoop in particular. RDBMS systems tend to make joins common and then work to optimize them. In systems such as Hadoop, where storage is cheap and joins are expensive, it is generally better to use nested data structures to avoid the joins.

[16] In versions 0.8 and earlier, there is a bug where this flatten is assigned a schema of one field, which is a bytearray, instead of causing the schema to be null. This bug has been fixed in 0.9.

[17] This algorithm was proposed in the paper “Practical Skew Handling in Parallel Joins,” presented by David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, and S. Seshadri at the 18th International Conference on Very Large Databases.

[18] In 0.8 and earlier, the number of records is always 10. In 0.9, this is changed to be the square root of the parallel factor, rounded up.
