
Chapter 4. Advanced Algorithms

Algorithm templates are the keys to using Intel Threading Building Blocks. This chapter presents some relatively complex algorithms that build on the foundation laid in Chapter 3, so you should understand Chapter 3 before jumping into this chapter. This chapter covers Threading Building Blocks’ support for the following types of generic parallel algorithms.

Parallel algorithms for streams:

parallel_while

Use for an unstructured stream or pile of work. Offers the ability to add more work to the pile while the loop is running.

pipeline

Use when you have a linear sequence of stages. Specify the maximum number of items that can be in flight. Each stage can be serial or parallel. This uses the cache efficiently because each worker thread takes an item through as many stages as possible, and the algorithm is biased toward finishing old items before tackling new ones.

Parallel sort:

parallel_sort

A comparison sort with an average time complexity of O(n log n) on a single processor, approaching O(n) as more processors are used. When worker threads are available, parallel_sort creates subtasks that may be executed concurrently.

Parallel Algorithms for Streams

You can successfully parallelize many applications using only the constructs discussed thus far. However, some situations call for other parallel patterns. This section describes the support for some of these alternative patterns:

parallel_while

Use for an unstructured stream or pile of work. Offers the ability to add more work to the pile while the loop is running.

pipeline

Use when you have a linear pipeline of stages. Specify the maximum number of items that can be in flight. Each stage can be serial or parallel. This uses the cache efficiently because each worker thread handles an item through as many stages as possible, and the algorithm is biased toward finishing old items before tackling new ones.

Cook Until Done: parallel_while

For some loops, the end of the iteration space is not known in advance, or the loop body may add more iterations to do before the loop exits. You can deal with both situations using the template class tbb::parallel_while.

A linked list is an example of an iteration space that is not known in advance. In parallel programming, it is usually better to use dynamic arrays instead of linked lists because accessing items in a linked list is inherently serial. But if you are limited to linked lists, if the items can be safely processed in parallel, and if processing each item takes at least a few thousand instructions, you can use parallel_while in a situation where the serial form is as shown in Example 4-1.

Example 4-1. Original list processing code

void SerialApplyFooToList( Item*root ) {
    for( Item* ptr=root; ptr!=NULL; ptr=ptr->next )
        Foo(ptr->data);
}

If Foo takes at least a few thousand instructions to run, you can get parallel speedup by converting the loop to use parallel_while. Unlike the templates described earlier, parallel_while is a class, not a function, and it requires two user-defined objects. The first object defines the stream of items. The object must have a method, pop_if_present, such that when bool b = pop_if_present(v) is invoked, it sets v to the next iteration value if there is one and returns true. If there are no more iterations, it returns false. Example 4-2 shows a typical implementation of pop_if_present.

Example 4-2. pop_if_present for a parallel_while

class ItemStream {
    Item* my_ptr;
public:
    bool pop_if_present( Item*& item ) {
        if( my_ptr ) {
            item = my_ptr;
            my_ptr = my_ptr->next;
            return true;
        } else {
            return false;
        }
    }
    ItemStream( Item* root ) : my_ptr(root) {}
};

The second object defines the loop body, and must define operator() as a const member function and an argument_type member type. This is similar to a C++ function object from the C++ standard header, <functional>, except that it must be const (see Example 4-3).

Example 4-3. Use of parallel_while

class ApplyFoo {
public:
    void operator()( Item* item ) const {
        Foo(item->data);
    }
    typedef Item* argument_type;
};

Given the stream and body classes, the new code is as shown in Example 4-4.

Example 4-4. ParallelApplyFooToList

void ParallelApplyFooToList( Item*root ) {
    parallel_while<ApplyFoo> w;
    ItemStream stream(root);
    ApplyFoo body;
    w.run( stream, body );
}

The pop_if_present method does not have to be thread-safe for a given stream because parallel_while never calls it concurrently for the same stream. Notice that this convenience makes parallel_while nonscalable because the fetching is serialized. But in many situations, you still get useful speedup over doing things sequentially.

Tip

parallel_while may invoke pop_if_present concurrently, but only on different stream objects; it never invokes pop_if_present concurrently on the same stream.

There is a second way that parallel_while can acquire work, and this is the way it can become scalable. The body of a parallel_while w, if given a reference to w when it is constructed, can add more work by calling w.add(item), where item is of type Body::argument_type.

For example, perhaps processing a node in a tree is a prerequisite to processing its descendants. With parallel_while, after processing a node, you could use parallel_while::add to add the descendant nodes. The instance of parallel_while does not terminate until all items have been processed.
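As a rough sketch of this pattern (not code from the book's examples; the TreeNode type, Process routine, and RootStream helper are made up for illustration), the body below keeps a reference to its parallel_while and feeds the children of each processed node back into it:

#include "tbb/parallel_while.h"

struct TreeNode {                  // hypothetical node type
    TreeNode* left;
    TreeNode* right;
    int data;
};

void Process( int data );          // assumed per-node work, at least a few thousand instructions

class ApplyToDescendants {
public:
    typedef TreeNode* argument_type;
    ApplyToDescendants( tbb::parallel_while<ApplyToDescendants>& w ) : my_while(w) {}
    void operator()( TreeNode* node ) const {
        Process( node->data );
        // A child becomes ready only after its parent has been processed,
        // so add the children to the same parallel_while.
        if( node->left )  my_while.add( node->left );
        if( node->right ) my_while.add( node->right );
    }
private:
    tbb::parallel_while<ApplyToDescendants>& my_while;
};

// Stream that yields just the root; everything else arrives via add.
class RootStream {
    TreeNode* my_root;
public:
    RootStream( TreeNode* root ) : my_root(root) {}
    bool pop_if_present( TreeNode*& item ) {
        if( !my_root ) return false;
        item = my_root;
        my_root = NULL;
        return true;
    }
};

void ParallelApplyToTree( TreeNode* root ) {
    tbb::parallel_while<ApplyToDescendants> w;
    ApplyToDescendants body(w);
    RootStream stream(root);
    w.run( stream, body );         // returns only after every added node is processed
}

Because almost all items arrive through add rather than through the stream, the serial pop_if_present call is no longer the bottleneck, which is what makes this form scalable.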

Notes on parallel_while scaling

Use of parallel_while usually does not provide scalable parallelism if the add method is not used because the input stream typically acts as a bottleneck. However, this bottleneck is broken if the stream is used to get things started and further items come from prior items invoking the add method.

Even in the nonscalable case, parallel_while covers a commonly desired idiom of walking a sequential structure (e.g., a linked list) and dispatching concurrent work for each item in the structure.

Working on the Assembly Line: Pipeline

Pipelining is a common parallel pattern that mimics a traditional manufacturing assembly line. Data flows through a series of pipeline stages and each stage processes the data in some way. Given an incoming stream of data, some of these stages can operate in parallel, and others cannot. For example, in video processing, some operations on frames do not depend on other frames and so can be done on multiple frames at the same time. On the other hand, some operations on frames require processing prior frames first.

Pipelined processing is common in multimedia and signal processing applications. A classical thread-per-stage implementation suffers two problems:

  • The speedup is limited to the number of stages.

  • When a thread finishes a stage, it must pass its data to another thread.

Eliminating these problems is desirable. To do so, you specify whether a stage is serial or parallel. A serial stage processes items one at a time, in order. A parallel stage may process items out of order or concurrently. By allowing some stages to be run concurrently, you make available more opportunities for load balancing and concurrency. Given sufficient processor cores and concurrent opportunities, the throughput of the pipeline is limited to the throughput of the slowest serial filter.

The pipeline and filter classes implement the pipeline pattern. Here we’ll look at a simple text-processing problem to demonstrate the usage of pipeline and filter. The problem is to read a text file, capitalize the first letter of each word, and write the modified text to a new file. Figure 4-1 illustrates the pipeline.

Figure 4-1. Pipeline

Assume that the file I/O is sequential. However, the capitalization stage can be done in parallel. That is, if you can serially read n chunks very quickly, you can capitalize the n chunks in parallel, as long as they are written in the proper order to the output file.

To decide whether to capitalize a letter, inspect whether the preceding character is a blank. For the first letter in each chunk, you must inspect the last letter of the preceding chunk. But doing so would introduce a complicated dependency in the middle stage.

The solution is to have each chunk also store the last character of the preceding chunk. The chunks overlap by one character. This overlapping window strategy is quite common in pipeline-processing problems. In the example, the window is represented by an instance of the MyBuffer class. It looks like a typical Standard Template Library (STL) container for characters, except that begin()[-1] is legal and holds the last character of the preceding chunk (see Example 4-5).

Example 4-5. Use of pipeline class

// Buffer that holds block of characters and last character of preceding buffer.
class MyBuffer {
    static const size_t buffer_size = 10000;
    char* my_end;
    // storage[0] holds the last character of the preceding buffer.
    char storage[1+buffer_size];
public:
    // Pointer to first character in the buffer
    char* begin( ) {return storage+1;}
    const char* begin( ) const {return storage+1;}
    // Pointer to one past last character in the buffer
    char* end( ) const {return my_end;}
    // Set end of buffer.
    void set_end( char* new_ptr ) {my_end=new_ptr;}
    // Number of bytes a buffer can hold
    size_t max_size( ) const {return buffer_size;}
    // Number of bytes in buffer.
    size_t size() const {return my_end-begin( );}
};
// Below is the top-level code for building and running the pipeline
    // Create the pipeline
    tbb::pipeline pipeline;

    // Create file-reading stage and add it to the pipeline
    MyInputFilter input_filter( input_file );
    pipeline.add_filter( input_filter );

    // Create capitalization stage and add it to the pipeline
    MyTransformFilter transform_filter;
    pipeline.add_filter( transform_filter );

    // Create file-writing stage and add it to the pipeline
    MyOutputFilter output_filter( output_file );
    pipeline.add_filter( output_filter );

    // Run the pipeline
    pipeline.run( MyInputFilter::n_buffer );

    // Remove filters from pipeline before they are implicitly destroyed.
    pipeline.clear( );

The parameter passed to the pipeline::run method controls the level of parallelism. Conceptually, tokens flow through the pipeline. A serial stage must process each token one at a time, in order. A parallel stage can process multiple tokens in parallel.

If the number of tokens were unlimited, there might be a problem where the unordered stage in the middle keeps gaining tokens because the output stage cannot keep up. This situation typically leads to undesirable resource consumption by the middle stage. The parameter to the pipeline::run method specifies the maximum number of tokens that can be in flight. Once this limit is reached, the pipeline class doesn’t create a new token at the input stage until another token is destroyed at the output stage.

This top-level code also shows the clear method that removes all stages from the pipeline. This call is required if the filters have to be destroyed before the pipeline. The pipeline is a container that holds filters, and as with most containers in C++, it is illegal to destroy an item while it is in the container.

Now look in detail at how the stages are defined. Each stage is derived from the filter class. First let’s consider the output stage because it is the simplest (see Example 4-6).

Example 4-6. Output stage for pipeline

// Filter that writes each buffer to a file.
class MyOutputFilter: public tbb::filter {
    FILE* my_output_file;
public:
    MyOutputFilter( FILE* output_file );
    /*override*/void* operator( )( void* item );
};

MyOutputFilter::MyOutputFilter( FILE* output_file ) :
    tbb::filter(/*is_serial=*/true),
    my_output_file(output_file)
{
}

void* MyOutputFilter::operator( )( void* item ) {
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    fwrite( b.begin(), 1, b.size( ), my_output_file );
    return NULL;
}

The class is derived from the filter class. When its constructor calls the base class constructor for filter, it specifies that this is a serial filter. The class overrides the virtual method filter::operator(), which is the method invoked by the pipeline to process an item. The parameter item points to the item to be processed. The value returned points to the item to be processed by the next filter. Because this is the last filter, the return value is ignored, and thus can be NULL.

The middle stage is similar. Its operator() returns a pointer to the item to be sent to the next stage (see Example 4-7).

Example 4-7. Middle stage for pipeline

// Filter that changes the first letter of each word
// from lowercase to uppercase.
class MyTransformFilter: public tbb::filter {
public:
    MyTransformFilter( );
    /*override*/void* operator( )( void* item );
};

MyTransformFilter::MyTransformFilter( ) :
    tbb::filter(/*serial=*/false)
{}

/*override*/void* MyTransformFilter::operator( )( void* item ) {
    MyBuffer& b = *static_cast<MyBuffer*>(item);
    bool prev_char_is_space = b.begin( )[-1]==' ';
    for( char* s=b.begin(); s!=b.end( ); ++s ) {
        if( prev_char_is_space && islower(*s) )
            *s = toupper(*s);
        prev_char_is_space = isspace(*s);
    }
    return &b;
}

The middle stage operates on purely local data. Thus, any number of invocations of operator() can run concurrently on the same instance of MyTransformFilter. The class communicates this fact to the pipeline by constructing its base class, filter, with the serial argument set to false.

The input filter is the most complicated because it has to decide when the end of the input is reached and it must allocate buffers (see Example 4-8).

Example 4-8. Input stage for pipeline

class MyInputFilter: public tbb::filter {
public:
    static const size_t n_buffer = 4;
    MyInputFilter( FILE* input_file_ );
private:
    FILE* input_file;
    size_t next_buffer;
    char last_char_of_previous_buffer;
    MyBuffer buffer[n_buffer];
    /*override*/ void* operator( )(void*);
};

MyInputFilter::MyInputFilter( FILE* input_file_ ) :
    filter(/*is_serial=*/true),
    input_file(input_file_),
    next_buffer(0),
    last_char_of_previous_buffer(' ')
{
}

void* MyInputFilter::operator( )(void*) {
    MyBuffer& b = buffer[next_buffer];
    next_buffer = (next_buffer+1) % n_buffer;
    size_t n = fread( b.begin(), 1, b.max_size( ), input_file );
    if( !n ) {
        // end of file
        return NULL;
    } else {
        b.begin( )[-1] = last_char_of_previous_buffer;
        last_char_of_previous_buffer = b.begin( )[n-1];
        b.set_end( b.begin( )+n );
        return &b;
    }
}

The input filter is serial because it is reading from a sequential file. The override of operator() ignores its parameter because it is generating a stream, not transforming it. It remembers the last character of the preceding chunk so that it can properly overlap windows.

The buffers are allocated from a circular queue of size n_buffer. This might seem risky because after the initial n_buffer input operations, buffers are recycled without any obvious checks as to whether they are still in use. But the recycling is indeed safe because of two constraints:

  • The pipeline received n_buffer tokens when pipeline::run was called. Therefore, no more than n_buffer buffers are ever in flight simultaneously.

  • The last stage is serial. Therefore, the buffers are retired by the last stage in the order they were allocated by the first stage.

Notice that if the last stage were not serial, you would have to keep track of which buffers are currently in use because buffers might be retired out of order.
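One way to do that bookkeeping (a sketch under the assumption that you manage your own buffer pool; the BufferPool class below is not part of the book's example code) is to hand out buffers from a concurrent free list and return them from whichever stage retires them, so retirement order no longer matters:

#include "tbb/concurrent_queue.h"

// Hypothetical pool shared by the first and last filters. The input filter
// takes a free buffer before filling it, and the retiring filter gives it
// back, so buffers may be retired in any order.
class BufferPool {
    tbb::concurrent_queue<MyBuffer*> free_list;
public:
    BufferPool( size_t n ) {
        for( size_t i=0; i<n; ++i )
            free_list.push( new MyBuffer );
    }
    ~BufferPool() {
        MyBuffer* b;
        while( free_list.try_pop(b) )   // named pop_if_present in older TBB releases
            delete b;
    }
    MyBuffer* take() {
        MyBuffer* b = NULL;
        free_list.try_pop( b );         // cannot come back empty if n >= tokens in flight
        return b;
    }
    void give_back( MyBuffer* b ) {
        free_list.push( b );
    }
};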

The directory examples/pipeline/text_filter that comes with Threading Building Blocks contains the complete code for the text filter.

Throughput of pipeline

The throughput of a pipeline is the rate at which tokens flow through it, and it is limited by two constraints. First, if a pipeline is run with n tokens, there obviously cannot be more than n operations running in parallel. Selecting the right value of n may involve some experimentation. Too low a value limits parallelism; too high a value may demand too many resources (for example, more buffers).

Second, the throughput of a pipeline is limited by the throughput of the slowest sequential stage. This is true even for a pipeline with no parallel stages. No matter how fast the other stages are, the slowest sequential stage is the bottleneck. So in general, you should try to keep the sequential stages fast and, when possible, shift work to the parallel stages.
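For example (with made-up numbers), suppose the serial input stage needs 1 ms per buffer, the parallel transform stage needs 4 ms per buffer, and the serial output stage needs 1 ms per buffer. With four or more worker threads and roughly six tokens in flight, four transforms proceed at once, so a finished buffer still emerges about every millisecond; the 1 ms serial stages set the ceiling, not the slower parallel stage. Drop the token count to two and throughput falls, because the transform stage can no longer keep enough buffers in progress to hide its latency.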

The text-processing example has relatively poor speedup because the serial stages are limited by the I/O speed of the system. Indeed, even when files are on a local disk, you are unlikely to see a speedup of much more than 2X. To really benefit from a pipeline, the parallel stages need to be doing more substantial work compared to the serial stages.

The window size, or subproblem size for each token, can also limit throughput. Making windows too small may cause overheads to dominate the useful work. Making windows too large may cause them to spill out of cache. A good guideline is to try for a large window size that still fits in cache. You may have to experiment a bit to find a good window size.

Nonlinear pipelines

The pipeline template supports only linear pipelines. It does not directly handle more baroque plumbing, such as in Figure 4-2.

Figure 4-2. Nonlinear pipeline

However, you can still use pipeline for this. One solution is to topologically sort the stages into a linear order, as in Figure 4-3. Another solution, which injects dummy stages to get lower latency, is provided in Chapter 11 in the section titled “Two Mouths: Feeding Two from the Same Task in a Pipeline.”

Figure 4-3. Topologically sorted pipeline

In the topological sorting of the stages (Figure 4-3), the light gray arrows are the original arrows that are now implied by transitive closure of the other arrows. It might seem that a lot of parallelism is lost by forcing a linear order on the stages, but in fact, the only loss is in the latency of the pipeline, not the throughput. The latency is the time it takes a token to flow from the beginning to the end of the pipeline. Given a sufficient number of processors, the latency of the original nonlinear pipeline is three stages. This is because stages A and B could process the token concurrently, and likewise, stages D and E could process the token concurrently. In the linear pipeline, the latency is five stages. The behavior of stages A, B, D, and E may need to be modified to properly handle objects that don’t need to be acted upon by the stage, other than to be passed along to the next stage in the pipeline.
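A stage in the linearized pipeline can implement that pass-along behavior by checking a tag on each item. The sketch below is only an illustration of the idea; the Packet type, its stage_mask field, and the DoStageWork routine are assumptions, not part of the book's example code:

#include "tbb/pipeline.h"

struct Packet {
    unsigned stage_mask;      // one bit per stage that should act on this item
    // ... payload ...
};

void DoStageWork( Packet& p );  // assumed per-stage processing routine

// Filter that processes an item only if it is tagged for this stage and
// otherwise just forwards it unchanged to the next stage in the linear order.
class MaybeProcessFilter: public tbb::filter {
    unsigned my_stage_bit;
public:
    MaybeProcessFilter( unsigned stage_bit, bool is_serial ) :
        tbb::filter(is_serial),
        my_stage_bit(stage_bit)
    {}
    /*override*/ void* operator()( void* item ) {
        Packet& p = *static_cast<Packet*>(item);
        if( p.stage_mask & my_stage_bit )
            DoStageWork( p );
        return &p;            // pass the item along either way
    }
};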

The throughput remains the same because, regardless of the topology, the throughput is still limited by the throughput of the slowest serial stage. If pipeline supported nonlinear pipelines, it would add a lot of programming complexity, and would not improve throughput. The linear limitation of pipeline is a good trade-off of gain versus pain.

parallel_sort

parallel_sort is a comparison sort with an average time complexity of O(n log n). When worker threads are available, parallel_sort creates subtasks that may be executed concurrently. It provides an unstable sort of the sequence [begin, end). Being an unstable sort means that it might not preserve the relative ordering of elements with equal keys.

The sort is deterministic; sorting the same sequence will produce the same result each time. The requirements on the iterator and sequence are the same as for std::sort.

A call to parallel_sort(i,j,comp) sorts the sequence [i,j) using the third argument comp to determine relative orderings. If comp(x,y) returns true, x appears before y in the sorted sequence. A call to parallel_sort(i,j) is equivalent to parallel_sort(i,j,std::less<T>()).

Example 4-9 shows two sorts. The sort of array a uses the default comparison, which sorts in ascending order. The sort of array b sorts in descending order by using std::greater<float> for comparison.

Example 4-9. Two sorts

#include "tbb/parallel_sort.h"
#include <math.h>

using namespace tbb;

const int N = 100000;
float a[N];
float b[N];

void SortExample( ) {
    for( int i = 0; i < N; i++ ) {
       a[i] = sin((double)i);
       b[i] = cos((double)i);
    }
    parallel_sort(a, a + N);
    parallel_sort(b, b + N, std::greater<float>( ));
}
