Chapter 1. Design Patterns and MapReduce

MapReduce is a computing paradigm for processing data that resides on hundreds of computers, which has been popularized recently by Google, Hadoop, and many others. The paradigm is extraordinarily powerful, but it does not provide a general solution to what many are calling “big data,” so while it works particularly well on some problems, some are more challenging. This book will teach you what problems are amenable to the MapReduce paradigm, as well as how to use it effectively.

At first glance, many people do not realize that MapReduce is more of a framework than a tool. You have to fit your solution into the framework of map and reduce, which in some situations might be challenging. MapReduce is not a feature, but rather a constraint.

This makes problem solving easier and harder. It provides clear boundaries for what you can and cannot do, making the number of options you have to consider fewer than you may be used to. At the same time, figuring out how to solve a problem with constraints requires cleverness and a change in thinking.

Learning MapReduce is a lot like learning recursion for the first time: it is challenging to find the recursive solution to the problem, but when it comes to you, it is clear, concise, and elegant. In many situations you have to be conscious of system resources being used by the MapReduce job, especially inter-cluster network utilization. The tradeoff of being confined to the MapReduce framework is the ability to process your data with distributed computing, without having to deal with concurrency, robustness, scale, and other common challenges. But with a unique system and a unique way of problem solving, come unique design patterns.

What is a MapReduce design pattern? It is a template for solving a common and general data manipulation problem with MapReduce. A pattern is not specific to a domain such as text processing or graph analysis, but it is a general approach to solving a problem. Using design patterns is all about using tried and true design principles to build better software.

Designing good software is challenging for a number of reasons, and similar challenges face those who want to achieve good design in MapReduce. Just as good programmers can produce bad software due to poor design, good programmers can produce bad MapReduce algorithms. With MapReduce we’re not only battling with clean and maintainable code, but also with the performance of a job that will be distributed across hundreds of nodes to compute over terabytes and even petabytes of data. In addition, this job is potentially competing with hundreds of others on a shared cluster of machines. This makes choosing the right design to solve your problem with MapReduce extremely important and can yield performance gains of several orders of magnitude. Before we dive into some design patterns in the chapters following this one, we’ll talk a bit about how and why design patterns and MapReduce together make sense, and a bit of a history lesson of how we got here.

Design Patterns

Design patterns have been making developers’ lives easier for years. They are tools for solving problems in a reusable and general way so that the developer can spend less time figuring out how he’s going to overcome a hurdle and move onto the next one. They are also a way for veteran problem solvers to pass down their knowledge in a concise way to younger generations.

One of the major milestones in the field of design patterns in software engineering is the book Design Patterns: Elements of Reusable Object-Oriented Software, by Gamma et al. (Addison-Wesley Professional, 1995), also known as the “Gang of Four” book. None of the patterns in this very popular book were new and many had been in use for several years. The reason why it was and still is so influential is the authors took the time to document the most important design patterns across the field of object-oriented programming. Since the book was published in 1994, most individuals interested in good design heard about patterns from word of mouth or had to root around conferences, journals, and a barely existent World Wide Web.

Design patterns have stood the test of time and have shown the right level of abstraction: not too specific that there are too many of them to remember and too hard to tailor to a problem, yet not too general that tons of work has to be poured into a pattern to get things working. This level of abstraction also has the major benefit of providing developers with a common language in which to communicate verbally and through code. Simply saying “abstract factory” is easier than explaining what an abstract factory is over and over. Also, when looking at a stranger’s code that implements an abstract factory, you already have a general understanding of what the code is trying to accomplish.

MapReduce design patterns fill this same role in a smaller space of problems and solutions. They provide a general framework for solving your data computation issues, without being specific to the problem domain. Experienced MapReduce developers can pass on knowledge of how to solve a general problem to more novice MapReduce developers. This is extremely important because MapReduce is a new technology with a fast adoption rate and there are new developers joining the community every day. MapReduce design patterns also provide a common language for teams working together on MapReduce problems. Suggesting to someone that they should use a “reduce-side join” instead of a “map-side replicated join” is more concise than explaining the low-level mechanics of each.

The MapReduce world is in a state similar to the object-oriented world before 1994. Patterns today are scattered across blogs, websites such as StackOverflow, deep inside other books, and inside very advanced technology teams at organizations across the world. The intent of this book is not to provide some groundbreaking new ways to solve problems with MapReduce that nobody has seen before, but instead to collect patterns that have been developed by veterans in the field so that they can be shared with everyone else.

