Store Functions

Pig’s store function is, in many ways, a mirror image of the load function. It is built on top of Hadoop’s OutputFormat. It takes Pig Tuples and creates key-value pairs that its associated output format writes to storage.

StoreFunc is an abstract class, which allows it to provide default implementations for some methods. However, some functions implement both load and store functionality; PigStorage is one example. Because Java does not support multiple inheritance, the interface StoreFuncInterface is provided. These dual load/store functions can implement this interface rather than extending StoreFunc.
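
As a rough sketch of what such a dual-purpose function looks like, the skeleton below extends LoadFunc for the load half and implements StoreFuncInterface for the store half. This is not the book's code; the class name DualStorage and all of the method bodies are placeholders:

// DualStorage.java -- a skeleton, not a working storage function.
public class DualStorage extends LoadFunc implements StoreFuncInterface {

    // The load half: LoadFunc's abstract methods.
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        // Keep the reader for use in getNext.
    }

    public Tuple getNext() throws IOException {
        return null; // A real implementation reads a record and builds a tuple.
    }

    // The store half: StoreFuncInterface's methods.
    public String relToAbsPathForStoreLocation(String location, Path curDir)
            throws IOException {
        return location; // A file-based store would resolve relative paths here.
    }

    public OutputFormat getOutputFormat() throws IOException {
        return new TextOutputFormat<LongWritable, Text>();
    }

    public void setStoreLocation(String location, Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    public void checkSchema(ResourceSchema s) throws IOException {
        // No schema restrictions in this sketch.
    }

    public void prepareToWrite(RecordWriter writer) throws IOException {
        // Keep the writer for use in putNext.
    }

    public void putNext(Tuple t) throws IOException {
        // A real implementation converts t and calls writer.write(...).
    }

    public void setStoreFuncUDFContextSignature(String signature) {
        // Remember the signature if UDFContext is needed.
    }

    public void cleanupOnFailure(String location, Job job) throws IOException {
        // Remove any partial output written under location.
    }
}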

Store function operations are split between the frontend and backend of Pig. Pig does planning and optimization on the frontend. Store functions have an opportunity at this time to check that a valid schema is being used and set up the storage location. On the backend, store functions take a tuple from Pig, convert it to a key-value pair, and pass it to a Hadoop RecordWriter. Store functions can pass information from frontend invocations to backend invocations via UDFContext.

Store Function Frontend Planning

Store functions have three tasks to fulfill on the frontend:

  • Instantiate the OutputFormat they will use to store data.

  • Check the schema of the data being stored.

  • Record the location where the data will be stored.

Determining OutputFormat

Pig calls getOutputFormat to get an instance of the output format that your store function will use to store records. This method returns an instance rather than the class name or the class itself, which allows your store function to control how the class is instantiated. The example store function JsonStorage uses TextOutputFormat, an output format that stores text data in HDFS. We have to instantiate it with a key of LongWritable and a value of Text to match the expectations of TextOutputFormat:

// JsonStorage.java
public OutputFormat getOutputFormat() throws IOException {
    return new TextOutputFormat<LongWritable, Text>();
}

Setting the output location

Pig calls setStoreLocation to communicate to your store function the location string the user provides. Given the Pig Latin statement store Z into 'output';, output is the location string. This method is called on both the frontend and the backend, and it may be called multiple times; consequently, it should not have side effects that cause problems if it is called more than once. Your store function will need to communicate the location to its output format. Our example store function uses the FileOutputFormat utility method setOutputPath to do this:

// JsonStorage.java
public void setStoreLocation(String location, Job job) throws IOException {
    FileOutputFormat.setOutputPath(job, new Path(location));
}

The Hadoop Job is passed to this function as well. Most output formats store the location information in the job.

Pig calls setStoreLocation on both the frontend and backend because output formats usually store their location in the job, as we see in our example store function. This works for MapReduce jobs, where a single output format is guaranteed. But due to the split operator, Pig can have more than one instance of the same store function in a job. If multiple instances of a store function call FileOutputFormat.setOutputPath, whichever instance calls it last will overwrite the others. Pig avoids this by keeping output-specific information and calling setStoreLocation again on the backend so that it can properly configure the output format.

For HDFS files, the user might provide a relative path. Pig needs to resolve such paths to absolute paths using the current working directory at the time the store is called. To accomplish this, Pig calls relToAbsPathForStoreLocation with the user-provided location string before calling setStoreLocation. This method translates relative paths into absolute ones. For store functions writing to HDFS, the default implementation in StoreFunc handles the conversion. If you are writing a store function that does not use file paths (e.g., HBase), you should override this method to return the string it is passed.
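
For example, a store function whose locations are table names rather than file paths might override the method as in the following sketch (hypothetical code, not part of JsonStorage):

// Hypothetical example for a store function whose locations are table names,
// not file paths: return the user-supplied location unchanged.
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
    return location;
}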

Checking the schema

As part of frontend planning, Pig gives your store function a chance to check the schema of the data to be stored. If you are storing data to a system that expects a certain schema for the output (such as an RDBMS), or you cannot store certain data types, this is the place to perform those checks. Oddly enough, this method returns void rather than a Boolean, so if you detect a problem with the schema, you must throw an IOException.

Our example store function does not have limitations on the schemas it can store. However, it uses this function as a place to serialize the schema into UDFContext so that it can be used on the backend when writing data:

// JsonStorage.java

public void checkSchema(ResourceSchema s) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p = 
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonstorage.schema", s.toString());
}
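
If your store function does have restrictions, checkSchema is where to enforce them. As a hypothetical sketch (not part of JsonStorage), a store function that cannot represent maps might check for them like this:

// Hypothetical example: reject any schema that contains a map field, since
// this imagined storage format has no way to represent maps.
public void checkSchema(ResourceSchema s) throws IOException {
    for (ResourceFieldSchema field : s.getFields()) {
        if (field.getType() == DataType.MAP) {
            throw new IOException("This store function cannot store maps; "
                + "field " + field.getName() + " is a map.");
        }
    }
}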

Store Functions and UDFContext

Store functions work with UDFContext exactly as load functions do, but with one exception: the signature for store functions is passed to the store function via setStoreFuncUDFContextSignature. See Passing Information from the Frontend to the Backend for a discussion of how load functions work with UDFContext. Our example store function stores the signature in a member variable for later use:

// JsonStorage.java
public void setStoreFuncUDFContextSignature(String signature) {
    udfcSignature = signature;
} 

Writing Data

During backend processing, the store function is first initialized, and then takes Pig tuples and converts them to key-value pairs to be written to storage.

Preparing to write

Pig calls your store function’s prepareToWrite method in each map or reduce task before writing any data. This call passes a RecordWriter instance to use when writing data. RecordWriter is a class that OutputFormat uses to write individual records. Pig will get the record writer it passes to your store function by calling getRecordWriter on the output format your store function returned from getOutputFormat. Your store function will need to keep this reference so that it can be used in putNext.

The example store function JsonStorage also uses this method to read the schema out of the UDFContext. It will use this schema when storing data. Finally, it creates a JsonFactory for use in putNext:

// JsonStorage.java
public void prepareToWrite(RecordWriter writer) throws IOException {
    // Store the record writer reference so we can use it when it's time
    // to write tuples.
    this.writer = writer;

    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonstorage.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    // Build a Json factory.
    jsonFactory = new JsonFactory();
    jsonFactory.configure(
        JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}

Writing records

putNext is the core method in the store function class. Pig calls this method for every tuple it needs to store. Your store function needs to take these tuples and produce the key-value pairs that its output format expects. For information on the Java objects in which the data will be stored and how to extract them, see Interacting with Pig values.

JsonStorage encodes the contents of the tuple in JSON format and writes the resulting string into the value field of TextOutputFormat. The key field is left null:

// JsonStorage.java
public void putNext(Tuple t) throws IOException {
    // Build a ByteArrayOutputStream to write the JSON into.
    ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
    // Build the generator.
    JsonGenerator json =
        jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);

    // Write the beginning of the top-level tuple object.
    json.writeStartObject();
    for (int i = 0; i < fields.length; i++) {
        writeField(json, fields[i], t.get(i));
    }
    json.writeEndObject();
    json.close();

    // Hand a null key and our string to Hadoop.
    try {   
        writer.write(null, new Text(baos.toByteArray()));
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }
}

private void writeField(JsonGenerator json,
                        ResourceFieldSchema field,
                        Object d) throws IOException {

    // If the field is missing or the value is null, write a null.
    if (d == null) {
        json.writeNullField(field.getName());
        return;
    }

    // Based on the field's type, write it out.
    switch (field.getType()) {
    case DataType.INTEGER:
        json.writeNumberField(field.getName(), (Integer)d);
        return;

    case DataType.LONG:
        json.writeNumberField(field.getName(), (Long)d);
        return;

    case DataType.FLOAT:
        json.writeNumberField(field.getName(), (Float)d);
        return;

    case DataType.DOUBLE:
        json.writeNumberField(field.getName(), (Double)d);
        return;

    case DataType.BYTEARRAY:
        json.writeBinaryField(field.getName(), ((DataByteArray)d).get());
        return;

    case DataType.CHARARRAY:
        json.writeStringField(field.getName(), (String)d);
        return;

    case DataType.MAP:
        json.writeFieldName(field.getName());
        json.writeStartObject();
        for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
            json.writeStringField(e.getKey(), e.getValue().toString());
        }
        json.writeEndObject();
        return;

    case DataType.TUPLE:
        json.writeFieldName(field.getName());
        json.writeStartObject();

        ResourceSchema s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        ResourceFieldSchema[] fs = s.getFields();

        for (int j = 0; j < fs.length; j++) {
            writeField(json, fs[j], ((Tuple)d).get(j));
        }
        json.writeEndObject();
        return;

    case DataType.BAG:
        json.writeFieldName(field.getName());
        json.writeStartArray();
        s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
            throw new IOException("Found a bag without a tuple "
                + "inside!");
        }
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use "
                + "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        for (Tuple t : (DataBag)d) {
            json.writeStartObject();
            for (int j = 0; j < fs.length; j++) {
                writeField(json, fs[j], t.get(j));
            }
            json.writeEndObject();
        }
        json.writeEndArray();
        return;
    }
}

Failure Cleanup

When jobs fail after execution has started, your store function may need to clean up partially stored results. Pig will call cleanupOnFailure to give your store function an opportunity to do this. It passes the location string and the job object so that your store function knows what it should clean up. In the HDFS case, the default implementation handles removing any output files created by the store function. You need to implement this method only if you are storing data somewhere other than HDFS.
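
As an illustration, the sketch below (hypothetical code, not part of JsonStorage) removes partial output from a filesystem-backed location, which is roughly what the default does for HDFS; a store function writing to another system would perform the equivalent operation against that system:

// Hypothetical example: remove whatever was written under the output
// location when the job fails.
public void cleanupOnFailure(String location, Job job) throws IOException {
    Path path = new Path(location);
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    if (fs.exists(path)) {
        fs.delete(path, true); // recursively delete the partial output
    }
}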

Storing Metadata

If your storage format can store schemas in addition to data, your store function can implement the interface StoreMetadata. This provides a storeSchema method that is called by Pig as part of its frontend operations. Pig passes storeSchema a ResourceSchema, the location string, and the job object so that it can connect to its storage. The ResourceSchema is very similar to the Schema class described in Input and Output Schemas. There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple’s ResourceFieldSchema.

The example store function JsonStorage stores the schema in a side file named _schema in the same directory as the data. The schema is stored as a string, using the toString method provided by ResourceSchema:

// JsonStorage.java
public void storeSchema(ResourceSchema schema, String location, Job job)
throws IOException {
    // Store the schema in a side file in the same directory.  MapReduce
    // does not include files starting with "_" when reading data for a job.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataOutputStream out = fs.create(new Path(location + "/_schema"));
    out.writeBytes(schema.toString());
    out.writeByte('\n');
    out.close();
}

StoreMetadata also has a storeStatistics function, but Pig does not use this yet.
