HPX and C++ Futures

There has been a lot of attention to Futures in C++ lately. One of the main related events (even if it was not widely mentioned anywhere) was the final call for positions and comments for the preliminary draft technical specification for C++ Extensions for Concurrency (PDTS), see N4538. This call closed on July 7th, 2015. At this point, the document is out for the national bodies to vote on whether it should be accepted as a final TS (the balloting period ends on July 22nd, 2015). Personally, I expect for this document to be accepted unanimously, which means that we soon will have a second TS related to parallelism and concurrency ready. Compiler vendors will have a field day implementing all of this functionality over the next months (and years).

The document N4538 describes three main new sets of abstractions for C++ related to concurrency:

  • Improvements to std::future<T> and Related APIs
  • Latches and Barriers
  • Atomic Smart Pointers

In this post I would like to focus on the first of the three, namely what additions will come to C++ in order to finally make futures more usable and fit for the age of asynchronous computing.

In short, the TS defines extensions for std::future<T> enabling sequential and parallel composition of futures and two API functions which allow to create instances of futures which are ready at construction. Why is that so important?

The std::future<T> as it is defined in C++11 is very limited. All you can do is to anonymously connect a producer with a consumer and to call future<T>::get() to synchronize between the two:

int universal_answer() 
{ 
    /* ... do complex computations ... */ 
    return 42; 
}

void deep_thought()
{
    future<int> promised_answer = async(&universal_answer);
    // ... do other things for 7.5 million years
    cout << promised_answer.get() << endl;   // prints 42, eventually
}

Over time people realized that this — while useful on its own — is missing the capabilities to define continuations (i.e. functions executed once a future has become ready) and to combine several of those futures into a single one.

Sequential Composition of Futures

In order to enable for a function to be executed once a future has become ready, the TS adds a member function .then() to the API of a future:

string make_string() 
{
    future<int> f1 = async([]() -> int { return 123; });
    future<string> f2 = f1.then(
        [](future<int> f) -> string 
        {
            return to_string(f.get());    // here .get() won’t block
        });
    return f2.get();     // will evaluate to string("123")
}

The lambda function in the example will be attached as a continuation to ‘f1’. This means the function will be automatically executed as soon as ‘f1’ becomes ready. Note that the member function .then() returns another future instance representing the result of the execution of the continuation. The parameter ‘f’ which is passed to the lambda is referring to the same future instance as the continuation was attached to initially. For this reason, the parameter ‘f’ is guaranteed to be ready and calling .get() is guaranteed not to suspend.

In HPX, we have implemented this quite a while ago. As usual, we also extended the exposed API to be more flexible. We added two additional overloads of .then() to our hpx::future and hpx::shared_future types. One overload takes an additional launch policy parameter (similar to hpx::async), the other overload additionally allows to specify an instance of an executor  type (see HPX and C++ Executors for more details about those). Both overloads add powerful abstractions for giving the user control over where and how the continuation is actually executed.

Parallel Composition of Futures

Once code starts to use futures very often we need to combine more than one of them. The TS document describes two new API functions for this kind of use cases: when_all() and when_any(). Both are variadic functions which can be invoked with as many futures as necessary. As the names imply, when_all() returns a future which becomes ready once all of the passed future instance have become ready and the future returned from when_any() becomes ready once at least one of the future instances passed to it has become ready.

int test_when_all()
{
    shared_future<int> shared_future1 = 
        async([]() -> int { return 125; });
    future<string> future2 = 
        async([]() -> string { return string("hi"); });

    future<tuple<shared_future<int>, future<string>>> all_f = 
        when_all(shared_future1, future2);

    future<int> result = all_f.then(
        [](future<tuple<shared_future<int>, future<string>>> f) -> int 
        {
             return do_work(f.get());
        });

    return result.get();
}

Using when_any() is very similar. The function returns a future containing an instance of a special type when_any_result which becomes ready if any of the argument futures has become ready. The returned structure holds all of the futures and an index to one of the futures which is ready:

template <typename Sequence>
struct when_any_result {
    size_t index;
    Sequence futures;
};

Obviously, we have implemented those two functions in HPX as well. However we have added a couple more (variadic) template functions for parallel composition. First of all, we added when_some(), which returns a future holding an instance of the type when_some_result. The returned future becomes ready when a given number of the argument futures have become ready:

template <typename Sequence>
struct when_some_result
{
    // List of indices of futures which have become ready
    std::vector<std::size_t> indices;
    // The sequence of futures as passed to hpx::when_some
    Sequence futures;
};

This is a simple generalization of when_any() which waits for one of the futures only.

We also added a set of functions which provide functionality that is similar to future::wait(), namely waiting for a given number of futures to become ready (one, some, or all futures) without ‘consuming’ the futures (i.e. without making the arguments invalid — if they are of type hpx::future): hpx::wait_all, hpx::wait_any, hpx::wait_some. Our experience shows that these are very useful in a broad set of cases, for instance:

std::vector<hpx::future<void>> results;
for (int i = 0; i != NUM; ++i)
    results.push_back(hpx::async(...));
hpx::wait_all(results);

The is an example demonstrating how you can spawn off NUM (arbitrary) tasks concurrently and wait for all of them to finish executing before continuing on. As you can see, we also allowed for passing vectors of futures as single arguments to all of the functions for parallel composition.

There is one caveat worth mentioning, however. Since the wait_xxx functions do not ‘consume’ any of the input futures, the functions will not throw if one of the tasks itself throws an exception. Any exceptions occurring while the tasks are executed will be stored in the shared state of the futures themselves. Thus a second loop over the input futures is necessary to perform proper error handling. This behavior however is consistent with the behavior of future::wait(), so it shouldn’t come as a surprise.

Related APIs

There are two more functions defined by the TS which help creating futures which are ready at construction time: make_ready_future and make_exceptional_future. Both functions do exactly what their name implies. The first creates a future from a given value, the created future is ready to begin with and encapsulates the value it was constructed from. The second function creates a future which is in exceptional state (stores an exception). Both functions are available in HPX.

As usual, if you would like to try out today these new capabilities which will be available soon with every major compiler, please fork HPX from our Github site and give us some feedback. We are looking forward to your comments!

Leave a Reply

Your email address will not be published. Required fields are marked *