Caution

Even provided with some design patterns, genuine experience with the MapReduce paradigm is still necessary to understand when to apply them. When you are trying to solve a new problem with a pattern you saw in this book or elsewhere, be very careful that the pattern fits the problem by paying close attention to its “Applicability” section.

For the most part, the MapReduce design patterns in this book are intended to be platform independent. MapReduce, being a paradigm published by Google without any actual source code, has been reimplemented a number of times, both as a standalone system (e.g., Hadoop, Disco, Amazon Elastic MapReduce) and as a query language within a larger system (e.g., MongoDB, Greenplum DB, Aster Data). Even if design patterns are intended to be general, we write this book with a Hadoop perspective. Many of these patterns can be applied in other systems, such as MongoDB, because they conform to the same conceptual architecture. However, some technical details may be different from implementation to implementation. The Gang of Four’s book on design patterns was written with a C++ perspective, but developers have found the concepts conveyed in the book useful in modern languages such as Ruby and Python. The patterns in this book should be usable with systems other than Hadoop. You’ll just have to use the code examples as a guide to developing your own code.

MapReduce History

How did we get to the point where a MapReduce design patterns book is a good idea? At a certain point, the community’s momentum and widespread use of the paradigm reaches a critical mass where it is possible to write a comprehensive list of design patterns to be shared with developers everywhere. Several years ago, when Hadoop was still in its infancy, not enough had been done with the system to figure out what it is capable of. But the speed at which MapReduce has been adopted is remarkable. It went from an interesting paper from Google in 2004 to a widely adopted industry standard in distributed data processing in 2012.

The actual origins of MapReduce are arguable, but the paper that most cite as the one that started us down this journey is “MapReduce: Simplified Data Processing on Large Clusters” by Jeffrey Dean and Sanjay Ghemawat in 2004. This paper described how Google split, processed, and aggregated their data set of mind-boggling size.

Shortly after the release of the paper, a free and open source software pioneer by the name of Doug Cutting started working on a MapReduce implementation to solve scalability in another project he was working on called Nutch, an effort to build an open source search engine. Over time and with some investment by Yahoo!, Hadoop split out as its own project and eventually became a top-level Apache Foundation project. Today, numerous independent people and organizations contribute to Hadoop. Every new release adds functionality and boosts performance.

Several other open source projects have been built with Hadoop at their core, and this list is continually growing. Some of the more popular ones include Pig, Hive, HBase, Mahout, and ZooKeeper. Doug Cutting and other Hadoop experts have mentioned several times that Hadoop is becoming the kernel of a distributed operating system in which distributed applications can be built. In this book, we’ll be explaining the examples with the least common denominator in the Hadoop ecosystem, Java MapReduce. In the resemblance sections of each pattern in some chapters, we’ll typically outline a parallel for Pig and SQL that could be used in Hive.

MapReduce and Hadoop Refresher

The point of this section is to provide a quick refresher on MapReduce in the Hadoop context, since the code examples in this book are written in Hadoop. Some beginners might want to refer to a more in-depth resource such as Tom White’s excellent Hadoop: The Definitive Guide or the Apache Hadoop website. These resources will help you get started in setting up a development or fully productionalized environment that will allow you to follow along the code examples in this book.

Hadoop MapReduce jobs are divided into a set of map tasks and reduce tasks that run in a distributed fashion on a cluster of computers. Each task works on the small subset of the data it has been assigned so that the load is spread across the cluster. The map tasks generally load, parse, transform, and filter data. Each reduce task is responsible for handling a subset of the map task output. Intermediate data is then copied from mapper tasks by the reducer tasks in order to group and aggregate the data. It is incredible what a wide range of problems can be solved with such a straightforward paradigm, from simple numerical aggregations to complex join operations and Cartesian products.

The input to a MapReduce job is a set of files in the data store that are spread out over the Hadoop Distributed File System (HDFS). In Hadoop, these files are split with an input format, which defines how to separate a file into input splits. An input split is a byte-oriented view of a chunk of the file to be loaded by a map task.

