HPX and C++ Dataflow

The C++11 standard library introducedstd::asyncas a convenient means of asynchronously executing a function on a new thread and returning an instance of astd::futurerepresenting the eventual result produced by that function. In HPX, thehpx::asyncAPI is one of the most basic parallelization facilities exposed to the user. Here is a simple example of how it can be used:

int convert(std::string s) { return std::stoi(s); }
future<int> f = async(convert, "42");
std::cout << f.get() << std::endl;    // prints: 42

Here the convert function is spawned on a new thread, passing along the “42” as an argument.
In HPX, we like futures as they

  • Enable a transparent synchronization between the producer of the result and its consumer
  • Hide the notion of directly dealing with threads
  • Make asynchrony manageable as a future represents a data dependency
  • Allow to coordinate asynchronous execution of several tasks
  • Support and encourage a programming style which favors parallelism over concurrency

In short, futures are a most fundamental building block enabling highly transparent parallelism and fine grain synchronization of tasks. We have presented our understanding of futures in several talks. Please go watch Plain Threads are the GOTO of Today’s Computing or Asynchronous Computing in C++ if you would like to hear more about this.

Now, all is well and easy until somebody decides to pass an instance of a future toasyncas an argument to the executed function. Theasyncsimply passes through all of its arguments to the executed function. Consequently, that future would be directly passed to the function as well. At the point of the function invocation this future may not have become ready yet, so that the function would need to wait for the future causing the executing thread to possibly suspend while callingfuture::get().

We have learned relatively early on that thread suspension is an operation which should be avoided as much as possible. Thread suspension binds a lot of (mainly memory) resources as the execution context (the stack, register set, and thread state) of the suspended thread has to be kept alive. In the context of HPX which is designed to handle millions of concurrent threads, this can lead to excessive resource requirements.

All of this has led us to implement a new facility function which we calleddataflow. Thedataflow API is identical to the one exposed byasync. The only semantic difference is that if one or more arguments are futures,dataflowwill delay invoking the function until all of those futures have become ready. While the semantic difference might seem to be very small, it has tremendous impact on performance and style of programming. Any calls tofuture::get()from inside the invoked function are now guaranteed to not suspend and to immediately return the value encapsulated by the future.

Let’s look at an example. In one of our recent posts we demonstrated the use of a task block for implementing a self-recursive traversal of a recursive data structure. There we also pointed out the general disadvantage imposed by using fork-join parallelism: it enforces a barrier in the computation on every level of the recursion. This barrier most likely causes one or more threads to suspend as they have to wait for all other threads to catch up. However, as outlined above, we want to avoid thread suspension as much as possible. Luckily,dataflowis the perfect tool not only for avoiding any thread suspensions during the traversal, but also enables us to apply a technique which we call futurization.

template <typename Func>
future<int> traverse(node& n, Func && f)
{
    // traversal of left and right sub-tree
    future<int> left = 
        n.left ? traverse(*n.left, f) : make_ready_future(0);
    future<int> right = 
        n.right ? traverse(*n.right, f) : make_ready_future(0);

    // return overall result for current node
    return dataflow(
        [&n, &f](future<int> l, future<int> r) -> int
        {
            // calling .get() does not suspend
            return f(n) + l.get() + r.get();
        },
        std::move(left), std::move(right)
    );
}

Look’ma – no suspensions!

The code however requires some additional explanations. The traversal function now returns afuture<int>instead of just the result. If we look closer it becomes apparent, that the returned future not only represents the result of the traversal, but also all of the dependencies of the results for the traversal of all sub-nodes of the current node. In other words, the returned future will become ready only after all futures representing the sub-nodes of the current one have become ready. The dependencies between the results for a sub-tree and the result of the current node is implicitly established bydataflowand is represented by the returned future.

Executing the traverse function performs two operations: traverse two sub-trees of the current node and schedule the lambda doing the actual calculation for each level of the recursion. The lambda functions are automatically scheduled to run (bydataflow) as soon as both futures representing the calculations on the sub-trees have become ready (i.e. after the traversal of the sub-trees has finished).

In effect, we have successfully futurized our traverse algorithm. Instead of performing the actual calculations our traverse function now constructs a dependency tree of futures, which – when executed – produces the same result as our original algorithm. The difference is that this dependency tree will be unraveled with full speed, without any suspension or waiting, using all compute resources (cores) available to HPX.

