I have no idea which category this post belongs to – C++ or KDE development – because it is about one component in Plasma Blade. But the post is mostly about the approach to writing a distributed system that I went for while implementing it, and about a C++ framework that will come out of it.
Ranges
One of the most powerful parts of the C++ standard library is the <algorithm> header. It provides quite a few useful algorithms for processing collections. The main problem is that the algorithms take iterator pairs denoting the start and the end of the collection that should be processed, instead of taking the collection directly.
While this has some useful applications, most of the time it just makes the algorithms more difficult to use and more difficult to compose.
There have been multiple attempts at fixing this by introducing an abstraction over sequence collections called ranges. There are several third-party libraries that implement ranges, and one of them (Eric Niebler’s range-v3 library) is on its way to becoming a part of the C++ standard library.
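To see the friction this creates, here is a small illustration of my own (not taken from the ping example below): even a simple “filter, then transform” combination forces us to spell out iterator pairs and to materialize an intermediate collection, while with ranges the same thing can be written as a single pipe expression.

#include <algorithm>
#include <iostream>
#include <iterator>
#include <vector>

int main()
{
    const std::vector<int> xs{1, 2, 3, 4, 5, 6};

    // Classic STL: every step needs an iterator pair and an explicit
    // intermediate container in order to compose with the next step.
    std::vector<int> evens;
    std::copy_if(xs.cbegin(), xs.cend(), std::back_inserter(evens),
                 [] (int x) { return x % 2 == 0; });

    std::vector<int> squares;
    std::transform(evens.cbegin(), evens.cend(), std::back_inserter(squares),
                   [] (int x) { return x * x; });

    for (int x : squares) {
        std::cout << x << '\n';
    }

    // With ranges, the same operation composes directly, roughly:
    //   auto squares = xs | views::filter(is_even) | views::transform(square);
}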
There are numerous articles written about ranges available online
(and I even dedicated a whole chapter to them in my book), so I’m not
going to cover them in more detail here. I’m just going to say that
ranges allow us to easily create sequences of transformations that
should be applied to a collection.
Imagine the following scenario – we have the output of the ping command, and we want to convert the whole output to uppercase, then extract the number of milliseconds each response took, and then filter out all the responses that took longer than some predefined time limit.
64 bytes from localhost (::1): icmp_seq=1 ttl=64 time=0.015 ms
64 bytes from localhost (::1): icmp_seq=2 ttl=64 time=0.041 ms
64 bytes from localhost (::1): icmp_seq=3 ttl=64 time=0.041 ms
In order to demonstrate the range transformation chaining, we are going to make this a bit more complicated than it needs to be:
- we will convert each line to uppercase;
- find the last = character in each line;
- chop off everything before the = sign (including the sign itself);
- keep only the results that are less than 0.045.
Written in the pipe notation supported by the range-v3
library, it would look something like this:
auto results =
    ping_output
    | transform([] (std::string&& value) {
          // convert the line to uppercase
          std::transform(value.begin(), value.end(), value.begin(), toupper);
          return value;
      })
    | transform([] (std::string&& value) {
          // find the last '=' character in the line
          const auto pos = value.find_last_of('=');
          return std::make_pair(std::move(value), pos);
      })
    | transform([] (std::pair<std::string, size_t>&& pair) {
          // chop off everything up to and including the '='
          auto [ value, pos ] = pair;
          return pos == std::string::npos
                     ? std::move(value)
                     : std::string(value.cbegin() + pos + 1, value.cend());
      })
    | filter([] (const std::string& value) {
          // keep only the results below the time limit
          return value < "0.045"s;
      });
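If ping_output were, for instance, an ordinary std::vector<std::string> holding the lines above (an assumption for the sake of illustration), the lazily evaluated results range could then be consumed like any other range:

// Assuming ping_output is a std::vector<std::string> of ping's output lines,
// iterating over `results` pulls each line through the whole chain lazily:
for (const auto& line : results) {
    std::cout << line << '\n';   // e.g. "0.015 MS"
}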
Abstractions
In one of my previous posts, I wrote that using the for_each algorithm instead of the range-based for loop gives us an abstraction high enough to be used for processing asynchronous data streams, instead of being limited to regular data collections.
Some people argued that for_each was not meant to be a customization point in C++, which might be true, but it works very well as one. Now, it suffers from the same problems as other STL algorithms, and if we want to reach new heights of abstraction, it truly is not the best customization point out there. So, we might find something else to customize instead.
If you look at the previous code snippet, you’ll see several transformations performed on ping_output. But it does not say what ping_output is. It can be a vector of strings, it can be an input stream tokenized on newlines, it can be a QFuture, etc. It can be anything that transform and filter can exist for.
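To make the “anything that transform and filter can exist for” point a bit more concrete, here is a rough sketch of my own (not range-v3’s or Voy’s actual machinery) of how the pipe syntax can be opened up to arbitrary source types; apply_transform is a hypothetical hook that each source type would provide:

#include <type_traits>
#include <utility>

// A transformation is just a wrapper around a callable...
template <typename Fn>
struct transform_fn {
    Fn fn;
};

template <typename Fn>
auto transform(Fn&& fn)
{
    return transform_fn<std::decay_t<Fn>>{std::forward<Fn>(fn)};
}

// ...and operator| forwards the source into it. Any type can participate by
// providing an overload of the hypothetical apply_transform hook for itself --
// a vector, a tokenized input stream, a QFuture wrapper, or an asynchronous
// event source.
template <typename Source, typename Fn>
auto operator|(Source&& source, transform_fn<Fn> transformation)
{
    return apply_transform(std::forward<Source>(source),
                           std::move(transformation.fn));
}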
Reactive streams
We can create an abstraction similar to ranges, but instead of trying
to create an abstraction over collections, we will create abstractions
over series of events.
Imagine if ping_output were a range that reads the data from an input stream like std::cin. Whenever we request a result from the results range, it would block the execution of our whole program until a whole line is read from the input stream.
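In code, that blocking behaviour would look something like this minimal illustration, reading ping’s output from standard input:

#include <iostream>
#include <string>

int main()
{
    std::string line;
    // Each call to std::getline stops the whole program until ping produces
    // another line -- roughly one second of doing nothing per reply.
    while (std::getline(std::cin, line)) {
        // ... run the transformations defined above on `line` ...
    }
}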
This is a huge problem. The ping command will send our program one line each second, which means that our program will be suspended for most of its lifetime, waiting for those seconds to pass.
Instead, it would be better if it could continue working on other tasks until the ping command sends it new data to process.
This is where event processing comes into play. Instead of requesting a value from results, and then blocking the execution until that value appears, we want to react to new values (events) that are received from the ping command, and process them when they arrive – without ever blocking our program.
This is what reactive streams are meant to model – an asynchronous stream of values (events). If we continue with the ping example, the transformations it defines should also be able to work on reactive streams. When a new value arrives, it goes through the first transformation, which converts it to uppercase and sends it to the second transformation. The second transformation processes the value and sends it to the third. And so on.
The code would look like this:
auto pipeline =
    system_cmd("ping"s, "localhost"s)
    | transform([] (std::string&& value) {
          std::transform(value.begin(), value.end(), value.begin(), toupper);
          return value;
      })
    | transform([] (std::string&& value) {
          const auto pos = value.find_last_of('=');
          return std::make_pair(std::move(value), pos);
      })
    | transform([] (std::pair<std::string, size_t>&& pair) {
          auto [ value, pos ] = pair;
          return pos == std::string::npos
                     ? std::move(value)
                     : std::string(value.cbegin() + pos + 1, value.cend());
      })
    | filter([] (const std::string& value) {
          return value < "0.045"s;
      });
The only thing that changed is the source of the values. Instead of using a range (a vector, or another sequence collection), this uses a reactive stream which emits a line every time the ping command outputs it.
This is the power of abstraction – using the code we have written for
one thing, for something completely different. In this case, using the
code that was written to process a collection synchronously, to process
asynchronous event streams.
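As a rough sketch of what pushing values through the transformations means (my own simplified illustration, not Voy’s actual implementation), each stage can be modelled as a node that applies its function and immediately hands the result to the next stage:

#include <iostream>
#include <string>
#include <utility>

// A push-based stage: it is handed a value (instead of being asked for one),
// applies its transformation, and forwards the result to the next stage.
template <typename Fn, typename Next>
struct transform_node {
    Fn fn;
    Next next;

    template <typename T>
    void operator()(T&& value)
    {
        next(fn(std::forward<T>(value)));
    }
};

template <typename Fn, typename Next>
transform_node(Fn, Next) -> transform_node<Fn, Next>;

int main()
{
    // The final stage just prints whatever reaches the end of the chain.
    auto sink = [] (const std::string& value) { std::cout << value << '\n'; };

    transform_node chain{
        [] (std::string value) { return value + " MS"; },
        sink
    };

    // Events are pushed in as they arrive; nothing blocks in between.
    chain(std::string("0.015"));
    chain(std::string("0.041"));
}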
Distributed stream processing
While it is nice to be able to write asynchronous software systems in
the same way we write synchronous systems, that is not enough for this
post.

One of the great things about defining programs as series of pure
transformations to be performed on the data is that those
transformations are independent from one another – they can be isolated
from each other and even moved into different processes (even processes
on separate computers in a network).
For this case, I’ve implemented a special type of transformation in the Voy library called a bridge, which is used to transparently send the data from one process to the other.
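Conceptually (again a simplified sketch of my own, not the actual Voy bridge), the sending end of a bridge is just another stage in the chain which, instead of calling the next transformation in the same process, serializes the value and ships it off to the other process:

#include <iostream>
#include <ostream>
#include <string>

// The sending half of a "bridge": whatever reaches this stage is written,
// one value per line, to a stream that stands in for the connection to the
// other process. The receiving half would read lines and re-emit them into
// the rest of the pipeline.
struct sending_bridge {
    std::ostream& out;

    void operator()(const std::string& value) const
    {
        out << value << '\n';
    }
};

int main()
{
    sending_bridge to_backend{std::cout};
    to_backend("64 BYTES FROM LOCALHOST (::1): ICMP_SEQ=1 TTL=64 TIME=0.015 MS");
}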
Let’s split the data processing in the ping example into three parts (similar to what Plasma Blade will need) – we will execute the first and the last part in the main program, and the middle part in the backend. It will look like this (the voy namespace is added for completeness):
auto pipeline =
    voy::system_cmd("ping"s, "localhost"s)
    | voy::transform([] (std::string&& value) {
          std::transform(value.begin(), value.end(), value.begin(), toupper);
          return value;
      })
    | voy_bridge(to_backend)
    | voy::transform([] (std::string&& value) {
          const auto pos = value.find_last_of('=');
          return std::make_pair(std::move(value), pos);
      })
    | voy::transform([] (std::pair<std::string, size_t>&& pair) {
          auto [ value, pos ] = pair;
          return pos == std::string::npos
                     ? std::move(value)
                     : std::string(value.cbegin() + pos + 1, value.cend());
      })
    | voy_bridge(from_backend)
    | voy::filter([] (const std::string& value) {
          return value < "0.045"s;
      });
This data pipeline is defined once for both the main program and the
backend. For the main program, the middle part of the pipeline will be
disabled (no code generated for it), while for the backend only the
middle part will be compiled.
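One general way to get this “compiled only where needed” behaviour (an assumption for illustration, not necessarily the mechanism Voy uses) is to branch on a compile-time flag – in C++17, if constexpr inside a template discards the branch that a given binary does not need, so no code is generated for it:

#include <iostream>

// building_backend would be driven by the build system; it is a made-up
// flag for this illustration.
constexpr bool building_backend = false;

template <typename Frontend, typename Backend>
void run(Frontend frontend_part, Backend backend_part)
{
    if constexpr (building_backend) {
        backend_part();   // instantiated only in the backend binary
    } else {
        frontend_part();  // instantiated only in the main program
    }
}

int main()
{
    run([] { std::cout << "frontend stages\n"; },
        [] { std::cout << "backend stages\n"; });
}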
A note on the implementation
At the last Akademy, I talked to Tomaz Canabrava about making KDE software more appealing for students to join our development efforts.
Now, this project is probably going to be overly complex for an average student to join, but I’m trying to make it as readable and as clean as possible for the more adventurous ones. It uses all the new and cool features of C++17, along with void_t and the detection idiom to simulate concepts, etc.
If you would like a chance to work on a real-world C++17 project, just send me an e-mail, and I’ll try to get you up to speed. The only prerequisite is that you do a bit of investigation and find out which repository the code resides in. ;)