Each map task in Hadoop is broken into the following phases: record reader, mapper, combiner, and partitioner. The output of the map tasks, called the intermediate keys and values, are sent to the reducers. The reduce tasks are broken into the following phases: shuffle, sort, reducer, and output format. The nodes in which the map tasks run are optimally on the nodes in which the data rests. This way, the data typically does not have to move over the network and can be computed on the local machine.

record reader

The record reader translates an input split generated by input format into records. The purpose of the record reader is to parse the data into records, but not parse the record itself. It passes the data to the mapper in the form of a key/value pair. Usually the key in this context is positional information and the value is the chunk of data that composes a record. Customized record readers are outside the scope of this book. We generally assume you have an appropriate record reader for your data.

map

In the mapper, user-provided code is executed on each key/value pair from the record reader to produce zero or more new key/value pairs, called the intermediate pairs. The decision of what is the key and value here is not arbitrary and is very important to what the MapReduce job is accomplishing. The key is what the data will be grouped on and the value is the information pertinent to the analysis in the reducer. Plenty of detail will be provided in the design patterns in this book to explain what and why the particular key/value is chosen. One major differentiator between MapReduce design patterns is the semantics of this pair.

combiner

The combiner, an optional localized reducer, can group data in the map phase. It takes the intermediate keys from the mapper and applies a user-provided method to aggregate values in the small scope of that one mapper. For example, because the count of an aggregation is the sum of the counts of each part, you can produce an intermediate count and then sum those intermediate counts for the final result. In many situations, this significantly reduces the amount of data that has to move over the network. Sending (hello world, 3) requires fewer bytes than sending (hello world, 1) three times over the network. Combiners will be covered in more depth with the patterns that use them extensively. Many new Hadoop developers ignore combiners, but they often provide extreme performance gains with no downside. We will point out which patterns benefit from using a combiner, and which ones cannot use a combiner. A combiner is not guaranteed to execute, so it cannot be a part of the overall algorithm.

partitioner

The partitioner takes the intermediate key/value pairs from the mapper (or combiner if it is being used) and splits them up into shards, one shard per reducer. By default, the partitioner interrogates the object for its hash code, which is typically an md5sum. Then, the partitioner performs a modulus operation by the number of reducers: key.hashCode() % (number of reducers). This randomly distributes the keyspace evenly over the reducers, but still ensures that keys with the same value in different mappers end up at the same reducer. The default behavior of the partitioner can be customized, and will be in some more advanced patterns, such as sorting. However, changing the partitioner is rarely necessary. The partitioned data is written to the local file system for each map task and waits to be pulled by its respective reducer.

shuffle and sort

The reduce task starts with the shuffle and sort step. This step takes the output files written by all of the partitioners and downloads them to the local machine in which the reducer is running. These individual data pieces are then sorted by key into one larger data list. The purpose of this sort is to group equivalent keys together so that their values can be iterated over easily in the reduce task. This phase is not customizable and the framework handles everything automatically. The only control a developer has is how the keys are sorted and grouped by specifying a custom Comparator object.

reduce

The reducer takes the grouped data as input and runs a reduce function once per key grouping. The function is passed the key and an iterator over all of the values associated with that key. A wide range of processing can happen in this function, as we’ll see in many of our patterns. The data can be aggregated, filtered, and combined in a number of ways. Once the reduce function is done, it sends zero or more key/value pair to the final step, the output format. Like the map function, the reduce function will change from job to job since it is a core piece of logic in the solution.

output format

The output format translates the final key/value pair from the reduce function and writes it out to a file by a record writer. By default, it will separate the key and value with a tab and separate records with a newline character. This can typically be customized to provide richer output formats, but in the end, the data is written out to HDFS, regardless of format. Like the record reader, customizing your own output format is outside of the scope of this book, since it simply deals with I/O.

Hadoop Example: Word Count

Now that you’re refreshed on the steps of the whole MapReduce process, let’s dive into a quick and simple example. The “Word Count” program is the canonical example in MapReduce, and for good reason. It is a straightforward application of MapReduce and MapReduce can handle it extremely efficiently. Many people complain about the “Word Count” program being overused as an example, but hopefully the rest of the book makes up for that!

In this particular example, we’re going to be doing a word count over user-submitted comments on StackOverflow. The content of the Text field will be pulled out and preprocessed a bit, and then we’ll count up how many times we see each word. An example record from this data set is:

