I have no idea to which category this post belongs to – to C++ or to KDE development – because it is about one component in Plasma Blade. But the post is mostly about the approach to writing a distributed system I went for while implementing it, and a C++ framework that will come out of it.

Blade is my long-term R&D project to implement a multi-process KRunner alternative.

Ranges

One of the most powerful parts of the C++ standard library is the <algorithm> header. It provides quite a few useful algorithms for processing collections. The main problem is that algorithms take iterator pairs to denote the start and end of a collection which should be processed instead of taking the collection directly.

While this has some useful applications, most of the time it just makes algorithms more difficult to use and difficult to compose.

There were multiple attempts at fixing this by introducing an abstraction over sequence collections called ranges. There are several 3rd party libraries that implement ranges, and one of them (Eric Niebler’s range-v3 library) is on its way to become a part of the C++ standard library.

There are numerous articles written about ranges available online (and I even dedicated a whole chapter to them in my book), so I’m not going to cover them in more detail here. I’m just going to say that ranges allow us to easily create sequences of transformations that should be applied to a collection.

Imagine the following scenario – we have the output of the ping command, and we want to convert the whole output to uppercase, then extract the number of miliseconds each response took, and then filter out all the responses which took longer than some predefined time limit.

64 bytes from localhost (::1): icmp_seq=1 ttl=64 time=0.015 ms
64 bytes from localhost (::1): icmp_seq=2 ttl=64 time=0.041 ms
64 bytes from localhost (::1): icmp_seq=3 ttl=64 time=0.041 ms

In order to demonstrate the range transformation chaining, we are going to make this a bit more complicated than it needs to be:

  • we will convert each line to uppercase;
  • find the last = character in each line;
  • chop off everything before the = sign (including the sign);
  • keep only results that are less than 0.045.

Written in the pipe notation supported by the range-v3 library, it would look something like this:

auto results =
    ping_output
    | transform([] (std::string&& value) {
          std::transform(value.begin(), value.end(), value.begin(), toupper);
          return value;
      })
    | transform([] (std::string&& value) {
          const auto pos = value.find_last_of('=');
          return std::make_pair(std::move(value), pos);
      })
    | transform([] (std::pair<std::string, size_t>&& pair) {
          auto [ value, pos ] = pair;
          return pos == std::string::npos
                      ? std::move(value)
                      : std::string(value.cbegin() + pos + 1, value.cend());
      })
    | filter([] (const std::string& value) {
          return value < "0.045"s;
      });

Abstractions

In one of my previous posts, I wrote that using the for_each algorithm instead of the range-based for loop provides us with an abstraction high enough to be used for processing asynchronous data streams instead of being able to work only with regular data collections.

Some people argued that for_each was not meant to be a customization point in C++, which might be true, but it works very well as one. Now, it suffers from the same problems as other STL algorithms, and if we want to reach new heights of abstraction, it truly is not the best customization point out there. So, we might find something else to customize instead.

If you look at the previous code snippet, you’ll see several transformations performed on ping_output. But it does not say what ping_output is. It can be a vector of strings, it can be an input stream tokenized on newlines, it can be a QFuture, etc. It can be anything that transform and filter can exist for.

Reactive streams

We can create an abstraction similar to ranges, but instead of trying to create an abstraction over collections, we will create abstractions over series of events.

Imagine if ping_output was a range that reads the data from an input stream like std::cin. Whenever we request a result from the results range, it would block the execution of our whole program until a whole line is read from the input stream.

This is a huge problem. The ping command will send our program one line each second which means that our program will be suspended for most of its lifetime waiting for those seconds to pass,

Instead, if would be better if it could continue working on other tasks until the ping command sends it the new data to process.

This is where event processing comes into play. Instead of requesting a value from the results, and then blocking the execution until that value appears, we want to react to new values (events) that are received from the ping command, and process them when they arrive – without ever blocking our program.

This is what reactive streams are meant to model – an asynchronous stream of values (events). If we continue with the ping example, the transformations it defines should also be able to work on reactive streams. When a new value arrives, it goes through the first transformation which converts it to uppercase, and sends it to the second transformation. The second transformation processes the value and sends it to the third. And so on.

The code would look like this:

auto pipeline =
    system_cmd("ping"s, "localhost"s)
    | transform([] (std::string&& value) {
          std::transform(value.begin(), value.end(), value.begin(), toupper);
          return value;
      })
    | transform([] (std::string&& value) {
          const auto pos = value.find_last_of('=');
          return std::make_pair(std::move(value), pos);
      })
    | transform([] (std::pair&& pair) {
          auto [ value, pos ] = pair;
          return pos == std::string::npos
                      ? std::move(value)
                      : std::string(value.cbegin() + pos + 1, value.cend());
      })
    | filter([] (const std::string& value) {
          return value < "0.045"s;
      });

The only thing that changed is the source of the values. Instead of using a range (a vector, or another sequence collection), this uses a reactive stream which emits a line every time the ping command outputs it.

This is the power of abstraction – using the code we have written for one thing, for something completely different. In this case, using the code that was written to process a collection synchronously, to process asynchronous event streams.

Distributed stream processing

While it is nice to be able to write asynchronous software systems in the same way we write synchronous systems, that is not enough for this post.

One of the great things about defining programs as series of pure transformations to be performed on the data is that those transformations are independent from one another – they can be isolated from each other and even moved into different processes (even processes on separate computers in a network).

For this case, I’ve implemented in the Voy library a special type of transformation called a bridge which is used to transparently send the data from one process to the other.

Let’s split the data processing in ping example into three parts (similar to what the Plasma Blade will need) – to execute the first and the last part in the main program, and to execute the middle part in the backend. It will look like this (added the voy namespace for completeness):

auto pipeline =
voy::system_cmd("ping"s, "localhost"s) | voy::transform([] (std::string&& value) { std::transform(value.begin(), value.end(), value.begin(), toupper); return value; })
| voy_bridge(to_backend)
| voy::transform([] (std::string&& value) { const auto pos = value.find_last_of('='); return std::make_pair(std::move(value), pos); }) | voy::transform([] (std::pair&& pair) { auto [ value, pos ] = pair; return pos == std::string::npos ? std::move(value) : std::string(value.cbegin() + pos + 1, value.cend()); })
| voy_bridge(from_backend)
| voy::filter([] (const std::string& value) { return value < "0.045"s; });

This data pipeline is defined once for both the main program and the backend. For the main program, the middle part of the pipeline will be disabled (no code generated for it), while for the backend only the middle part will be compiled.

A note on the implementation

At the last Akademy, I talked to Tomaz Canabrava about making KDE software more appealing to students to join our development efforts.

Now, this project is probably going to be overly complex for an average student to join in, but I’m trying to make it as readable and as clean as possible for more adventurous students. It uses all the new and cool features of C++17, along with void_t and the detection idiom to simulate concepts, etc.

If you desire to have a chance to work on a real-world C++17 project, just send me an e-mail, and I’ll try to get you up to speed. The only pre-requirement is for you to do some investigation and find out which repository the code resides in. ;)


You can support my work on , or you can get my book Functional Programming in C++ at if you're into that sort of thing.