
Algebraic Interface

I have already mentioned in a number of other places that there are significant advantages to using Hadoop’s combiner whenever possible. It lowers skew in your reduce tasks, as well as the amount of data sent over the network between map and reduce tasks. For details on the combiner and when it is run, see Combiner Phase.

Use of the combiner is interesting when you are working with sets of data, usually sets you intend to aggregate and then compute a single or small set of values for. There are two classes of functions that fit nicely into the combiner: distributive and algebraic. A function is distributive if the same result is obtained by 1) dividing its input set into subsets, applying the function to those subsets, and then applying the function to those results; or 2) applying the function to the original set. SUM is an example of this. A function is said to be algebraic if it can be divided into initial, intermediate, and final functions (possibly different from the initial function), where the initial function is applied to subsets of the input set, the intermediate function is applied to results of the initial function, and the final function is applied to all of the results of the intermediate function. COUNT is an example of an algebraic function, with count being used as the initial function and sum as the intermediate and final functions. A distributive function is a special case of an algebraic function, where the initial, intermediate, and final functions are all identical to the original function.
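
As a rough illustration of why COUNT decomposes this way (the class name and the values below are invented for this example and have nothing to do with Pig itself), counting subsets and then summing those counts gives the same answer as counting the whole set at once:

import java.util.Arrays;
import java.util.List;

// Toy illustration (not Pig code): COUNT is algebraic because counting
// subsets and then summing those counts gives the same result as counting
// the whole set at once.
public class AlgebraicCountDemo {
    public static void main(String[] args) {
        List<String> whole = Arrays.asList("a", "b", "c", "d", "e");
        List<String> sub1 = whole.subList(0, 2);    // {a, b}
        List<String> sub2 = whole.subList(2, 5);    // {c, d, e}

        long initial1 = sub1.size();                // initial: count = 2
        long initial2 = sub2.size();                // initial: count = 3
        long intermediate = initial1 + initial2;    // intermediate: sum = 5
        long finalResult = intermediate;            // final: sum = 5

        System.out.println(finalResult == whole.size());   // prints true
    }
}

AVG, by contrast, is algebraic but not distributive: averaging the averages of subsets does not in general give the average of the whole set, so its initial and intermediate functions must carry along both a running sum and a running count.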

An EvalFunc can declare itself to be algebraic by implementing the Java interface Algebraic. Algebraic provides three methods that allow your UDF to declare Java classes that implement its initial, intermediate, and final functionality. These classes must extend EvalFunc:

// src/org/apache/pig/
public interface Algebraic {

    /**
     * Get the initial function.
     * @return A function name of f_init. f_init should be an eval func.
     */
    public String getInitial();

    /**
     * Get the intermediate function.
     * @return A function name of f_intermed. f_intermed should be an eval func.
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final should be an eval func
     * parameterized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}

Each of these methods returns a name of a Java class, which should itself implement EvalFunc. Pig will use these UDFs to rewrite the execution of your script. Consider the following Pig Latin script:

input = load 'data' as (x, y);
grpd  = group input by x;
cnt   = foreach grpd generate group, COUNT(input);
store cnt into 'result';

The execution pipeline for this script would initially look like:

Map      load
Reduce   foreach(group, COUNT), store

After being rewritten to use the combiner, it would look like:

Map      load
         foreach(group, COUNT.Initial)
Combine  foreach(group, COUNT.Intermediate)
Reduce   foreach(group, COUNT.Final), store

As an example, we will walk through the implementation for COUNT. Its algebraic functions look like this:

// src/org/apache/pig/builtin/
public String getInitial() {
    return Initial.class.getName();
}

public String getIntermed() {
    return Intermediate.class.getName();
}

public String getFinal() {
    return Final.class.getName();
}

Each of these referenced classes is a static internal class in COUNT. The implementation of Initial is:

// src/org/apache/pig/builtin/
static public class Initial extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        // Since Initial is guaranteed to be called
        // only in the map, it will be called with an
        // input of a bag with a single tuple - the
        // count should always be 1 if the bag is nonempty.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        if (it.hasNext()) {
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                return mTupleFactory.newTuple(Long.valueOf(1));
        }
        return mTupleFactory.newTuple(Long.valueOf(0));
    }
}

Even though the initial function is guaranteed to receive only one record in its input, that record will match the schema of the original function. So, in the case of COUNT, it will be a bag. Thus, this initial method determines whether there is a nonnull record in that bag. If so, it returns one; otherwise, it returns zero. The return type of the initial function is a tuple. The contents of that tuple are entirely up to you as the UDF implementer. In this case, the initial returns a tuple with one long field.

COUNT’s Intermediate class sums the counts seen so far:

// src/org/apache/pig/builtin/
static public class Intermediate extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        try {
            return mTupleFactory.newTuple(sum(input));
        } catch (ExecException ee) {
            // Error handling is elided here; rethrow so the example compiles.
            throw new IOException(ee);
        }
    }
}

static protected Long sum(Tuple input)
throws ExecException, NumberFormatException {
    DataBag values = (DataBag)input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long)t.get(0);
    }
    return sum;
}

The input to the intermediate function is a bag of tuples that were returned by the initial function. The intermediate function may be called zero, one, or many times. So, it needs to output tuples that match the input tuples it expects. The framework will handle placing those tuples in bags. COUNT’s intermediate function returns a tuple with a long. As we now want to sum the previous counts, this function implements SUM rather than COUNT.

The final function is called in the reducer and is guaranteed to be called only once. Its input type is a bag of tuples that both the initial and intermediate implementations return. Its return type needs to be the return type of the original UDF, which in this case is long. In COUNT’s case, this is the same operation as the intermediate because it sums the intermediate sums:

// src/org/apache/pig/builtin/
static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
        try {
            return sum(input);
        } catch (Exception ee) {
            // Error handling is elided here; rethrow so the example compiles.
            throw new IOException(ee);
        }
    }
}

Implementing Algebraic does not guarantee that the algebraic implementation will always be used. Pig chooses the algebraic implementation only if all UDFs in the same foreach statement are algebraic. This is because our testing has shown that using the combiner with data that cannot be combined significantly slows down the job. And there is no way in Hadoop to route some data to the combiner (for algebraic functions) and some straight to the reducer (for nonalgebraic). This means that your UDF must always implement the exec method, even if you hope it will always be used in algebraic mode. It is also a good reason to implement the Algebraic interface whenever you can: a single nonalgebraic UDF in a foreach statement prevents the combiner from being used for that statement.
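
To tie the pieces together, here is a minimal sketch of a complete UDF that implements both exec and the Algebraic interface. The class name SimpleCount and its members are invented for this illustration, and the logic is simplified from the COUNT code shown above; it is not the actual Pig source (the real COUNT also handles empty bags and null fields):

// A minimal sketch, not the actual Pig source: SimpleCount is a hypothetical
// class name, and the logic is simplified from COUNT as shown above.
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SimpleCount extends EvalFunc<Long> implements Algebraic {

    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    // exec is always required; Pig falls back to it whenever the combiner
    // cannot be used, for example when another UDF in the same foreach is
    // not algebraic.
    @Override
    public Long exec(Tuple input) throws IOException {
        DataBag bag = (DataBag) input.get(0);
        long count = 0;
        for (Iterator<Tuple> it = bag.iterator(); it.hasNext(); it.next()) {
            count++;
        }
        return count;
    }

    // The Algebraic methods only hand Pig the class names of the three phases.
    @Override
    public String getInitial() { return Initial.class.getName(); }

    @Override
    public String getIntermed() { return Intermediate.class.getName(); }

    @Override
    public String getFinal() { return Final.class.getName(); }

    // Map phase: called with a single input record; emit a partial count of 1.
    public static class Initial extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) throws IOException {
            return tupleFactory.newTuple(Long.valueOf(1));
        }
    }

    // Combine phase: may run zero or more times; sum the partial counts and
    // emit a tuple of the same shape it consumes.
    public static class Intermediate extends EvalFunc<Tuple> {
        @Override
        public Tuple exec(Tuple input) throws IOException {
            return tupleFactory.newTuple(sum(input));
        }
    }

    // Reduce phase: runs once per group and returns the UDF's real type.
    public static class Final extends EvalFunc<Long> {
        @Override
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    private static Long sum(Tuple input) throws IOException {
        DataBag values = (DataBag) input.get(0);
        long sum = 0;
        for (Tuple t : values) {
            sum += (Long) t.get(0);
        }
        return sum;
    }
}

When Pig does use the combiner, Initial runs in the map, Intermediate in the combiner (possibly several times, or not at all), and Final in the reducer; when it does not, Pig simply calls exec.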