<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?" CreationDate="2011-07-30T07:29:33.343" UserId="831878" />

This record is the 8,189,677th comment on Stack Overflow, and is associated with post number 6,881,722, and is by user number 831,878. The number of the PostId and the UserId are foreign keys to other portions of the data set. We’ll show how to join these datasets together in the chapter on join patterns.

The first chunk of code we’ll look at is the driver. The driver takes all of the components that we’ve built for our MapReduce job and pieces them together to be submitted to execution. This code is usually pretty generic and considered “boiler plate.” You’ll find that in all of our patterns the driver stays the same for the most part.

This code is derived from the “Word Count” example that ships with Hadoop Core:

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.commons.lang.StringEscapeUtils;

public class CommentWordCount {


  public static class WordCountMapper
       extends Mapper<Object, Text, Text, IntWritable> {
           ...
   }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
           ...
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: CommentWordCount <in> <out>");
      System.exit(2);
    }

    Job job = new Job(conf, "StackOverflow Comment Word Count");
    job.setJarByClass(CommentWordCount.class);
    job.setMapperClass(WordCountMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The purpose of the driver is to orchestrate the jobs. The first few lines of main are all about parsing command line arguments. Then we start setting up the job object by telling it what classes to use for computations and what input paths and output paths to use. That’s about it! It’s just important to make sure the class names match up with the classes you wrote and that the output key and value types match up with the output types of the mapper.

One way you’ll see this code change from pattern to pattern is the usage of job.setCombinerClass. In some cases, the combiner simply cannot be used due to the nature of the reducer. In other cases, the combiner class will be different from the reducer class. The combiner is very effective in the “Word Count” program and is quite simple to activate.

Next is the mapper code that parses and prepares the text. Once some of the punctuation and random text is cleaned up, the text string is split up into a list of words. Then the intermediate key produced is the word and the value produced is simply “1.” This means we’ve seen this word once. Even if we see the same word twice in one line, we’ll output the word and “1” twice and it’ll be taken care of in the end. Eventually, all of these ones will be summed together into the global count of that word.

public static class WordCountMapper
      extends Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

    // Parse the input string into a nice map
    Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

    // Grab the "Text" field, since that is what we are counting over
    String txt = parsed.get("Text");

    // .get will return null if the key is not there
    if (txt == null) {
      // skip this record
        return;
    }

    // Unescape the HTML because the data is escaped.
    txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase());

    // Remove some annoying punctuation
    txt = txt.replaceAll("'", ""); // remove single quotes (e.g., can't)
    txt = txt.replaceAll("[^a-zA-Z]", " "); // replace the rest with a space

    // Tokenize the string by splitting it up on whitespace into
    //  something we can iterate over,
    //  then send the tokens away
    StringTokenizer itr = new StringTokenizer(txt);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

The first function, MRDPUtils.transformXmlToMap, is a helper function to parse a line of Stack Overflow data in a generic manner. You’ll see it used in a number of our examples. It basically takes a line of the StackOverflow XML (which has a very predictable format) and matches up the XML attributes with the values into a Map.

Next, turn your attention to the WordCountMapper class. This code is a bit more complicated than the driver (for good reason!). The mapper is where we’ll see most of the work done. The first major thing to notice is the type of the parent class:

Mapper<Object, Text, Text, IntWritable>

They map to the types of the input key, input value, output key, and output value, respectively. We don’t care about the key of the input in this case, so that’s why we use Object. The data coming in is Text (Hadoop’s special String type) because we are reading the data as a line-by-line text document. Our output key and value are Text and IntWritable because we will be using the word as the key and the count as the value.

Caution

The mapper input key and value data types are dictated by the job’s configured FileInputFormat. The default implementation is the TextInputFormat, which provides the number of bytes read so far in the file as the key in a LongWritable object and the line of text as the value in a Text object. These key/value data types are likely to change if you are using different input formats.

Up until we start using the StringTokenizer towards the bottom of the code, we’re just cleaning up the string. We unescape the data because the string was stored in an escaped manner so that it wouldn’t mess up XML parsing. Next, we remove any stray punctuation so that the literal string Hadoop! is considered the same word as Hadoop? and Hadoop. Finally, for each token (i.e., word) we emit the word with the number 1, which means we saw the word once. The framework then takes over to shuffle and sorts the key/value pairs to reduce tasks.

