O'Reilly logo

Programming Pig by Alan Gates

Stay ahead with the world's most comprehensive technology and business learning platform.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, tutorials, and more.

Start Free Trial

No credit card required

NoSQL Databases

Over the last few years a number of NoSQL databases have arisen. These databases break one or more of the traditional rules of relational database systems. They do not expect data to be normalized. Instead, the data accessed by a single application lives in one large table so that few or no joins are necessary. Many of these databases do not implement full ACID semantics.[31]

Like MapReduce, these systems are built to manage terabytes of data. Unlike MapReduce, they are focused on random reads and writes of data. Where MapReduce and technologies built on top of it (such as Pig) are optimized for reading vast quantities of data very quickly, these NoSQL systems optimize for finding a few records very quickly. This different focus does not mean that Pig does not work with these systems. Users often want to analyze the data stored in these systems. Also, because these systems offer good random lookup, certain types of joins could benefit from having the data stored in these systems.

Two NoSQL databases have been integrated with Pig: HBase and Cassandra.


Apache HBase is a NoSQL database that uses HDFS to store its data. HBase presents its data to users in tables. Within each table, every row has a key. Reads in HBase are done by a key, a range of keys, or a bulk scan. Users can also update or insert individual rows by keys. In addition to a key, rows in HBase have column families, and all rows in a table share the same column families. Within each column family there are columns. There is no constraint that each row have the same columns as any other row in a given column family. Thus an HBase table T might have one column family F, which every row in that table would share, but a row with key x could have columns a, b, c in F, while another row with key y has columns a, b, d in F. Column values also have a version number. HBase keeps a configurable number of versions, so users can access the most recent version or previous versions of a column value. All keys and column values in HBase are arrays of bytes.

Pig provides HBaseStorage to read data from and write data to HBase tables. All these reads and writes are bulk operations. Bulk reads from HBase are slower than scans in HDFS. However, if the data is already in HBase, it is faster to read it directly than it is to extract it, place it in HDFS, and then read it.

When loading from HBase, you must tell Pig what table to read from and what column families and columns to read. You can read individual columns or, beginning in version 0.9, whole column families. Because column families contain a variable set of columns and their values, they must be cast to Pig’s map type. As an example, let’s say we have an HBase table users that stores information on users and their links to other users. It has two column families: user_info and links. The key for the table is the user ID. The user_info column family has columns such as name, email, etc. The links column family has a column for each user that the user is linked to. The column name is the linked user’s ID, and the value of these columns is the type of the link—friend, relation, colleague, etc.:

user_links = load 'hbase://users'
                 using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
                 'user_info:name, links:*', '-loadKey true -gt 10000')
                 as (id, name:chararray, links:map[]);

The load location string is the HBase table name. The appropriate HBase client configuration must be present on your machine to allow the HBase client to determine how to connect to the HBase server. Two arguments are passed as constructor arguments to HBaseStorage. The first tells it which column families and columns to read, and the second passes a set of options.

In HBase, columns are referenced as column_family:column. In the preceding example, user_info:name indicates the column name in the column family user_info. When you want to extract a whole column family, you give the column family and an asterisk, for example, links:*. You can also get a subset of the columns in a column family. For example, links:100* would result in a map having all columns that start with 100. The map that contains a column family has the HBase column names as keys and the column values as values.

The options string allows you to configure HBaseStorage. This can be used to control whether the key is loaded, which rows are loaded, and other features. All of these options are placed in one string, separated by spaces. Table 12-1 describes each of these options.

Table 12-1. HBaseStorage options

OptionValid valuesDefaultDescription
loadKeyBooleanfalseIf true, the key will be loaded as the first column in the input.
gtRow keyNoneOnly loads rows with a key greater than the provided value.
gteRow keyNoneOnly loads rows with a key greater than or equal to the provided value.
ltRow keyNoneOnly loads rows with a key less than the provided value.
lteRow keyNoneOnly loads rows with a key less than or equal to the provided value.
cachingInteger100The number of rows the scanners should cache.
limitIntegerNoneRead at most this many rows from each HBase region.
casterJava classnameUtf8StorageConverterThe Java class to use to do casting between Pig types and the bytes that HBase stores. This class must implement Pig’s LoadCaster and StoreCaster interfaces. The default Utf8StorageConverter can be used when the data stored in HBase is in UTF8 format and the numbers are stored as strings (rather than in binary). HBaseBinaryConverter uses Java’s Byte.toInt, Byte.toString, etc., methods. It is not possible to cast to maps using this converter, so you cannot read entire column families.

As of the time of this writing, Pig is able to read only the latest version of a column value. There have been discussions about what the best interface and data type mapping would be to enable Pig to read multiple versions. This feature will most likely be added at some point in the future.

HBaseStorage stores data into HBase as well. When storing data, you specify the table name as the location string, just as in load. The constructor arguments are also similar to the load case. The first describes the mapping of Pig fields to the HBase table, which uses the same column_family:column syntax as in load. Any Pig value can be mapped to a column. A Pig map can be mapped to a column family by saying column_family:* (again, only in 0.9 and later). The row key is not referenced in this argument, but it is assumed to be the first field in the Pig tuple. The only valid option in the optional second argument in the store case is -caster.

Assume at the end of processing that our Pig data has a schema of id: long, name:chararray, email:chararray, links:map. Storing into our example HBase table we used earlier looks like this:

// Schema of user_links is (id, name, email, links).
// Notice how the id (key) field is omitted in the argument.
store user_links into 'hbase://users'
    using org.apache.pig.backend.hadoop.hbase.HBaseStorage(
    'user_info:name, user_info:email, links:*');


Apache Cassandra is another scalable database used for high-volume random reading and writing of data. It differs from HBase in its approach to distribution. Whereas HBase guarantees consistency between its servers, Cassandra has an eventual consistency model, meaning that servers might have different values for the same data for some period of time. For more information about Cassandra, see Cassandra: The Definitive Guide, by Eben Hewitt (O’Reilly).

Cassandra comes with support for Pig, which means that you can load data from and store data to Cassandra column families. This works just as it does with any other storage mechanism that is used with Pig, such as HDFS. This includes data locality for input splits.

Pig and Cassandra can be used together in a number of ways. Pig can be used to do traditional analytics while Cassandra performs real-time operations. Because Pig and MapReduce can be run on top of Cassandra, this can be done without moving data between Cassandra and HDFS. HDFS is still required for storing intermediate results; however, Pig can be used to do data exploration, research, testing, validation, and correction over Cassandra data as well. It can be used to populate the data store with new data as new tables or column families are added.

The Pygmalion project was written to ease development when using Pig with data stored in Cassandra. It includes helpful UDFs to extract column values from the results, marshal the data back to a form that Cassandra accepts, and others.

In order to properly integrate Pig workloads with data stored in Cassandra, the Cassandra cluster needs to colocate the data with Hadoop task trackers. This allows the Hadoop job tracker to move the data processing to the nodes where the data resides. Traditionally, Cassandra is used for heavy writes and real-time, random-access queries. Heavy Hadoop analytic workloads can be performed on Cassandra without degrading the performance of real-time queries by splitting the cluster by workload type. A set of nodes is dedicated to handling analytic batch processing and another set is dedicated to handling real-time queries. Cassandra’s cross-datacenter replication copies data transparently between these sections of the cluster so that manual copying of data is never required, and the analytic section always has updated data.

[31] Atomicity, Consistency, Isolation, and Durability. See http://en.wikipedia.org/wiki/ACID for a discussion of these properties in relational databases.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, interactive tutorials, and more.

Start Free Trial

No credit card required