While there are clear performance benefits in using futurization, there is also a clear disadvantage of applying this style of computation. The code is difficult to understand, it feels to be ‘inside-out’, and it is hard to read.

There is another proposal currently being discussed in the standardization committee: N4402: Resumable Functions (rev. 4) which will help solving this disadvantage. If you would like to know more about resumable functions, please watch Gor Nishanov’s talk at CppCon 2014.

In our case, using the new await keyword allows to simplify the traverse function while maintaining all of the nice properties outlined above.

template <typename Func>
future<int> traverse(node& n, Func && f)
{
    // spawn traversal of left and right sub-tree
    future<int> left = n.left ? 
        async(traverse, ref(*n.left), f) : make_ready_future(0);
    future<int> right = n.right ? 
        async(traverse, ref(*n.right), f) : make_ready_future(0);

    // directly execute work on current node
    int result = f(n);

    // return overall result for the current node
    return result + await left + await right;
}

Note that the last code usesasyncadding more parallelism to the stage of creating the dependency tree.

We have done some experiments with a preliminary implementation of await in Visual Studio 2015RC. We were able to integrate it well with the futures in HPX and the results are very promising. Unfortunately, the await keyword (and resumable functions) will only be available in all mainstream compilers years from today. So for now we will have to make do with our poor-man’s-await –dataflow.

In any case, if you want to try things out (includingdataflow), please fork HPX from our Github site and tell us what you think.

4 Comments

  1. Evgeny Panasyuk

    It is possible to implement await based on stackful coroutines from Boost.Coroutine, which is widely portable. Here is proof-of-concept: https://github.com/panaseleus/await_emu

  2. Hartmut Kaiser

    Evgeny, sure you can do that, nobody said you couldn’t emulate the interface of await. In fact in the simplest case this would look like:

    template <typename T>
    T await(future<T> && f)
    {
        return f.get();
    }
    

    In HPX this code is 100% equivalent to the solution you showed. It will suspend the coroutine which is calling f.get();

    However, no matter how you turn it, you’d end up with a suspended thread/coroutine, which is exactly what we want to avoid. So you can’t emulate the full semantics of await. The await keyword makes the function return a future at the point of await. This future will have an attached continuation representing the rest of the function. One need compiler support for this.

  3. Evgeny Panasyuk

    The await keyword makes the function return a future at the point of await. This future will have an attached continuation representing the rest of the function.

    This is also possible ( http://tinyurl.com/o3exnq4 ):

    int foo()
    {
        int result = await some_async_operation();
        return result + 5;
    }
    ASYNC_PREFIX(foo)
    

    This will make function async_foo which returns future.

    Main difference between stackless and stackful coroutines in context of await implementation is not semantics, but resource usage. Stackless coroutines could be very lightweight – just several words in size. While stack of stackful coroutine requries hundreds of bytes at minimum, and one have to care about stackoverflow (split stack / guard page / address space reserve / pass intenstive computation into special coroutine with large stack / etc).

    Yes, good stackless coroutines require language/compiler support.
    Regarding N4402 proposal – it has some flaws. It requires unnatural and non-optimal allocation and prohibits copy/move/serialize of coroutine https://groups.google.com/a/isocpp.org/forum/#!topic/std-proposals/muZol1fV6Q4

    To some extent stackless coroutines (and await feature) can be emulated with help of macros like in Boost.Asio.

  4. Evgeny Panasyuk

    Here is an example of await emultation based on stackless coroutine macros from Boost.Asio:
    http://coliru.stacked-crooked.com/a/c7e1930815e4037a

    COROUTINE(int, coroutine, (int, param),
        (int, local_x)
        (int, local_y)
        (int, local_i))
    {
        AWAIT(local_x =, async([]{ return 1; }));
    
        local_y = 0;
        for(local_i = 0; local_i!=4; ++local_i)
            AWAIT(local_y +=, async([]{ return 2; }));
    
        RETURN(local_x + local_y + param);
    }
    COROUTINE_END;
    
    int main()
    {
        auto f = coroutine{3}();
        cout << f.get() << endl;
    }
    

    Output is 12

Leave a Reply

Your email address will not be published. Required fields are marked *