Programming Pig by Alan Gates


Algebraic Interface

I have already mentioned in a number of other places that there are significant advantages to using Hadoop’s combiner whenever possible. It lowers skew in your reduce tasks, as well as the amount of data sent over the network between map and reduce tasks. For details on the combiner and when it is run, see Combiner Phase.

The combiner is most useful when you are working with sets of data that you intend to aggregate down to a single value or a small set of values. There are two classes of functions that fit nicely into the combiner: distributive and algebraic. A function is distributive if the same result is obtained by 1) dividing its input set into subsets, applying the function to those subsets, and then applying the function to those results; or 2) applying the function to the original set. SUM is an example of this. A function is said to be algebraic if it can be divided into initial, intermediate, and final functions (where the intermediate and final functions may differ from the initial function), such that the initial function is applied to subsets of the input set, the intermediate function is applied to results of the initial function, and the final function is applied to all of the results of the intermediate function. COUNT is an example of an algebraic function, with count being used as the initial function and sum as the intermediate and final functions. A distributive function is a special case of an algebraic function, where the initial, intermediate, and final functions are all identical to the original function.
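To make the decomposition concrete, here is a minimal sketch in plain Java (no Pig classes involved) showing why COUNT can be computed this way: each subset is counted independently by the initial function, and the partial counts are then summed, which gives the same answer as counting the whole set at once.

import java.util.Arrays;
import java.util.List;

public class AlgebraicCountSketch {
    public static void main(String[] args) {
        // Two subsets, standing in for the records seen by two map tasks.
        List<List<String>> subsets = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));

        long total = 0;
        for (List<String> subset : subsets) {
            long partialCount = subset.size(); // initial function: count the subset
            total += partialCount;             // intermediate/final function: sum the partial counts
        }
        System.out.println(total);             // prints 5, the same as counting all records at once
    }
}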

An EvalFunc can declare itself to be algebraic by implementing the Java interface Algebraic. Algebraic provides three methods that allow your UDF to declare Java classes that implement its initial, intermediate, and final functionality. These classes must extend EvalFunc:

// src/org/apache/pig/Algebraic.java
public interface Algebraic {
    /**
     * Get the initial function. 
     * @return A function name of f_init. f_init should be an eval func.
     */
    public String getInitial();

    /**
     * Get the intermediate function. 
     * @return A function name of f_intermed. f_intermed should be an eval func.
     */
    public String getIntermed();

    /**
     * Get the final function. 
     * @return A function name of f_final. f_final should be an eval func 
     * parameterized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}

Each of these methods returns the name of a Java class, which should itself extend EvalFunc. Pig will use these classes to rewrite the execution of your script. Consider the following Pig Latin script:

input = load 'data' as (x, y);
grpd  = group input by x;
cnt   = foreach grpd generate group, COUNT(input);
store cnt into 'result';

The execution pipeline for this script would initially look like:

Map:
    load

Reduce:
    foreach(group, COUNT), store

After being rewritten to use the combiner, it would look like:

Map:
    load
    foreach(group, COUNT.Initial)

Combine:
    foreach(group, COUNT.Intermediate)

Reduce:
    foreach(group, COUNT.Final), store
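If you want to check whether Pig has applied this rewrite to a particular script, the explain operator prints the plans Pig will run; when the combiner is being used, the MapReduce plan in its output should include a combine stage. For the script above, you could run:

explain cnt;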

As an example, we will walk through the implementation for COUNT. Its algebraic functions look like this:

// src/org/apache/pig/builtin/COUNT.java
public String getInitial() {
    return Initial.class.getName();
}

public String getIntermed() {
    return Intermediate.class.getName();
}

public String getFinal() {
    return Final.class.getName();
}

Each of these referenced classes is a static internal class in COUNT. The implementation of Initial is:

// src/org/apache/pig/builtin/COUNT.java
static public class Initial extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        // Since Initial is guaranteed to be called
        // only in the map, it will be called with an
        // input of a bag holding a single tuple. The
        // count should be 1 if that bag is nonempty.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        if (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                return mTupleFactory.newTuple(Long.valueOf(1));
        }
        return mTupleFactory.newTuple(Long.valueOf(0));
    }
}

Even though the initial function is guaranteed to receive only one record in its input, that record will match the schema of the original function. So, in the case of COUNT, it will be a bag. Thus, this initial method determines whether there is a nonnull record in that bag. If so, it returns one; otherwise, it returns zero. The return type of the initial function is a tuple. The contents of that tuple are entirely up to you as the UDF implementer. In this case, the initial returns a tuple with one long field.
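As an illustration of that calling convention, the following is a hypothetical standalone check (it assumes pig.jar is on the classpath and is not part of the book's examples) that invokes COUNT.Initial directly with the kind of input it sees in the map phase: a tuple wrapping a bag that holds a single record.

import org.apache.pig.builtin.COUNT;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CountInitialCheck {
    public static void main(String[] args) throws Exception {
        TupleFactory tupleFactory = TupleFactory.getInstance();
        BagFactory bagFactory = BagFactory.getInstance();

        // Build a bag holding a single, nonnull record, as the map phase would.
        DataBag bag = bagFactory.newDefaultBag();
        bag.add(tupleFactory.newTuple("some value"));

        // The UDF's input is a tuple whose first field is that bag.
        Tuple udfInput = tupleFactory.newTuple(bag);
        Tuple result = new COUNT.Initial().exec(udfInput);
        System.out.println(result.get(0));   // prints 1
    }
}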

COUNT’s Intermediate class sums the counts seen so far:

// src/org/apache/pig/builtin/COUNT.java
static public class Intermediate extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        try {
            return mTupleFactory.newTuple(sum(input));
        } catch (ExecException ee) {
            ...
        }
    }
}

static protected Long sum(Tuple input)
throws ExecException, NumberFormatException {
    DataBag values = (DataBag)input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long)t.get(0);
    }
    return sum;
}

The input to the intermediate function is a bag of tuples that were returned by the initial function. The intermediate function may be called zero, one, or many times, and its output may be fed back into further invocations of itself, so it must produce output tuples of the same form as the input tuples it expects. The framework will handle placing those tuples in bags. COUNT’s intermediate function returns a tuple with a long. Because we now want to sum the previous counts, this function implements SUM rather than COUNT.

The final function is called in the reducer and is guaranteed to be called only once. Its input type is a bag of tuples that both the initial and intermediate implementations return. Its return type needs to be the return type of the original UDF, which in this case is long. In COUNT’s case, this is the same operation as the intermediate because it sums the intermediate sums:

// src/org/apache/pig/builtin/COUNT.java
static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
        try {
            return sum(input);
        } catch (Exception ee) {
            ...
        }
    }
}

Implementing Algebraic does not guarantee that the algebraic implementation will always be used. Pig chooses the algebraic implementation only if all UDFs in the same foreach statement are algebraic. This is because our testing has shown that using the combiner with data that cannot be combined significantly slows down the job. And there is no way in Hadoop to route some data to the combiner (for algebraic functions) and some straight to the reducer (for nonalgebraic). This means that your UDF must always implement the exec method, even if you hope it will always be used in algebraic mode. Even so, it is worth implementing Algebraic whenever your function allows it, because the reduced skew and network traffic described at the beginning of this section apply whenever Pig can use the combiner.
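Putting the pieces together, here is a sketch of what a complete algebraic UDF might look like. MYSUM is a hypothetical function (not one of Pig's builtins) that sums long values; the top-level class still implements exec, which Pig falls back to whenever it cannot use the combiner, and error handling is pared down to keep the example short.

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MYSUM extends EvalFunc<Long> implements Algebraic {
    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();

    // Used whenever Pig cannot (or chooses not to) run the combiner.
    public Long exec(Tuple input) throws IOException {
        return sum(input);
    }

    public String getInitial() {
        return Initial.class.getName();
    }

    public String getIntermed() {
        return Intermediate.class.getName();
    }

    public String getFinal() {
        return Final.class.getName();
    }

    // Map phase: input is a bag holding a single record.
    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return mTupleFactory.newTuple(sum(input));
        }
    }

    // Combine phase: sums partial sums; may run zero, one, or many times.
    static public class Intermediate extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return mTupleFactory.newTuple(sum(input));
        }
    }

    // Reduce phase: produces the final value, typed as the original UDF's return type.
    static public class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }

    // Shared helper: sum the first field of every tuple in the input bag.
    static protected Long sum(Tuple input) throws ExecException {
        DataBag values = (DataBag)input.get(0);
        long sum = 0;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                sum += ((Number)t.get(0)).longValue();
        }
        return sum;
    }
}

As with COUNT, the initial function's input matches the original UDF's schema (a bag), so the same sum helper can be shared by all three phases as well as by exec.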
