
A guest post by Kasper Grud Skat Madsen, who currently works as a Ph.D. student at the University of Southern Denmark. His research interests are data stream management and cloud computing. He is a contributor to Storm and storm-deploy. He is also involved in Enorm, a project that tries to extend Storm.

This is my third post in a series that covers Storm. The first post showed how to develop a simple topology. The second post showed how to deploy the topology on Amazon EC2. In this third post on Storm, we investigate how to do multi-threaded computation from within a task.

Storm requires that the emit method on the OutputCollector (used for emitting tuples from one task to another) always be called from the same thread; otherwise, NullPointerExceptions will occur from within Storm itself. In this post I present two ways to do multi-threaded computation while complying with this limitation.

Solution 1 – Tick Tuples

Storm can be configured to send special tuples, called tick tuples, to all bolts at a given frequency. It does so by starting a hidden spout that emits the tick tuples at the desired frequency. Tick tuples can be used to work around the limitation discussed above: the worker threads hand over their output, and the task emits that output each time it receives a tick tuple, so only one thread ever emits. The main disadvantage is increased latency, because output waits for the next tick. To begin using tick tuples, add the following to the Config:
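A minimal sketch of that setting follows. Storm's Config class is a Map<String, Object>, so the example uses a plain HashMap to stay runnable without Storm on the classpath. The key string is the value of Storm's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS constant; the class name TickConfigSketch, the helper withTickFrequency and the 5-second value used below are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class TickConfigSketch {
    // Same string as Storm's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS constant.
    static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    // Hypothetical helper: builds a configuration map that requests
    // one tick tuple every `seconds` seconds.
    static Map<String, Object> withTickFrequency(int seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("tick frequency must be positive");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_TICK_TUPLE_FREQ_SECS, seconds);
        return conf;
    }
}
```

With Storm available, the equivalent call on a real Config instance would be conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5). Returning the same entry from a bolt's getComponentConfiguration() method should restrict the ticks to that bolt.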

Tasks can distinguish tick tuples from ordinary tuples with the following logic:
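A minimal sketch of such a check is shown here. In a bolt, the two arguments would come from tuple.getSourceComponent() and tuple.getSourceStreamId(), and the two strings correspond to Storm's Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID. To keep the example runnable without Storm, the strings are written out literally; the class name TickTupleCheck is an assumption.

```java
class TickTupleCheck {
    // Same values as Storm's Constants.SYSTEM_COMPONENT_ID and
    // Constants.SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // A tuple is a tick tuple when it comes from the system component
    // on the tick stream. Null arguments simply yield false.
    static boolean isTickTuple(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }
}
```

Inside execute(Tuple tuple), a bolt would branch on this check: handle the tick (for example by emitting buffered output) when it returns true, and process the tuple as ordinary data otherwise.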

By using tick tuples, it is therefore possible to use multiple threads in a task while emitting from only one. Beware, though: there is no guarantee that tick tuples will be received with exactly the timing requested.
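The pattern described above can be sketched as a small buffer that worker threads fill and that the task drains whenever a tick tuple arrives. This is an illustration under assumptions, not code from Storm: the class TickDrainBuffer and its methods publish and drain are hypothetical names, and the Consumer passed to drain stands in for a call to the OutputCollector's emit method.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class TickDrainBuffer<T> {
    // Lock-free queue, safe for concurrent producers.
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    // Called from any worker thread once a result is ready.
    void publish(T result) {
        results.add(result);
    }

    // Called only from the thread that runs execute(), on each tick tuple.
    // Hands every buffered result to `emit` and returns how many were emitted.
    int drain(Consumer<T> emit) {
        int emitted = 0;
        T next;
        while ((next = results.poll()) != null) {
            emit.accept(next);
            emitted++;
        }
        return emitted;
    }
}
```

Because drain only runs when a tick arrives, a result can wait up to one tick interval before it is emitted, which is the latency cost mentioned earlier.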

Solution 2 – A dedicated emitter thread

Define a class that reads from a thread-safe queue and emits. All worker threads put their tuples into the thread-safe queue, and because only the one thread reading from the queue ever emits, the single-thread requirement is met.

NOTE: The following code snippet serves as an example meant to convey an idea. Code snippets are often written in a simplified style to support understanding. Please do not use the code directly in production.
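As a minimal sketch of the idea, the class below queues emit requests and forwards them from a single thread. The names ThreadSafeOutputCollector, BufferedData, _queue and _runThread follow the names used in this post and its comments. The DirectEmitter interface is a hypothetical stand-in for Storm's OutputCollector so that the example runs without Storm, and the stop method is an assumption.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the emitDirect method of Storm's OutputCollector.
interface DirectEmitter {
    void emitDirect(int taskId, String streamId, List<Object> tuple);
}

class ThreadSafeOutputCollector implements Runnable {
    // One queued emit request: target task, target stream and data.
    private static final class BufferedData {
        final int taskId;
        final String streamId;
        final List<Object> data;

        BufferedData(int taskId, String streamId, List<Object> data) {
            this.taskId = taskId;
            this.streamId = streamId;
            this.data = data;
        }
    }

    private final BlockingQueue<BufferedData> _queue = new LinkedBlockingQueue<>();
    private final DirectEmitter _collector;
    // volatile so that a stop() from another thread is seen by the emitter thread.
    private volatile boolean _runThread = true;
    private volatile Thread _emitterThread;

    ThreadSafeOutputCollector(DirectEmitter collector) {
        _collector = collector;
    }

    // Safe to call from any worker thread: it only enqueues the request.
    void emitDirect(int taskId, String streamId, List<Object> data) {
        _queue.add(new BufferedData(taskId, streamId, data));
    }

    // Runs in the dedicated emitter thread, the only thread that really emits.
    @Override
    public void run() {
        _emitterThread = Thread.currentThread();
        try {
            while (_runThread) {
                BufferedData next = _queue.take(); // blocks until data arrives
                _collector.emitDirect(next.taskId, next.streamId, next.data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, then exit
        }
    }

    // Stops the emitter thread; requests still in the queue are dropped.
    void stop() {
        _runThread = false;
        Thread emitter = _emitterThread;
        if (emitter != null) {
            emitter.interrupt(); // wakes the thread if it is blocked in take()
        }
    }
}
```

A typical use would be new Thread(new ThreadSafeOutputCollector(emitter)).start() during bolt preparation, with worker threads calling emitDirect on the shared instance afterwards.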

The method emitDirect takes three arguments: target task, target stream and data. Simply run the ThreadSafeOutputCollector in its own thread, and call emitDirect to emit data. The emitDirect method assumes that data is sent using directGrouping. It is your task to extend the above class to support ordinary emit.


Both of the presented solutions can be employed to handle the discussed limitation of Storm. Which one is better depends largely on your exact problem.


3 Responses to “Multi-threading with Storm”

  1. J

    1/ “_runThread” needs to be volatile
    2/ class BufferedData needs to be static final.
    3/ calling Thread.interrupt is not good code. I can hear 1999 calling for their own old style of MT coding.

    • Kasper Madsen

      Hi there,
      Thx for the comments.

      1. You are right, though in practice very rarely a problem :)

      2. The class BufferedData can be declared static final, but it need not be. When writing this post, I judged it better to use an inner class for two reasons: (1) the inner class can access the fields of the enclosing class, which makes the code easier to extend, and (2) the example is less cluttered.

      3. I don’t agree on this part. The _queue.take() call is blocking, so the execution might wait for new data here. In case we need to kill the thread, we call interrupt to inform _queue.take() that it should stop waiting. If we didn’t do this, then killing the thread could potentially take a very long time. It is possible to use a non-blocking function on the queue to avoid the interrupt, but I don’t see any problems with the interrupt. It is a core part of Java and is used in many places.

      • J

        1. It is a problem. Without the volatile keyword there is a race condition: the worker thread might not see the changed value and block again indefinitely.

        2. The inner class needs to be static final in this example (where no members of the enclosing class are accessed); when changing the code later, one can always remove the “static” qualifier. This is simple and good engineering discipline.

        3. Using Thread.interrupt() is always bad form: you have no guarantees about the state in which you leave the resources used by the worker thread (in this simple example there are none, but this doesn’t scale to more complex systems). It is better to use a poison pill to shut the worker down cleanly. Orderly shutdown is the only way to robustness :)