Finally comes the reducer code, which is relatively simple. The reduce function gets called once per key grouping, in this case each word. We’ll iterate through the values, which will be numbers, and take a running sum. The final value of this running sum will be the sum of the ones.

public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }

    result.set(sum);
    context.write(key, result);
  }
}

As in the mapper, we specify the input and output types via the template parent class. Also like the mapper, the types correspond to the same things: input key, input value, output key, and output value. The input key and input value data types must match the output key/value types from the mapper. The output key and output value data types must match the types that the job’s configured FileOutputFormat is expecting. In this case, we are using the default TextOutputFormat, which can take any two Writable objects as output.

The reduce function has a different signature from map, though: it gives you an Iterator over values instead of just a single value. This is because you are now iterating over all values that have that key, instead of just one at a time. The key is very important in the reducer of pretty much every MapReduce job, unlike the input key in the map.

Anything we pass to context.write will get written out to a file. Each reducer will create one file, so if you want to coalesce them together you’ll have to write a post-processing step to concatenate them.

Now that we’ve gotten a straightforward example out of the way, let’s dive into some design patterns!

Pig and Hive

There is less need for MapReduce design patterns in a ecosystem with Hive and Pig. However, we would like to take this opportunity early in the book to explain why MapReduce design patterns are still important.

Pig and Hive are higher-level abstractions of MapReduce. They provide an interface that has nothing to do with “map” or “reduce,” but the systems interpret the higher-level language into a series of MapReduce jobs. Much like how a query planner in an RDBMS translates SQL into actual operations on data, Hive and Pig translate their respective languages into MapReduce operations.

As will be seen throughout this book in the resemblances sections, Pig and SQL (or HiveQL) can be significantly more terse than the raw Hadoop implementations in Java. For example, it will take several pages to explain total order sorting, while Pig is able to get the job done in a few lines.

So why should we use Java MapReduce in Hadoop at all when we have options like Pig and Hive? What was the point in the authors of this book spending time explaining how to implement something in hundreds of lines of code when the same can be accomplished in a couple lines? There are two core reasons.

First, there is conceptual value in understanding the lower-level workings of a system like MapReduce. The developer that understands how Pig actually performs a reduce-side join will make smarter decisions. Using Pig or Hive without understanding MapReduce can lead to some dangerous situations. Just because you’re benefiting from a higher-level interface doesn’t mean you can ignore the details. Large MapReduce clusters are heavy machinery and need to be respected as such.

Second, Pig and Hive aren’t there yet in terms of full functionality and maturity (as of 2012). It is obvious that they haven’t reached their full potential yet. Right now, they simply can’t tackle all of the problems in the ways that Java MapReduce can. This will surely change over time and with every major release, major features, and bux fixes are added. Speaking hypothetically, say that at Pig version 0.6, your organization could write 50% of their analytics in Pig. At version 0.9, now you are at 90%. With every release, more and more can be done at a higher-level of abstraction. The funny thing about trends things like this in software engineering is that the last 10% of problems that can’t be solved with a higher-level of abstraction are also likely to be the most critical and most challenging. This is when something like Java is going to be the best tool for the job. Some still use assembly language when they really have to!

When you can, write your MapReduce in Pig or Hive. Some of the major benefits of using these higher-level of abstractions include readability, maintainability, development time, and automatic optimization. Rarely is the often-cited performance hit due to indirection a serious consideration. These analytics are running in batch and are taking several minutes already, so what does a minute or two more really matter? In some cases, the query plan optimizer in Pig or Hive will be better at optimizing your code than you are! In a small fraction of situations, the extra few minutes added by Pig or Hive will matter, in which case you should use Java MapReduce.

Pig and Hive are likely to influence MapReduce design patterns more than anything else. New feature requests in Pig and Hive will likely translate down into something that could be a design pattern in MapReduce. Likewise, as more design patterns are developed for MapReduce, some of the more popular ones will become first-class operations at a higher level of abstraction.

Pig and Hive have patterns of their own and experts will start documenting more as they solve more problems. Hive has the benefit of building off of decades of SQL patterns, but not all patterns in SQL are smart in Hive and vice versa. Perhaps as these platforms gain more popularity, cookbook and design pattern books will be written for them.

Get MapReduce Design Patterns now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.