One tenet of Pig’s philosophy is that Pig allows users to integrate their own code with Pig wherever possible (see Pig Philosophy). The most obvious way Pig does that is through its UDFs. But it also allows you to directly integrate other executables and MapReduce jobs.
To specify an executable that you want to insert into your data flow, use stream. You may want to do this when you have a legacy program that you do not want to modify or are unable to change. You can also use stream when you have a program you use frequently, or one you have tested on small data sets and now want to apply to a large data set. Let’s look at an example where you have a Perl program highdiv.pl that filters out all stocks with a dividend below $1.00:
-- streamsimple.pig
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through `highdiv.pl` as (exchange, symbol, date, dividends);
Notice the as clause in the stream command. This is not required. But Pig has no idea what the executable will return, so if you do not provide the as clause, the relation highdivs will have no schema.
The executable highdiv.pl is invoked once on every map or reduce task. It is not invoked once per record. Pig instantiates the executable and keeps feeding data to it via stdin. It also keeps checking stdout, passing any results to the next operator in your data flow. The executable can choose whether to produce an output for every input, only every so many inputs, or only after all inputs have been received.
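To make that contract concrete, here is a minimal sketch of what a streaming filter in the spirit of highdiv.pl could look like, written in Python purely for illustration (the text describes a Perl script). It assumes Pig’s default streaming serialization: one record per line on stdin, fields separated by tabs, in the order (exchange, symbol, date, dividends).

#!/usr/bin/env python
# Hypothetical streaming filter: keep only records whose dividend is at least $1.00.
# Assumes tab-delimited records on stdin and writes tab-delimited records to stdout,
# which is Pig's default serialization for stream.
import sys

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    try:
        dividend = float(fields[3])
    except (IndexError, ValueError):
        continue                      # skip malformed or empty records
    if dividend >= 1.0:
        sys.stdout.write('\t'.join(fields) + '\n')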
The preceding example assumes that you already have highdiv.pl installed on your grid, and that it is runnable from the working directory on the task machines. If that is not the case, which it usually will not be, you can ship the executable to the grid. To do this, use a define statement:
--streamship.pig
define hd `highdiv.pl` ship('highdiv.pl');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);
This define does two things. First, it defines the executable that will be used. Now in stream we refer to highdiv.pl by the alias we gave it, hd, rather than referring to it directly. Second, it tells Pig to pick up the file ./highdiv.pl and ship it to Hadoop as part of this job. This file will be picked up from the specified location on the machine where you launch the job. It will be placed in the working directory of the task on the task machines. So, the command you pass to stream must refer to it relative to the current working directory, not via an absolute path. If your executable depends on other modules or files, they can be specified as part of the ship clause as well. For example, if highdiv.pl depends on a Perl module called Financial.pm, you can send them both to the task machines:
define hd `highdiv.pl` ship('highdiv.pl', 'Financial.pm');
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
highdivs = stream divs through hd as (exchange, symbol, date, dividends);
Many scripting languages assume certain paths for modules based on their hierarchy. For example, Perl expects to find a module Acme::Financial in Acme/Financial.pm. However, the ship clause always puts files in your current working directory, and it does not take directories, so you could not ship Acme. The workaround for this is to create a TAR file and ship that, and then have a step in your executable that unbundles the TAR file. You then need to set your module include path (for Perl, -I or the PERLLIB environment variable) to contain . (dot).
ship moves files into the grid from the machine where you are launching your job. But sometimes the file you want is already in the grid. If you have a grid file that will be accessed by every map or reduce task in your job, the proper way to access it is via the distributed cache. The distributed cache is a mechanism Hadoop provides to share files. It reduces the load on HDFS by preloading the file to the local disk on the machine that will be executing the task. You can use the distributed cache for your executable by using the cache clause in define:
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc `blacklistchecker.py` cache('/data/shared/badurls#badurls');
goodurls = stream normalized through blc as (url, pageid);
The string before the # is the path on HDFS, in this case /data/shared/badurls. The string after the # is the name of the file as viewed by the executable. So, Hadoop will put a copy of /data/shared/badurls into the task’s working directory and call it badurls.
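For illustration, here is a sketch of how blacklistchecker.py might use that cached file: it simply opens badurls by the name given after the # in the cache clause, since Hadoop places it in the task’s working directory. The field layout (the URL as the first tab-separated field) is an assumption for this sketch.

#!/usr/bin/env python
# Hypothetical sketch: load the blacklist from the locally cached 'badurls' file,
# then pass through only those streamed records whose URL is not blacklisted.
import sys

with open('badurls') as f:                      # the name after '#' in the cache clause
    bad = set(line.strip() for line in f)

for line in sys.stdin:
    url = line.rstrip('\n').split('\t')[0]      # assume the URL is the first field
    if url not in bad:
        sys.stdout.write(line)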
So far we have assumed that your executable takes data on stdin and writes it to stdout. This might not work, depending on your executable. If your executable needs a file to read from, write to, or both, you can specify that with the input and output clauses in the define command. Continuing with our previous example, let’s say that blacklistchecker.py expects to read its input from a file specified by -i on its command line and write to a file specified by -o:
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
define blc `blacklistchecker.py -i urls -o good` input('urls') output('good');
goodurls = stream normalized through blc as (url, pageid);
Again, file locations are specified from the working directory on the task machines. In this example, Pig will write out all the input for a given task for blacklistchecker.py to urls, then invoke the executable, and then read good to get the results. Again, the executable will be invoked only once per map or reduce task, so Pig will first write out all the input to the file.
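A sketch of how such a blacklistchecker.py might be structured, shown only to illustrate the contract with the input and output clauses: it reads every record from the file named by -i, filters, and writes the survivors to the file named by -o. The argument handling and the placeholder blacklist are assumptions, not the book’s actual script.

#!/usr/bin/env python
# Hypothetical sketch matching:
#   define blc `blacklistchecker.py -i urls -o good` input('urls') output('good');
# Pig writes the task's entire input to 'urls' before invoking this script,
# then reads 'good' after it finishes.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-i', dest='infile', required=True)
parser.add_argument('-o', dest='outfile', required=True)
args = parser.parse_args()

BLACKLIST = {'http://example.com/spam'}         # placeholder; a real checker would load its list

with open(args.infile) as inp, open(args.outfile, 'w') as out:
    for line in inp:
        url = line.rstrip('\n').split('\t')[0]  # assume the URL is the first field
        if url not in BLACKLIST:
            out.write(line)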
Beginning in Pig 0.8, you can also include MapReduce jobs directly in your data flow with the mapreduce command. This is convenient if you have processing that is better done in MapReduce than Pig but must be integrated with the rest of your Pig data flow. It can also make it easier to incorporate legacy processing written in MapReduce with newer processing you want to write in Pig Latin.
MapReduce jobs expect to read their input from and write their output to a storage device (usually HDFS). So to integrate them with your data flow, Pig first has to store the data, then invoke the MapReduce job, and then read the data back. This is done via store and load clauses in the mapreduce statement that invoke regular load and store functions. You also provide Pig with the name of the JAR that contains the code for your MapReduce job.
As an example, let’s continue with the blacklisting of URLs that we considered in the previous section. Only now let’s assume that this is done by a MapReduce job instead of a Python script:
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls = mapreduce 'blacklistchecker.jar'
    store normalized into 'input'
    load 'output' as (url, pageid);
mapreduce takes as its first argument the JAR containing the code to run a MapReduce job. It uses load and store phrases to specify how data will be moved from Pig’s data pipeline to the MapReduce job. Notice that the input alias is contained in the store clause. As with stream, the output of mapreduce is opaque to Pig, so if we want the resulting relation goodurls to have a schema, we have to tell Pig what it is. This example also assumes that the Java code in blacklistchecker.jar knows which input and output files to look for and has a default class to run specified in its manifest. Often this will not be the case. Any arguments you wish to pass to the invocation of the Java command that will run the MapReduce task can be put in backquotes after the load clause:
crawl = load 'webcrawl' as (url, pageid);
normalized = foreach crawl generate normalize(url);
goodurls = mapreduce 'blacklistchecker.jar'
    store normalized into 'input'
    load 'output' as (url, pageid)
    `com.acmeweb.security.BlackListChecker -i input -o output`;
The string in the backquotes will be passed directly to your MapReduce job as is, so if you want to pass Java options, etc., you can do that as well.
The load and store clauses of the mapreduce command have the same syntax as the load and store statements, so you can use different load and store functions, pass constructor arguments, and so on. See Load and Store for full details.