Cover by Alan Gates

Chapter 11. Writing Load and Store Functions

We will now consider some of the more complex and most critical parts of Pig: data input and output. Operating on huge data sets is inherently I/O-intensive. Hadoop’s massive parallelism and its movement of processing to the data mitigate this but do not remove it. Having efficient methods to load and store data is therefore critical. Pig provides default load and store functions for text data and for HBase, but many users find they need to write their own load and store functions to handle the data formats and storage mechanisms they use.

As with evaluation functions, the design goal for load and store functions was to make easy things easy and hard things possible. We also wanted load and store functions to be thin wrappers over Hadoop’s InputFormat and OutputFormat. The intention is that once you have an input format and output format for your data, the additional work of creating and storing Pig tuples is minimal. Again following the pattern of evaluation functions, more complex features such as schema management and projection pushdown are provided via separate interfaces to avoid cluttering the base interface. Pig’s load and store functions were completely rewritten between versions 0.6 and 0.7. This chapter covers only the interfaces for 0.7 and later releases.

One other important design goal for load and store functions is to not assume that the input sources and output sinks are HDFS. In the examples throughout this book, A = load 'foo'; has implied that foo is a file, but there is no need for that to be the case. foo is a resource locator that makes sense to your load function. It could be an HDFS file, an HBase table, a database JDBC connection string, or a web service URL. Because reading from HDFS is the most common case, many defaults and helper functions are provided for this case.

In this chapter we will walk through writing a load function and a store function for JSON data on HDFS, JsonLoader and JsonStorage, respectively. These are located in the example code in udfs/java/com/acme/io. They use the Jackson JSON library, which is included in your Pig distribution. However, the Jackson JAR is not shipped to the backend by Pig, so when using these UDFs in your script, you will need to register the Jackson JAR in addition to the acme examples JAR:

register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';

These UDFs will serve as helpful examples, but they will not cover all of the functionality of load and store functions. For those sections not shown in these examples, we will look at other existing load and store functions.

Load Functions

Pig’s load function is built on top of a Hadoop InputFormat, the class that Hadoop uses to read data. InputFormat serves two purposes: it determines how input will be split between map tasks, and it provides a RecordReader that produces key-value pairs as input to those map tasks. The load function takes these key-value pairs and returns a Pig Tuple.

The base class for the load function is LoadFunc. This is an abstract class, which allows it to provide helper functions and default implementations. Many load functions will only need to extend LoadFunc.

Load functions’ operations are split between Pig’s frontend and backend. On the frontend, Pig does job planning and optimization, and load functions participate in this in several ways that we will discuss later. On the backend, load functions get each record from the RecordReader, convert it to a tuple, and pass it on to Pig’s map task. Load functions also need to be able to pass data between the frontend and backend invocations so they can maintain state.

Frontend Planning Functions

For all load functions, Pig must do three things as part of frontend planning: 1) it needs to know the input format it should use to read the data; 2) it needs to be sure that the load function understands where its data is located; and 3) it needs to know how to cast bytearrays returned from the load function.

Determining InputFormat

Pig needs to know which InputFormat to use for reading your input. It calls getInputFormat to get an instance of the input format. It gets an instance rather than the class itself so that your load function can control the instantiation: any generic parameters, constructor arguments, etc. For our example load function, this method is very simple. It uses TextInputFormat, an input format that reads text data from HDFS files:

public InputFormat getInputFormat() throws IOException {
    return new TextInputFormat();
}

Determining the location

Pig communicates the location string provided by the user to the load function via setLocation. So, if the load operator in Pig Latin is A = load 'input';, input is the location string. This method is called on both the frontend and backend, possibly multiple times. Thus you need to take care that this method does not do anything that will cause problems if done more than one time. Your load function should communicate the location to its input format. For example, JsonLoader passes the filename via a helper method on FileInputFormat (a superclass of TextInputFormat):

public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
}

The Hadoop Job is passed along with the location because that is where input formats usually store their configuration information.

setLocation is called on both the frontend and backend because input formats store their location in the Job object, as shown in the preceding example. For a typical MapReduce job, which has a single input, this works. For Pig jobs, where the same input format might be used to load several different inputs (such as in a join or union), one input’s path will overwrite another’s in the Job object. To work around this, Pig remembers each location in an input-specific parameter and calls setLocation again on the backend so that the input format can set itself up properly before reading.
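The overwrite problem can be illustrated with a toy model in plain Java. This is a sketch, not Pig’s code: the class SharedJobConfig, the key input.dir, and the perInput map are hypothetical stand-ins for the Hadoop Job configuration and for the input-specific parameter in which Pig remembers each location. It also shows why setLocation must be safe to call more than once.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the location-overwrite problem. SharedJobConfig and the
// "input.dir" key are hypothetical; in Hadoop the settings live in the Job.
class SharedJobConfig {
    // One configuration shared by every input in the job.
    final Map<String, String> conf = new HashMap<>();

    // Stand-in for a load function's setLocation: it writes one job-wide
    // key, so a second call simply overwrites the first. Calling it again
    // with the same value is harmless, which is what makes repeats safe.
    void setLocation(String location) {
        conf.put("input.dir", location);
    }

    public static void main(String[] args) {
        SharedJobConfig job = new SharedJobConfig();

        // Frontend: remember each input's location separately (hypothetical).
        Map<String, String> perInput = new HashMap<>();
        perInput.put("input1", "/user/joe/input");
        perInput.put("input2", "/user/fred/input");

        // Setting both on the shared config leaves only the last one.
        job.setLocation(perInput.get("input1"));
        job.setLocation(perInput.get("input2"));
        System.out.println(job.conf.get("input.dir")); // /user/fred/input

        // Backend: before reading input1, call setLocation again with the
        // remembered value so the input format sees the right path.
        job.setLocation(perInput.get("input1"));
        System.out.println(job.conf.get("input.dir")); // /user/joe/input
    }
}
```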

For files on HDFS, the location provided by the user might be relative rather than absolute. To deal with this, Pig needs to resolve these to absolute locations based on the current working directory at the time of the load. Consider the following Pig Latin:

cd /user/joe;
input1 = load 'input';
cd /user/fred;
input2 = load 'input';

These two load statements should load different files. But Pig cannot assume it understands how to turn a relative path into an absolute path, because it does not know what that input is. It could be an HDFS path, a database table name, etc. So it leaves this to the load function. Before calling setLocation, Pig passes the location string to relativeToAbsolutePath to do any necessary conversion. Because most loaders read from HDFS, the default implementation in LoadFunc handles the HDFS case. If your load function will never need to do this conversion, it should override this method and return the location string passed to it unchanged.
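As a rough sketch of the kind of conversion relativeToAbsolutePath performs for path-like locations, the following resolves a relative location against a current directory using java.net.URI. It is an illustration under that assumption, not LoadFunc’s default implementation; LocationResolver is a hypothetical name.

```java
import java.net.URI;

// Hypothetical helper illustrating relative-to-absolute resolution for
// path-like locations; not Pig's LoadFunc implementation.
class LocationResolver {
    static String relativeToAbsolutePath(String location, String curDir) {
        URI loc = URI.create(location);
        // Fully qualified URIs (hdfs://...) and absolute paths are kept as is.
        if (loc.isAbsolute() || location.startsWith("/")) {
            return location;
        }
        // URI.resolve replaces the last segment of the base path unless the
        // base ends in '/', so make sure the directory does.
        String dir = curDir.endsWith("/") ? curDir : curDir + "/";
        return URI.create(dir).resolve(loc).toString();
    }
}
```

With the cd example above, relativeToAbsolutePath("input", "/user/joe") yields /user/joe/input, and the same call with /user/fred yields /user/fred/input. A loader whose locations are table names or URLs would instead return the location untouched.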

Getting the casting functions

Some Pig functions, such as PigStorage and HBaseStorage, load data by default without understanding its type information, and place the data unchanged in DataByteArray objects. At a later time, when Pig needs to cast that data to another type, it does not know how to because it does not understand how the data is represented in the bytearray. Therefore, it relies on the load function to provide a method to cast from bytearray to the appropriate type.

Pig determines which set of casting functions to use by calling getLoadCaster on the load function. This should return either null, which indicates that your load function does not expect to do any bytearray casts, or an implementation of the LoadCaster interface, which will be used to do the casts. We will look at the methods of LoadCaster in Casting bytearrays.

Our example loader returns null because it provides typed data based on the stored schema and, therefore, does not expect to be casting data. Any bytearrays in its data are binary data that should not be cast.

Passing Information from the Frontend to the Backend

As with evaluation functions, load functions can make use of UDFContext to pass information from frontend invocations to backend invocations. For details on UDFContext, see UDFContext. One significant difference between using UDFContext in evaluation and load functions is determining the instance-specific signature of the function. In evaluation functions, constructor arguments were suggested as a way to do this. For load functions, the input location usually will be the differentiating factor. However, Pig does not guarantee that it will call setLocation before the other methods in which you might want to use UDFContext. To work around this, Pig provides setUDFContextSignature, which passes an instance-unique signature that you can use when calling getUDFProperties. This method is guaranteed to be called before any other method on LoadFunc, on both the frontend and the backend. Your UDF can then store this signature and use it when getting its properties object:

private String udfcSignature = null;

public void setUDFContextSignature(String signature) {
    udfcSignature = signature;
}

setLocation is the only method in the load function that is guaranteed to be called on the frontend. It is therefore the best candidate for storing needed information to UDFContext. You might need to check that the data you are writing is available and nonnull to avoid overwriting your values when setLocation is called on the backend.
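The signature-keyed property pattern, including the guard against overwriting values on the backend, can be modeled in plain Java. FakeUDFContext, SketchLoader, remember, and recall are hypothetical names invented for this sketch. The real UDFContext also has to carry these properties from the frontend to the backend, which this in-memory model does not attempt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// In-memory model of the UDFContext pattern; not Pig's implementation.
class FakeUDFContext {
    private final Map<String, Properties> props = new HashMap<>();

    // One Properties object per (UDF class, signature) pair, so two load
    // statements using the same loader class keep separate values.
    Properties getUDFProperties(Class<?> c, String[] args) {
        String key = c.getName() + "#" + String.join(",", args);
        return props.computeIfAbsent(key, k -> new Properties());
    }
}

class SketchLoader {
    private final FakeUDFContext udfc;
    private String udfcSignature = null;

    SketchLoader(FakeUDFContext udfc) { this.udfc = udfc; }

    // Pig calls the real method before any other, on frontend and backend.
    void setUDFContextSignature(String signature) {
        udfcSignature = signature;
    }

    private Properties props() {
        return udfc.getUDFProperties(getClass(), new String[]{udfcSignature});
    }

    // Store only a nonnull value that is not already present, so a later
    // call (such as setLocation on the backend, where the value may not be
    // computable) cannot clobber what the frontend stored.
    void remember(String key, String value) {
        Properties p = props();
        if (value != null && p.getProperty(key) == null) {
            p.setProperty(key, value);
        }
    }

    String recall(String key) {
        return props().getProperty(key);
    }
}
```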

Backend Data Reading

On the backend, your load function takes the key-value pairs produced by its input format and produces Pig Tuples.

Getting ready to read

Before reading any data, Pig gives your load function a chance to set itself up by calling prepareToRead. This is called in each map task and passes in the RecordReader, which your load function will need later to read records from the input. RecordReader is the class an InputFormat uses to read records from an input split. Pig obtains the record reader it passes to prepareToRead by calling createRecordReader on the input format that your load function returned from getInputFormat. Pig also passes an instance of PigSplit, which contains the Hadoop InputSplit corresponding to the partition of input this instance of your load function will read. If you need split-specific information, you can get it from here.

Our example loader, beyond storing the record reader, also reads the schema string that was stored in UDFContext on the frontend, so that it knows how to parse the input file. Notice how it uses the signature passed in setUDFContextSignature to access the appropriate properties object. Finally, it creates a JsonFactory object that is used to generate a parser for each line:

public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
    this.reader = reader;
    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonloader.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    jsonFactory = new JsonFactory();
}

Reading records

Now we have reached the meat of your load function, reading records from its record reader and returning tuples to Pig. Pig will call getNext and place the resulting tuple into its processing pipeline. It will keep doing this until getNext returns a null, which indicates that the input for this split has been fully read.
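The contract between Pig and getNext (keep calling until null comes back) can be sketched in plain Java. MiniLoader is hypothetical: an Iterator<String> stands in for the record reader, a List<Object> stands in for a Pig Tuple, and records are assumed to be tab-delimited.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of the getNext contract; not Pig code.
class MiniLoader {
    private final Iterator<String> reader;

    MiniLoader(Iterator<String> reader) { this.reader = reader; }

    // Returns the next "tuple", or null once this split is exhausted.
    List<Object> getNext() {
        if (!reader.hasNext()) return null;
        String line = reader.next();
        List<Object> t = new ArrayList<>();
        // Tab-delimited fields; the -1 limit keeps trailing empty fields.
        for (String field : line.split("\t", -1)) {
            t.add(field);
        }
        return t;
    }

    // The caller's side of the contract: keep calling getNext until null.
    static List<List<Object>> readAll(MiniLoader loader) {
        List<List<Object>> out = new ArrayList<>();
        List<Object> t;
        while ((t = loader.getNext()) != null) {
            out.add(t);
        }
        return out;
    }
}
```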

Pig does not copy the tuple returned by getNext, but instead feeds it directly into its pipeline to avoid the copy overhead. This means getNext cannot reuse objects; it must create a new tuple and new contents for each record it reads. On the other hand, record readers may reuse their key and value objects from record to record, and most standard implementations do. So before writing a loader that tries to be efficient by wrapping the record reader’s keys and values directly in the tuple to avoid a copy, make sure you understand how the record reader manages its data.
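The danger of holding on to reused objects can be demonstrated without Hadoop. ReusingReader is a hypothetical reader that refills a single StringBuilder for every record, imitating readers that reuse their value object. Keeping references to that buffer makes every collected record show the last value read, while copying each value gives the expected result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader that reuses one mutable value object per record.
class ReusingReader {
    private final String[] records;
    private int pos = 0;
    private final StringBuilder value = new StringBuilder(); // reused buffer

    ReusingReader(String... records) { this.records = records; }

    boolean nextKeyValue() {
        if (pos >= records.length) return false;
        value.setLength(0);
        value.append(records[pos++]);
        return true;
    }

    StringBuilder getCurrentValue() { return value; }

    // Wrong: keeps a reference to the shared buffer, so every collected
    // entry ends up showing the last record read.
    static List<CharSequence> wrapWithoutCopy(ReusingReader r) {
        List<CharSequence> out = new ArrayList<>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue());
        return out;
    }

    // Right: copy the contents into a new object for each record.
    static List<CharSequence> wrapWithCopy(ReusingReader r) {
        List<CharSequence> out = new ArrayList<>();
        while (r.nextKeyValue()) out.add(r.getCurrentValue().toString());
        return out;
    }
}
```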

For information on creating the appropriate Java objects when constructing tuples for Pig, see Interacting with Pig values.

Our sample load function’s implementation of getNext reads the value from the Hadoop record (the key is ignored), constructs a JsonParser to parse it, parses the fields, and returns the resulting tuple. If there are parse errors, it does not throw an exception. Instead, it returns a tuple with null fields where the data could not be parsed. This prevents bad lines from causing the whole job to fail. Warnings are issued so that users can see which records were ignored:

public Tuple getNext() throws IOException {
    Text val = null;
    try {
        // Read the next key-value pair from the record reader.  If it's
        // finished, return null.
        if (!reader.nextKeyValue()) return null;

        // Get the current value.  We don't use the key.
        val = (Text)reader.getCurrentValue();
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }

    // Create a parser specific for this input line.  This might not be the
    // most efficient approach.  Text.getBytes() returns the whole backing
    // array, which can be longer than this record, so only the first
    // getLength() bytes are valid.
    ByteArrayInputStream bais =
        new ByteArrayInputStream(val.getBytes(), 0, val.getLength());
    JsonParser p = jsonFactory.createJsonParser(bais);

    // Create the tuple we will be returning.  We create it with the right
    // number of fields, as the Tuple object is optimized for this case.
    Tuple t = tupleFactory.newTuple(fields.length);

    // Read the start object marker.  Throughout this file if the parsing
    // isn't what we expect, we return a tuple with null fields rather than
    // throwing an exception.  That way a few mangled lines don't fail the job.
    if (p.nextToken() != JsonToken.START_OBJECT) {
        log.warn("Bad record, could not find start of record " + val.toString());
        return t;
    }

    // Read each field in the record.
    for (int i = 0; i < fields.length; i++) {
        t.set(i, readField(p, fields[i], i));
    }

    if (p.nextToken() != JsonToken.END_OBJECT) {
        log.warn("Bad record, could not find end of record " + val.toString());
    }
    return t;
}

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    // Each field is stored as "name": value.  Skip over the field name
    // token so that tok holds the token for the value itself.
    JsonToken tok = p.nextToken();
    if (tok == JsonToken.FIELD_NAME) tok = p.nextToken();
    if (tok == null) {
        log.warn("Early termination of record, expected " + fields.length
            + " fields but found " + fieldnum);
        return null;
    }

    // Check to see if this value was null.
    if (tok == JsonToken.VALUE_NULL) return null;

    // Read based on our expected type.
    switch (field.getType()) {
    case DataType.INTEGER:
        return p.getValueAsInt();

    case DataType.LONG:
        return p.getValueAsLong();

    case DataType.FLOAT:
        return (float)p.getValueAsDouble();

    case DataType.DOUBLE:
        return p.getValueAsDouble();

    case DataType.BYTEARRAY:
        byte[] b = p.getBinaryValue();
        // Use the DBA constructor that copies the bytes so that we own
        // the memory.
        return new DataByteArray(b, 0, b.length);

    case DataType.CHARARRAY:
        return p.getText();

    case DataType.MAP:
        // Should be a start of the map object.
        if (tok != JsonToken.START_OBJECT) {
            log.warn("Bad map field, could not find start of object, field "
                + fieldnum);
            return null;
        }
        Map<String, String> m = new HashMap<String, String>();
        JsonToken mapTok;
        while ((mapTok = p.nextToken()) != JsonToken.END_OBJECT) {
            if (mapTok == null) {
                log.warn("Bad map field, could not find end of object, field "
                    + fieldnum);
                return null;
            }
            // mapTok is the key; the next token holds its value.
            String k = p.getCurrentName();
            p.nextToken();
            String v = p.getText();
            m.put(k, v);
        }
        return m;

    case DataType.TUPLE:
        if (tok != JsonToken.START_OBJECT) {
            log.warn("Bad tuple field, could not find start of object, "
                + "field " + fieldnum);
            return null;
        }

        ResourceSchema s = field.getSchema();
        ResourceFieldSchema[] fs = s.getFields();
        Tuple t = tupleFactory.newTuple(fs.length);

        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }

        if (p.nextToken() != JsonToken.END_OBJECT) {
            log.warn("Bad tuple field, could not find end of object, "
                + "field " + fieldnum);
            return null;
        }
        return t;

    case DataType.BAG:
        if (tok != JsonToken.START_ARRAY) {
            log.warn("Bad bag field, could not find start of array, "
                + "field " + fieldnum);
            return null;
        }

        s = field.getSchema();
        fs = s.getFields();
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        fs = s.getFields();

        DataBag bag = bagFactory.newDefaultBag();

        JsonToken innerTok;
        while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
            if (innerTok != JsonToken.START_OBJECT) {
                log.warn("Bad bag tuple field, could not find start of "
                    + "object, field " + fieldnum);
                return null;
            }

            t = tupleFactory.newTuple(fs.length);
            for (int j = 0; j < fs.length; j++) {
                t.set(j, readField(p, fs[j], j));
            }

            if (p.nextToken() != JsonToken.END_OBJECT) {
                log.warn("Bad bag tuple field, could not find end of "
                    + "object, field " + fieldnum);
                return null;
            }
            // Add the completed tuple to the bag.
            bag.add(t);
        }
        return bag;

    default:
        throw new IOException("Unknown type in input schema: " +
            field.getType());
    }
}


Additional Load Function Interfaces

Your load function can provide more complex features by implementing additional interfaces. (Implementation of these interfaces is optional.)

Loading metadata

Many data storage mechanisms can record the schema along with the data. Pig does not assume the ability to store schemas, but if your storage can hold the schema, it can be very useful. This frees script writers from needing to specify the field names and types as part of the load operator in Pig Latin. This is user-friendly and less error-prone, and avoids the need to rewrite scripts when the schema of your data changes.

Some types of data storage also partition the data. If Pig understands this partitioning, it can load only those partitions that are needed for a particular script. Both of these functions are enabled by implementing the LoadMetadata interface.

getSchema in the LoadMetadata interface gives your load function a chance to provide a schema. It is passed the location string the user provides as well as the Hadoop Job object, in case it needs information in this object to open the schema. It is expected to return a ResourceSchema, which represents the data that will be returned. ResourceSchema is very similar to the Schema class used by evaluation functions. (See Input and Output Schemas for details.) There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple’s ResourceFieldSchema.

Our example load and store functions keep the schema in a side file[28] named _schema in HDFS. Our implementation of getSchema reads this file and also serializes the schema into UDFContext so that it is available on the backend:

public ResourceSchema getSchema(String location, Job job)
throws IOException {
    // Open the schema file and read the schema.
    // Get an HDFS handle.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataInputStream in = fs.open(new Path(location + "/_schema"));
    String line = in.readLine();
    in.close();

    // Parse the schema.
    if (line == null) {
        throw new IOException("Unable to read schema from file " +
            location + "/_schema");
    }
    ResourceSchema s = new ResourceSchema(Utils.getSchemaFromString(line));

    // Now that we have determined the schema, store it in our
    // UDFContext properties object so we have it when we need it on the
    // backend.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonloader.schema", line);

    return s;
}

Once your loader implements getSchema, load statements that use your loader do not need to declare their schemas in order for the field names to be used in the script. For example, if we had data with a schema of user:chararray, age:int, gpa:double, the following Pig Latin will compile and run:

register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
A = load 'input' using com.acme.io.JsonLoader();
B = foreach A generate user;
dump B;

LoadMetadata also includes a getStatistics method. Pig does not yet make use of statistics in job planning; this method is for future use.

Using partitions

Some types of storage partition their data, allowing you to read only the relevant sections for a given job. The LoadMetadata interface also provides methods for working with partitions in your data. In order for Pig to request the relevant partitions, it must know how the data is partitioned. Pig determines this by calling getPartitionKeys. If this returns a null or the LoadMetadata interface is not implemented by your loader, Pig will assume it needs to read the entire input.

Pig expects getPartitionKeys to return an array of strings, where each string represents one field name. Those fields are the keys used to partition the data. Pig will look for a filter statement immediately following the load statement that includes one or more of these fields. If such a statement is found, it will be passed to setPartitionFilter. If the filter includes both partition and nonpartition keys and it can be split,[29] Pig will split it and pass just the partition-key-related expression to setPartitionFilter. As an example, consider an HCatalog[30] table web_server_logs that is partitioned by two fields, date and colo:

logs    = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by date == '20110614' and NotABot(user_id);

Pig will call getPartitionKeys, and HCatLoader will return two key names, date and colo. Pig will find the date field in the filter statement and rewrite the filter as shown in the following example, pushing down the date == '20110614' predicate to HCatLoader via setPartitionFilter:

logs    = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by NotABot(user_id);

It is now up to HCatLoader to ensure that it returns only data from web_server_logs where date is 20110614.

The one exception is a partition field that is used inside an eval func or filter func. Pig assumes that loaders do not understand how to invoke UDFs, so it will not push down such expressions.
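A simplified model of the split Pig performs for the web_server_logs example might look like the following. PartitionFilterSplitter and Conjunct are hypothetical: each conjunct records the fields it references and whether it calls a UDF, a stand-in for Pig’s real expression trees. Only and-connected predicates are modeled, since a filter joined by or cannot be split this way.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of splitting an and-connected filter into a part
// the loader can handle and a part Pig keeps; not Pig's optimizer code.
class PartitionFilterSplitter {
    static final class Conjunct {
        final String text;
        final Set<String> fields; // fields referenced by this predicate
        final boolean usesUdf;    // true for calls such as NotABot(user_id)

        Conjunct(String text, Set<String> fields, boolean usesUdf) {
            this.text = text;
            this.fields = fields;
            this.usesUdf = usesUdf;
        }
    }

    // Predicates that reference only partition keys and call no UDF go to
    // pushed (for setPartitionFilter); everything else stays in remaining.
    static void split(List<Conjunct> filter, Set<String> partitionKeys,
                      List<Conjunct> pushed, List<Conjunct> remaining) {
        for (Conjunct c : filter) {
            boolean partitionOnly = !c.fields.isEmpty()
                    && partitionKeys.containsAll(c.fields);
            if (partitionOnly && !c.usesUdf) {
                pushed.add(c);
            } else {
                remaining.add(c);
            }
        }
    }
}
```

For the example filter, date == '20110614' lands in pushed while NotABot(user_id) stays behind. A predicate such as a hypothetical IsHoliday(date) also stays behind even though it uses a partition key, because it invokes a UDF.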

Our example loader works on file data, so it does not implement getPartitionKeys or setPartitionFilter. For an example implementation of these methods, see the HCatalog code at

Casting bytearrays

If you need to control how binary data that your loader loads is cast to other data types, you can implement the LoadCaster interface. Because this interface contains a lot of methods, implementers often implement it as a separate class. This also allows load functions to share implementations of LoadCaster, since Java does not support multiple inheritance of implementation.

The interface consists of a series of methods: bytesToInteger, bytesToLong, etc. These will be called to convert a bytearray to the appropriate type. Starting in 0.9, there are two bytesToMap methods. You should implement the one that takes a ResourceFieldSchema; the other one is for backward-compatibility. The bytesToBag, bytesToTuple, and bytesToMap methods take a ResourceFieldSchema that describes the field being converted. Calling getSchema on this object will return a schema that describes this bag, tuple, or map, if one exists. If Pig does not know the intended structure of the object, getSchema will return null. Keep in mind that the schema of the bag will be one field, a tuple, which in turn will have a schema describing the contents of that tuple.

A default load caster, Utf8StorageConverter, is provided. It handles converting UTF8-encoded text to Pig types. Scalar conversions are done in a straightforward way. Maps are expected to be surrounded by [] (square brackets), with each key separated from its value by # (hash) and key-value pairs separated by , (commas). Tuples are surrounded by () (parentheses) and have fields separated by , (commas). Bags are surrounded by {} (braces) and have tuples separated by , (commas). There is no ability to escape these special characters.
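A sketch of parsing the map text format just described follows. It is not Utf8StorageConverter’s code: MapTextParser is a hypothetical class that handles only flat maps with chararray values, and, like the real format, it has no escaping, so text containing the delimiter characters can be misparsed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for flat maps written as [k1#v1,k2#v2].
class MapTextParser {
    static Map<String, String> parseMap(String s) {
        String t = s.trim();
        if (!t.startsWith("[") || !t.endsWith("]")) {
            throw new IllegalArgumentException("Map must be enclosed in []: " + s);
        }
        // LinkedHashMap keeps the entries in the order they were written.
        Map<String, String> m = new LinkedHashMap<>();
        String body = t.substring(1, t.length() - 1);
        if (body.isEmpty()) return m;
        // Commas separate entries; the first # separates key from value.
        for (String pair : body.split(",")) {
            int hash = pair.indexOf('#');
            if (hash < 0) {
                throw new IllegalArgumentException("Missing # in entry: " + pair);
            }
            m.put(pair.substring(0, hash), pair.substring(hash + 1));
        }
        return m;
    }
}
```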

Pushing down projections

Often a Pig Latin script will need to read only a few fields in the input. Some types of storage formats store their data by fields instead of by records (for example, Hive’s RCFile). For these types of formats, there is a significant performance gain to be had by loading only those fields that will be used in the script. Even for record-oriented storage formats, it can be useful to skip deserializing fields that will not be used.

As part of its optimizations, Pig analyzes Pig Latin scripts and determines what fields in an input it needs at each step in the script. It uses this information to aggressively drop fields it no longer needs. If the loader implements the LoadPushDown interface, Pig can go a step further and provide this information to the loader.

Once Pig knows the fields it needs, it assembles them in a RequiredFieldList and passes that to pushProjection. In the load function’s reply, it indicates whether it can meet the request. It responds with a RequiredFieldResponse, which is a fancy wrapper around a Boolean. If the Boolean is true, Pig will assume that only the required fields are being returned from getNext. If it is false, Pig will assume that all fields are being returned by getNext, and it will handle dropping the extra ones itself.
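The shape of this negotiation can be sketched in plain Java. ProjectingLoader, its pushProjection(int[]) method, and its boolean reply are simplified, hypothetical stand-ins for RequiredFieldList and RequiredFieldResponse. The point is only that once the loader answers true, each tuple it returns carries just the requested fields, in the requested order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of LoadPushDown's contract; not Pig's API.
class ProjectingLoader {
    private int[] required = null; // null means "return every field"

    // Accept the projection and answer true, promising that tuples will
    // hold only these fields. A loader that cannot honor the request
    // would answer false and keep returning full records.
    boolean pushProjection(int[] requiredFieldIndexes) {
        this.required = requiredFieldIndexes.clone();
        return true;
    }

    // Turns one full record into the tuple handed to Pig.
    List<Object> toTuple(Object[] fullRecord) {
        List<Object> t = new ArrayList<>();
        if (required == null) {
            for (Object f : fullRecord) t.add(f);
        } else {
            for (int i : required) t.add(fullRecord[i]);
        }
        return t;
    }
}
```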

The RequiredField class used to describe which fields are required is slightly complex. Beyond allowing a user to specify whether a given field is required, it provides the ability to specify which subfields of that field are required. For example, for maps, certain keys can be listed as required. For tuples and bags, certain fields can be listed as required.

Load functions that implement LoadPushDown should not modify the schema object returned by getSchema. This should always be the schema of the full input. Pig will manage the translation between the schema having all of the fields and the results of getNext having only some.

Our example loader does not implement LoadPushDown. For an example of a loader that does, see HCatLoader at

[28] A file in the same directory that is not a part file. Side files start with an underscore character. MapReduce’s FileInputFormat knows to ignore them when reading input for a job.

[29] Meaning that the filter can be broken into two filters—one that contains the partition keys and one that does not—and produce the same end result. This is possible when the expressions are connected by and but not when they are connected by or.

[30] HCatalog is a table-management service for Hadoop. It includes Pig load and store functions. See Metadata in Hadoop for more information on HCatalog.
