I have already mentioned in a number of other places that there are significant advantages to using Hadoop’s combiner whenever possible. It lowers skew in your reduce tasks, as well as the amount of data sent over the network between map and reduce tasks. For details on the combiner and when it is run, see Combiner Phase.
Use of the combiner is interesting when you are working with sets of
data, usually sets you intend to aggregate and then compute a single or
small set of values for. There are two classes of functions that fit
nicely into the combiner: distributive and algebraic. A function is distributive
if the same result is obtained by 1) dividing its input set into subsets,
applying the function to those subsets, and then applying the function to
those results; or 2) applying the function to the original set. SUM
is an example of this. A function is said
to be algebraic if it can be divided into initial, intermediate, and final
functions (possibly different from the original function), where the
initial function is applied to subsets of the input set, the intermediate
function is applied to results of the initial function, and the final
function is applied to all of the results of the intermediate function.
COUNT is an example of an algebraic function, with count being used as
the initial function and sum as the intermediate and final functions.
A distributive function is a special
case of an algebraic function, where the initial, intermediate, and final
functions are all identical to the original function.
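To make the two definitions concrete, here is a minimal sketch in plain Java. It does not use any Pig classes; the class and method names are hypothetical and exist only for this illustration. It splits a set into subsets and checks that SUM can be reapplied to its own partial results, while COUNT has to switch to sum after the initial stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; none of these names come from Pig.
class CombinerFunctionClasses {

    // SUM: the same function is used at every stage (distributive).
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // COUNT: used only as the initial function of the algebraic version.
    static long count(List<Long> values) {
        return values.size();
    }

    // Apply sum to each subset, then apply sum again to the partial results.
    static long sumOverSubsets(List<List<Long>> subsets) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> subset : subsets) {
            partials.add(sum(subset));
        }
        return sum(partials);
    }

    // Apply count to each subset (initial), then sum the partial counts
    // (intermediate and final).
    static long countOverSubsets(List<List<Long>> subsets) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> subset : subsets) {
            partials.add(count(subset));
        }
        return sum(partials);
    }

    public static void main(String[] args) {
        List<Long> all = Arrays.asList(4L, 8L, 15L, 16L, 23L, 42L);
        List<List<Long>> subsets =
            Arrays.asList(all.subList(0, 2), all.subList(2, 6));
        System.out.println(sum(all) + " " + sumOverSubsets(subsets));     // 108 108
        System.out.println(count(all) + " " + countOverSubsets(subsets)); // 6 6
    }
}
```

Applying count a second time to the partial counts would return the number of subsets (2 here) rather than the number of records, which is why COUNT is algebraic but not distributive.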
An EvalFunc can declare itself to be algebraic by implementing the Java
interface Algebraic. Algebraic provides three methods that allow your UDF
to declare Java classes that implement its initial, intermediate, and
final functionality. These classes must extend EvalFunc:
// src/org/apache/pig/Algebraic.java
public interface Algebraic{
    /**
     * Get the initial function.
     * @return A function name of f_init. f_init should be an eval func.
     */
    public String getInitial();

    /**
     * Get the intermediate function.
     * @return A function name of f_intermed. f_intermed should be an eval func.
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final should be an eval func
     * parameterized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}
Each of these methods returns the name of a Java class, which should
itself extend EvalFunc. Pig will use these UDFs to rewrite the execution
of your script. Consider the following Pig Latin script:
input = load 'data' as (x, y);
grpd  = group input by x;
cnt   = foreach grpd generate group, COUNT(input);
store cnt into 'result';
The execution pipeline for this script would initially look like:
After being rewritten to use the combiner, it would look like:
As an example, we will walk through the implementation for COUNT. Its
algebraic functions look like this:
// src/org/apache/pig/builtin/COUNT.java
public String getInitial() {
    return Initial.class.getName();
}

public String getIntermed() {
    return Intermediate.class.getName();
}

public String getFinal() {
    return Final.class.getName();
}
Each of these referenced classes is a static nested class in COUNT. The
implementation of Initial is:
// src/org/apache/pig/builtin/COUNT.java
static public class Initial extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        // Since Initial is guaranteed to be called
        // only in the map, it will be called with an
        // input of a bag with a single tuple - the
        // count should always be 1 if bag is nonempty,
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        if (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                return mTupleFactory.newTuple(Long.valueOf(1));
        }
        return mTupleFactory.newTuple(Long.valueOf(0));
    }
}
Even though the initial function is guaranteed to receive only one record
in its input, that record will match the schema of the original function.
So, in the case of COUNT, it will be a bag. Thus, this initial method
determines whether there is a nonnull record in that bag. If so, it
returns one; otherwise, it returns zero. The return type of the initial
function is a tuple. The contents of that tuple are entirely up to you as
the UDF implementer. In this case, the initial function returns a tuple
with one long field.
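To illustrate how much freedom the intermediate tuple gives you, the following sketch works through a hypothetical average function in plain Java. It does not use Pig's classes or reproduce any Pig built-in; a long[] holding {sum, count} stands in for the tuple, and every name in it is an assumption made for this example. A single long would not be enough here, because averaging partial averages gives the wrong answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical average written without Pig classes; a long[]{sum, count}
// stands in for the tuple passed between stages.
class AlgebraicAverageSketch {

    // Initial: one input value becomes the partial result (value, 1).
    static long[] initial(long value) {
        return new long[] {value, 1L};
    }

    // Intermediate: add up partial sums and partial counts. The output has
    // the same shape as each input, so it can be fed back in again.
    static long[] intermediate(List<long[]> partials) {
        long sum = 0;
        long count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return new long[] {sum, count};
    }

    // Final: merge whatever partials arrive and return the type the
    // original function would return (null for empty input).
    static Double finalStage(List<long[]> partials) {
        long[] merged = intermediate(partials);
        if (merged[1] == 0) {
            return null;
        }
        return (double) merged[0] / merged[1];
    }

    public static void main(String[] args) {
        // First map task sees 2 and 4 and its output is combined once.
        long[] combined = intermediate(Arrays.asList(initial(2), initial(4)));
        // Second map task sees 9 and its output is never combined.
        List<long[]> reducerInput = new ArrayList<>();
        reducerInput.add(combined);
        reducerInput.add(initial(9));
        System.out.println(finalStage(reducerInput)); // 5.0
    }
}
```

The final stage receives one partial produced by the intermediate function and one produced directly by the initial function, and handles both the same way because they share a layout.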
COUNT’s Intermediate class sums the counts seen so far:
// src/org/apache/pig/builtin/COUNT.java
static public class Intermediate extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        try {
            return mTupleFactory.newTuple(sum(input));
        } catch (ExecException ee) {
            ...
        }
    }
}

static protected Long sum(Tuple input)
        throws ExecException, NumberFormatException {
    DataBag values = (DataBag)input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long)t.get(0);
    }
    return sum;
}
The input to the intermediate function is a bag of tuples that were
returned by the initial function or by an earlier call to the
intermediate function. The intermediate function may be called zero, one,
or many times. So, it needs to output tuples that match the input tuples
it expects. The framework will handle placing those tuples in bags.
COUNT’s intermediate function returns a tuple with a long. As we now want
to sum the previous counts, this function implements SUM rather than
COUNT.
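The requirement that the intermediate function's output match its own input is easier to see in a simulation. The sketch below is plain Java with hypothetical names; it imitates COUNT's three stages without using Pig's classes or its actual scheduling. It runs the intermediate stage a chosen number of times over fixed-size chunks and shows that the final answer does not depend on how often the intermediate stage ran.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for COUNT's stages; not Pig code.
class CountStagesSketch {

    // Initial: 1 for a non-null record, 0 otherwise.
    static long initial(String record) {
        return record != null ? 1L : 0L;
    }

    // Intermediate and final: sum the partial counts.
    static long sum(List<Long> partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    // Run the initial stage once per record, the intermediate stage
    // `rounds` times over chunks of `chunkSize`, then the final stage once.
    static long count(List<String> records, int rounds, int chunkSize) {
        List<Long> partials = new ArrayList<>();
        for (String r : records) {
            partials.add(initial(r));
        }
        for (int round = 0; round < rounds; round++) {
            List<Long> combined = new ArrayList<>();
            for (int i = 0; i < partials.size(); i += chunkSize) {
                int end = Math.min(i + chunkSize, partials.size());
                combined.add(sum(partials.subList(i, end)));
            }
            partials = combined;
        }
        return sum(partials);
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", null, "c", "d", "e");
        System.out.println(count(records, 0, 2)); // 5
        System.out.println(count(records, 1, 2)); // 5
        System.out.println(count(records, 3, 2)); // 5
    }
}
```

With zero rounds the final stage consumes the initial stage's output directly, which is the case the final function must also be prepared for.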
The final function is called in the reducer and is guaranteed to be
called only once. Its input is a bag of the tuples returned by the
initial and intermediate implementations. Its return type needs to be the
return type of the original UDF, which in this case is long. In COUNT’s
case, this is the same operation as the intermediate because it sums the
intermediate sums:
// src/org/apache/pig/builtin/COUNT.java
static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
        try {
            return sum(input);
        } catch (Exception ee) {
            ...
        }
    }
}
Implementing Algebraic does not guarantee that the algebraic
implementation will always be used. Pig chooses the algebraic
implementation only if all UDFs in the same foreach statement are
algebraic. This is because our testing has shown that using the combiner
with data that cannot be combined significantly slows down the job, and
there is no way in Hadoop to route some data to the combiner (for
algebraic functions) and some straight to the reducer (for nonalgebraic
ones). This means that your UDF must always implement the exec method,
even if you hope it will always be used in algebraic mode. It is also an
additional motivation to implement algebraic mode for your UDFs whenever
possible, because a single nonalgebraic UDF in a foreach statement keeps
Pig from using the combiner for every other UDF in that statement.