Using Senders/Receivers

By Lucian Radu Teodorescu

Overload, 33(185):4-10, February 2025


C++26 will introduce senders/receivers. Lucian Radu Teodorescu demonstrates how to use them to write multithreaded code.

This is a follow-up to the article in the previous issue of Overload, which introduced the upcoming C++26 senders/receivers framework [WG21Exec]. While the previous article focused on presenting the main concepts and outlining what will be standardized, this article demonstrates how to use the framework to build concurrent applications.

The goal is to showcase examples that are closer to real-world software rather than minimal examples. We address three problems that can benefit from multi-threaded execution: computing the Mandelbrot fractal, performing a concurrent sort, and applying a graphical transformation to a set of images.

All the code examples are available on GitHub [ExamplesCode]. We use stdexec [stdexec], the reference implementation for the senders/receivers proposal. Additionally, some features included in the examples are not yet accepted by the standards committee, though we hope they will be soon.

Before we get started

Before diving into more realistic examples, let’s begin with a minimal example to set the stage. The code in Listing 1 prints “Hello, concurrency!” from a thread that is different from the main thread.

#include <cstdio>
#include <utility>
#include <exec/system_context.hpp>
#include <stdexec/execution.hpp>
int main() {
  stdexec::scheduler auto sched = 
    exec::get_system_scheduler();
  stdexec::sender auto snd = 
    stdexec::schedule(sched)
    | stdexec::then([] 
    { printf("Hello, concurrency!\n"); });
  stdexec::sync_wait(std::move(snd));
}
Listing 1

The code is roughly equivalent to:

  std::thread{[]
    { printf("Hello, concurrency!\n"); }}.join();

Here, we acquire a thread from the system scheduler and execute the given lambda on that thread, which prints the message to the standard output.

The scheduler acts as a handle to an execution context – an entity that owns threads of execution, such as CPU or GPU threads. The system scheduler represents the default execution context on the current system, presumably shared among all applications running on the system. A good way to conceptualize it is as a thread pool, with an unspecified number of threads, shared across the applications currently running.

The work to be done is described by the sender snd. As mentioned in the previous article [Teodorescu24], senders merely describe work – they do not represent the actual execution of that work. To execute the work, the sender must be started. Senders are somewhat similar to std::function objects: they represent function-like work, but defining such an object does not immediately execute it; the function object must be invoked to start the work. In our case, the operation that starts the work is sync_wait. This function initiates the work described by the sender and blocks until the result is produced. It then returns the result of the work, although in our example, we ignore the result.
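The std::function analogy can be made concrete with a few lines of standard C++. This is only a sketch of the "describe first, start later" idea, not of how senders are implemented; the names executions, make_work and start_and_wait are hypothetical and exist only for this illustration.

```cpp
#include <functional>

// Counts how many times the work body has actually run.
int executions = 0;

// Builds a description of work; nothing is executed here.
std::function<int()> make_work() {
  return [] {
    ++executions;
    return 42;
  };
}

// Starts the work and waits for its result, loosely playing the role
// that sync_wait() plays for a sender.
int start_and_wait(const std::function<int()>& work) {
  return work();
}
```

Calling make_work() leaves executions at zero; the counter changes only when start_and_wait() invokes the returned object.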

As shown in the example, the stdexec library provides two namespaces: stdexec and exec. Similarly, the include files are organized into folders named stdexec and exec. Everything under the stdexec namespace is part of the P2300 proposal [P2300R10], which has already been accepted into the C++26 draft. Entities within the exec namespace are not part of the original P2300 proposal but are either candidates for standardization or provide useful abstractions. In our case, system_context and get_system_scheduler are proposed for standardization [P2079R5].

Work graph

In a serial program, all instructions are executed sequentially, and the order of execution is typically straightforward. For these programs, especially when following structured programming principles, understanding the scopes of different objects and code structures is crucial.

In contrast, for concurrent programs, both the ordering of instructions and the scopes of entities become important. In concurrent execution, there is a partial ordering of work items, forming a graph that represents the dependencies and execution flow of these items.

When examining this graph of work items, well-structured concurrency often results in the scope of an operation aligning with the span during which the operation can be executed – specifically, from the completion of all predecessors to the initiation of any successors.

Thinking of work as a graph is a quick and effective way to understand the constraints of a problem. For this reason, we will briefly discuss this graph of execution in the context of our examples.

Computing the Mandelbrot set

The Mandelbrot set is a two-dimensional fractal of great complexity, generated by iterating the simple formula $z_{n+1} = z_n^2 + c$, starting from $z_0 = 0$. Figure 1 illustrates the image of a Mandelbrot fractal, centered at a point on the real axis (with no imaginary component), using a scale of 512 and an iteration limit (depth) of 1000. Each iteration count is represented by a different color.

Figure 1

The code to compute this fractal without using concurrency is similar to the code shown in Listing 2.

int mandelbrot_core(std::complex<double> c, 
    int depth) {
  int count = 0;
  std::complex<double> z = 0;
  for (int i = 0; i < depth; i++) {
    if (abs(z) >= 2.0)
      break;
    z = z * z + c;
    count++;
  }
  return count;
}
std::complex<double> pixel_to_complex(int x,
    int y) {
  double x0 = offset_x + 
    (x - max_x / 2) * 4.0 / max_x / scale;
  double y0 = offset_y + 
    (y - max_y / 2) * 4.0 / max_y / scale;
  return std::complex<double>(x0, y0);
}
template <typename F>
void serial_mandelbrot(int* vals, int max_x,
    int max_y, int depth, F&& transform) {
  for (int y = 0; y < max_y; y++) {
    for (int x = 0; x < max_x; x++) {
      vals[y * max_x + x] = 
        mandelbrot_core(transform(x, y), depth);
    }
  }
}
Listing 2

We use a matrix of dimensions max_x by max_y, where each element represents a depth value that will be mapped to a color to create a colorful image. The transform functor passed to serial_mandelbrot converts a position in the matrix (a pixel) into a complex value. One possible implementation for this is the pixel_to_complex function. The core of the algorithm resides in the mandelbrot_core function, which computes the depth (up to a specified limit) for a given initial complex number c. This function is called for each element in the matrix, iterating up to depth times for each.

The overall complexity of the algorithm is O(max_y × max_x × depth). It is worth noting that, for some pixels, the mandelbrot_core function will terminate after only a few iterations, resulting in unbalanced computation across matrix elements. Despite this, on common hardware, filling a screen with the Mandelbrot fractal at a depth of 1000 is not particularly fast. Adding concurrency to the computation could provide significant performance benefits.
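To see how unbalanced the per-pixel cost can be, it helps to evaluate mandelbrot_core by hand for a few values of c. The sketch below repeats the function from Listing 2 (with abs qualified as std::abs) so that it compiles on its own; the sample points are our own choice for illustration.

```cpp
#include <complex>

// Same logic as mandelbrot_core in Listing 2, repeated here so the
// sketch is self-contained.
int mandelbrot_core(std::complex<double> c, int depth) {
  int count = 0;
  std::complex<double> z = 0;
  for (int i = 0; i < depth; i++) {
    if (std::abs(z) >= 2.0)
      break;  // the sequence escaped; stop early
    z = z * z + c;
    count++;
  }
  return count;
}
```

For c = 1 the sequence is 0, 1, 2 and the loop stops after 2 iterations. For c = 0 or c = -1 the sequence never escapes, so the loop runs all depth iterations. With a depth of 1000, neighbouring pixels can therefore differ in cost by a factor of several hundred.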

Listing 3 demonstrates the changes required for the main computation function to execute on multiple threads. The primary change involves transforming the outer loop (which iterates over the y axis) into a bulk() call. The bulk() sender executes the given body max_y times on the current execution context. This execution context is provided by the scheduler, which, as before, is obtained using get_system_scheduler(). Consequently, different lines in the matrix may be computed by different threads.

template <typename F>
void mandelbrot_concurrent(int* vals, int max_x,
    int max_y, int depth, F&& transform) {
  auto sched = exec::get_system_scheduler();
  auto snd = stdexec::schedule(sched)
    | stdexec::bulk(max_y, [=](int y) {
        for (int x = 0; x < max_x; x++) {
          vals[y * max_x + x] = 
            mandelbrot_core(transform(x, y), 
                            depth);
        }
      });
  stdexec::sync_wait(std::move(snd));
}
Listing 3

If the machine running this program has 8 cores, it is reasonable to assume that the system’s execution context will provide 8 OS threads to perform the work. Creating more threads than the number of hardware threads available on the system can lead to CPU oversubscription [Wikipedia], which will degrade the application’s performance.

The work itself is described by a sender, snd. To execute the work, the program invokes sync_wait(), which blocks until all the work is completed.

There is an important caveat in this example that is worth highlighting. By simply reading the code in Listing 3, one might assume that the definition of the bulk() algorithm inherently specifies the conditions under which computations can be executed concurrently. However, this is not entirely accurate. By default, the bulk() algorithm functions as a glorified for loop without any built-in concurrency.

Concurrency is introduced through specialization. Algorithms like bulk() can be specialized based on the scheduler they execute on. In this case, the system scheduler provides a specialization for bulk() that leverages the execution context it manages. It is the combination of the system scheduler and the bulk() algorithm that enables the desired multi-threaded implementation. If the system scheduler were removed from the code, the computation would run sequentially.
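The separation between the loop body and the way it is dispatched can be sketched in standard C++ without stdexec. This is an analogy built on std::thread, not a description of how stdexec or the system scheduler implements bulk(); bulk_inline, bulk_on_threads and demo_bulk_matches are hypothetical names.

```cpp
#include <algorithm>
#include <thread>
#include <vector>

// Default behaviour: a plain loop on the calling thread.
template <typename F>
void bulk_inline(int n, F f) {
  for (int i = 0; i < n; i++)
    f(i);
}

// "Specialized" behaviour: the same body, with the index range split
// into contiguous chunks that run on separate threads.
template <typename F>
void bulk_on_threads(int n, int num_threads, F f) {
  num_threads = std::max(1, std::min(num_threads, n));
  int chunk = (n + num_threads - 1) / num_threads;
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; t++) {
    int begin = t * chunk;
    int end = std::min(n, begin + chunk);
    workers.emplace_back([=] {
      for (int i = begin; i < end; i++)
        f(i);
    });
  }
  for (auto& w : workers)
    w.join();
}

// Runs the same body both ways and checks that the results agree.
bool demo_bulk_matches() {
  std::vector<int> a(100, 0), b(100, 0);
  int* pa = a.data();
  int* pb = b.data();
  bulk_inline(100, [=](int i) { pa[i] = i * i; });
  bulk_on_threads(100, 4, [=](int i) { pb[i] = i * i; });
  return a == b;
}
```

The body passed to both functions is identical; only the dispatcher decides whether iterations overlap in time. This mirrors the point above: the body must be safe to run concurrently, but whether it actually does depends on what executes it.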

The graph for this problem, shown in Figure 2, illustrates the dependencies between tasks. From a concurrency perspective, this problem is relatively straightforward, as the graph is not complex.

Figure 2

In conclusion, transforming single-threaded code into multi-threaded code using the senders/receivers framework does not need to be difficult.

Concurrent sort

In the previous example, achieving multi-threaded execution involved transforming a for loop into a bulk() call. Given a known number of iterations, bulk() effectively executes the work concurrently, adhering to the rules defined by the current scheduler. But what happens when the work to be done is not linear, and the number of iterations is unknown upfront? This section provides an example to address this scenario.

Here, we focus on adapting a classic implementation of quick sort to run concurrently. The serial version of the algorithm is shown in Listing 4. For small collections, we use std::sort as the base case for recursion. For larger collections, the algorithm partitions the elements into three groups based on a pivot: elements smaller than the pivot, elements equal to the pivot, and elements larger than the pivot. The pivot is chosen to maximize the likelihood of balanced partitions. Once the data is partitioned, we recursively sort the smaller and larger partitions.

template <std::random_access_iterator It>
void serial_sort(It first, It last) {
  auto size = std::distance(first, last);
  if (size_t(size) < size_threshold) {
    // Use serial sort under a certain threshold.
    std::sort(first, last);
  } else {
    // Partition the data, such that elements
    // [0, mid1) < [mid1, mid2) <= [mid2, n).
    // Elements in [mid1, mid2) are equal to 
    // the pivot.
    auto p = sort_partition(first, last);
    auto mid1 = p.first;
    auto mid2 = p.second;
    serial_sort(first, mid1);
    serial_sort(mid2, last);
  }
}
Listing 4

Listing 5 illustrates how this algorithm can be implemented using senders/receivers to achieve concurrent execution. This example utilizes an async_scope object to manage dynamic concurrent work, necessitating the wrapping of the recursive function. The async_scope provides a dynamic scope for the concurrent tasks it spawns. The core logic of the sorting function remains largely unchanged; the primary modification is that the sorting of the right-side subrange is now offloaded to the system scheduler, allowing it to run concurrently with the sorting of the left-side subrange.

template <std::random_access_iterator It>
void concurrent_sort_impl(It first, It last, 
    exec::async_scope& scope) {
  auto size = std::distance(first, last);
  if (size_t(size) < size_threshold) {
    // Use serial sort under a certain threshold.
    std::sort(first, last);
  } else {
    // Partition the data, such that elements
    // [0, mid1) < [mid1, mid2) <= [mid2, n).
    // Elements in [mid1, mid2) are equal to the 
    // pivot.
    auto p = sort_partition(first, last);
    auto mid1 = p.first;
    auto mid2 = p.second;
    // Spawn work to sort the right-hand side.
    stdexec::sender auto snd
      = stdexec::schedule
          (exec::get_system_scheduler())
        | stdexec::upon_error([]
              (std::error_code ec) -> void {
            throw std::runtime_error
              ("cannot start work");
          })
        | stdexec::then([=, &scope] {
            concurrent_sort_impl(mid2, last,
            scope); 
          })
        ;
    scope.spawn(std::move(snd));
    // Execute the sorting on the left side,
    // on the current thread.
    concurrent_sort_impl(first, mid1, scope);
  }
}
template <std::random_access_iterator It>
void concurrent_sort(It first, It last) {
  exec::async_scope scope;
  concurrent_sort_impl(first, last, scope);
  stdexec::sync_wait(scope.on_empty());
}
Listing 5

The code used to spawn work appears more complex because it includes handling errors of type std::error_code. The system scheduler is currently undergoing standardization, and the stdexec implementation is continuously evolving to align with this process. At the time of writing, scheduling work on the system context may produce an error of type std::error_code. However, async_scope does not natively handle such errors – it only manages exceptions. To bridge this gap, we need to convert the std::error_code into an exception, which we accomplish using the upon_error() algorithm.

Normally, the result of the lambda passed to upon_error() is sent through the value channel (see the previous article in this series [Teodorescu24]). The value channel for the schedule() algorithm is set_value(void). Since we do not want to introduce an additional value channel, the lambda passed to upon_error() must return void. Because the lambda is not declared noexcept, upon_error() assumes that it might throw (it would do so even if the body were empty), which ensures the inclusion of a set_error(std::exception_ptr) error channel in its completion signatures. This mechanism enables the conversion of the set_error(std::error_code) channel into a set_error(std::exception_ptr) channel. Later in this article, we will demonstrate another method for modifying the error channels of a sender.

Even if the std::error_code error channel is not ultimately standardized (and stdexec removes support for it), this exercise provides valuable insights into handling error channels effectively.

Now, let’s dive into the most interesting aspect of this example: the concept of work span. In the previous examples, the span of spawned work was always contained within the span of the enclosing function, meaning the work spans were fully nested. This approach is known as structured concurrency. However, in the example from Listing 5, the span of the spawned work can extend beyond the end of the enclosing function. In this case, the scopes do not fully nest; we call this weakly-structured concurrency.

One of the key purposes of async_scope is to impose a weak structure on work that might otherwise lack structure. The structure imposed here ensures that all work must be completed before the call to stdexec::sync_wait(scope.on_empty()). This statement blocks the current thread until all work within the scope is finished (i.e., the scope is empty).

You can think of async_scope as a sophisticated shared counter. Each time work is spawned on the scope, the counter increments. When the work is completed, the counter decrements. The on_empty() method returns a sender that completes when the counter reaches zero, signifying that there is no outstanding work.
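The shared-counter view can be sketched with standard C++ threads. The class below is a hypothetical stand-in written for this illustration: it mimics only the counting behaviour described above and is not how exec::async_scope is implemented (in particular, it starts one thread per work item instead of using a scheduler).

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the counting aspect of async_scope.
class counting_scope {
public:
  ~counting_scope() { wait_empty(); }

  // Registers one more outstanding work item and starts it on a new thread.
  void spawn(std::function<void()> work) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++outstanding_;
    threads_.emplace_back([this, work = std::move(work)] {
      work();
      std::lock_guard<std::mutex> done_lock(mutex_);
      if (--outstanding_ == 0)
        empty_.notify_all();
    });
  }

  // Blocks until the counter drops to zero, similar in spirit to
  // waiting on the sender returned by on_empty().
  void wait_empty() {
    std::vector<std::thread> finished;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      empty_.wait(lock, [this] { return outstanding_ == 0; });
      finished.swap(threads_);
    }
    for (auto& t : finished)
      t.join();
  }

private:
  std::mutex mutex_;
  std::condition_variable empty_;
  int outstanding_ = 0;
  std::vector<std::thread> threads_;
};

// Each of the 4 top-level items spawns one nested item: 8 in total.
int demo_nested_spawns() {
  std::atomic<int> done{0};
  counting_scope scope;
  for (int i = 0; i < 4; i++) {
    scope.spawn([&done, &scope] {
      done++;
      scope.spawn([&done] { done++; });
    });
  }
  scope.wait_empty();
  return done.load();
}
```

Because a nested item is registered before its parent decrements the counter, the counter cannot reach zero while work is still being created. This is the same property that makes the recursive spawning in Listing 5 safe to wait on.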

Whenever we introduce weakly-structured constructs, we must carefully double-check the safety of the approach. Specifically, we need to ensure that the spawned work does not access anything from the stack of the function that might be deallocated before the work is completed. In this case, the spawned work only accesses a section of the input sequence, and no other work item accesses the same section simultaneously.

The concurrent sort algorithm performs partitioning in a non-parallelizable manner. However, it then continues dividing the work in half, adding tasks to process the partitions concurrently. This causes the number of busy worker threads to gradually increase until all threads in the system scheduler are fully utilized for sorting tasks.

The concurrent structure of the problem is illustrated in Figure 3. It highlights the recursive nature of the problem and the way tasks are divided and executed concurrently.

Figure 3

In this example, we demonstrated how to use weakly-structured concurrency and discussed some of the challenges associated with managing error channels.

Processing images

Let’s now tackle a more complex problem, one that introduces additional challenges and interesting discussions. We will build an application that reads all JPEG images from a folder, applies a filter to each image, and saves the processed images to a different folder. Since processing an image can be time-consuming and there may be multiple images to handle, the application could benefit significantly from leveraging multiple threads.

An outline of the program, including function declarations and the main() function body, is shown in Listing 6. The program uses OpenCV [OpenCV] for image processing. All functions returning cv::Mat are standard functions that process images and return new ones. The read_file and write_file functions perform file reading and writing, as expected. Our focus will be on three key functions: tr_cartoonify, error_to_exception, and process_files.

cv::Mat tr_apply_mask(const cv::Mat& img_main,
  const cv::Mat& img_mask);
cv::Mat tr_blur(const cv::Mat& src, int size);
cv::Mat tr_to_grayscale(const cv::Mat& src);
cv::Mat tr_adaptthresh(const cv::Mat& img, 
  int block_size, int diff);
cv::Mat tr_reducecolors(const cv::Mat& img, 
  int num_colors);
cv::Mat tr_oilpainting(const cv::Mat& img, 
  int size, int dyn_ratio);
auto tr_cartoonify(const cv::Mat& src, 
  int blur_size, int num_colors, int block_size, 
  int diff);
auto error_to_exception();
std::vector<std::byte> 
  read_file(const fs::directory_entry& file);
void write_file(const char* filename, 
  const std::vector<unsigned char>& data);
exec::task<int> 
  process_files(const char* in_folder_name, 
    const char* out_folder_name, int blur_size, 
    int num_colors, int block_size, int diff);
int main() {
  auto everything = process_files("data", "out",
    blur_size, num_colors, block_size, diff);
  auto [processed] = stdexec::sync_wait
    (std::move(everything)).value();
  printf("Processed images: %d\n", processed);
  return 0;
}
Listing 6

Figure 4 illustrates the execution graph for this problem, assuming there are three files to process. The graph resembles a pipeline, where the first and last stages (read_file and write_file) are I/O operations, and the intermediate stages consist of operations that can benefit from concurrent execution across multiple threads.

Figure 4

Adding concurrency to a small pipeline

The ‘cartoonify’ operation involves applying a mask to an image with reduced colors, where the mask consists of the edges of the original picture. To produce the final result, we need two intermediate images: one with reduced colors and one showing the edges. The reduced-color image is obtained by calling tr_reducecolors, while the edges image is computed through a sequence of operations: tr_blur, tr_to_grayscale, and tr_adaptthresh. Since these operations can be computationally expensive and the two processing streams are independent, it makes sense to execute them concurrently. The code for this is shown in Listing 7.

auto tr_cartoonify(const cv::Mat& src, 
    int blur_size, int num_colors, 
    int block_size, int diff) {
  auto sched = exec::get_system_scheduler();
  stdexec::sender auto snd =
    stdexec::when_all(
      stdexec::transfer_just(sched, src)
        | error_to_exception()
        | stdexec::then([=](const cv::Mat& src) {
            auto blurred = tr_blur(src, 
              blur_size);
            auto gray = tr_to_grayscale(blurred);
            return tr_adaptthresh(gray, 
              block_size, diff);
          }),
      stdexec::transfer_just(sched, src)
        | error_to_exception()
        | stdexec::then([=](const cv::Mat& src) {
            return tr_reducecolors(src, 
              num_colors);
          })
    )
    | stdexec::then([](const cv::Mat& edges, 
          const cv::Mat& reduced_colors) {
        return tr_apply_mask(reduced_colors,
          edges);
      });
  return snd;
}
Listing 7

To enable concurrency, we again rely on the system scheduler. The two concurrent chains of computation are represented by the two arguments passed to when_all(). Each computation begins with a call to transfer_just(), which transfers execution to a thread managed by the system scheduler while passing the source image as an argument. As before, the issue of the std::error_code error channel arises, and this time we address it by chaining the error_to_exception() sender adaptor. The primary work for each computation chain is encapsulated in lambdas passed to the then() algorithm, clearly showing the steps needed to produce the two intermediate images.

The when_all() algorithm combines the two computations, creating a sender that completes only when both branches have finished. Upon completion, it triggers a value completion, passing the two resulting images. On top of when_all(), we use the then() algorithm again to combine the two images into a single output image. The result is a sender that completes with the final image as a value. Additionally, it can signal completion with an exception-encoded error or a stopped signal.

The tr_cartoonify() function simply returns this resulting sender. The sender’s type is complex and not easily nameable, as it encapsulates type information from all the senders and lambdas involved in the function.

Although this image processing function introduces limited concurrency (less than a 2× improvement), it still provides a notable performance boost compared to the serial version.

Consolidating error completion signals

Let’s now focus on the error_to_exception() function, shown in Listing 8. This function achieves essentially the same goal as the upon_error() approach from the previous section, but in a slightly more general manner. The limitations of upon_error() make it less practical for some scenarios. Specifically, upon_error() cannot handle multiple error completion signals from the previous sender, and it must return the correct value type to integrate seamlessly into the pipeline.

auto error_to_exception() {
  return stdexec::let_error([](auto e) {
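    // decltype(e) is the value type of e; decltype((e)) would be an
    // lvalue reference and would never match std::exception_ptr.
    if constexpr (std::same_as<decltype(e),
                  std::exception_ptr>)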
      return stdexec::just_error(e);
    else
      return stdexec::just_error
        (std::make_exception_ptr
        (std::runtime_error("other error")));
  });
}
Listing 8

Our approach in this case converts any error type into an exception. Each time an error is sent by the previous sender, the lambda passed to let_error() is invoked. If the previous sender supports both the set_error(std::exception_ptr) and the set_error(std::error_code) completion signatures, the lambda must handle both a std::exception_ptr and a std::error_code as arguments. To accommodate this, we use a generic auto parameter for the lambda.

In the body of the lambda, we differentiate between two cases: if the argument is an exception pointer, we simply forward it; otherwise, we create a new exception and forward that.

In both cases, the lambda returns a sender that produces an error. It is crucial that the return types of the two cases are the same; otherwise, the code would result in a compilation error.
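The two-way dispatch on the error type can be tried out in isolation, without any senders. The sketch below uses only the standard library; to_exception_ptr and message_of are hypothetical helpers introduced for this illustration, and the "other error" text simply mirrors Listing 8.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <system_error>
#include <type_traits>

// Hypothetical helper: turn any error value into a std::exception_ptr.
template <typename E>
std::exception_ptr to_exception_ptr(E e) {
  if constexpr (std::is_same_v<E, std::exception_ptr>)
    return e;  // already an exception: forward it unchanged
  else
    return std::make_exception_ptr(std::runtime_error("other error"));
}

// Extracts the message carried by an exception_ptr, for demonstration.
std::string message_of(std::exception_ptr p) {
  try {
    std::rethrow_exception(p);
  } catch (const std::exception& ex) {
    return ex.what();
  } catch (...) {
    return "unknown";
  }
}
```

Passing a std::error_code yields an exception whose message is "other error", while passing an existing std::exception_ptr returns it untouched. Note that the type test is written against the template parameter E itself, a value type; testing against a reference type would never match std::exception_ptr.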

While this process may seem cumbersome to users unfamiliar with such completion signal manipulations, it is likely that users will adapt quickly to these patterns with practice.

The main transformation

Listing 9 shows the main body of the process_files() function, which represents the core process of the program. Setting aside the fact that this is a coroutine, as well as the initialization of the two schedulers and the async_scope object at the start of the function, the body itself is relatively straightforward. It iterates over all the JPEG images in the source folder and processes each one. The processing is divided into two parts: reading the file’s content and processing the image.

exec::task<int> process_files(const char* 
    in_folder_name, const char* out_folder_name, 
    int blur_size, int num_colors, 
    int block_size, int diff) {
  exec::async_scope scope;
  exec::static_thread_pool io_pool(1);
  auto io_sched = io_pool.get_scheduler();
  auto cpu_sched = exec::get_system_scheduler();
  int processed = 0;
  for (const auto& entry 
      : fs::directory_iterator(in_folder_name)) {
    auto extension = entry.path().extension();
    if (!entry.is_regular_file() ||
        ((extension != ".jpg") &&
         (extension != ".jpeg")))
      continue;
    auto in_filename = entry.path().string();
    auto out_filename = 
      (fs::path(out_folder_name) / 
       entry.path().filename()).string();
    printf("Processing %s\n", 
      in_filename.c_str());
    auto file_content =
        co_await (stdexec::schedule(io_sched) 
        | stdexec::then([=] 
        { return read_file(entry); }));
    stdexec::sender auto work = ...
    scope.spawn(std::move(work));
  }
  co_await scope.on_empty();
  co_return processed;
}
Listing 9

The file-reading step simply involves a call to the read_file() function, executed within the context of the io_sched scheduler object. The reason for using this scheduler will be explained in the next section. This step also involves a co_await operation, which will be discussed later.

The main transformation is shown in Listing 10. Here, the content of the input file is transferred to the cpu_sched scheduler (which is the system scheduler), where most of the processing takes place. As in previous examples, we consolidate the error channel by including error_to_exception() in the pipeline. Once this is done, the image is decoded on a CPU thread using cv::imdecode().

stdexec::sender auto work =
  stdexec::transfer_just(cpu_sched, 
    cv::_InputArray::rawIn(file_content))
  | error_to_exception()
  | stdexec::then([=](cv::InputArray 
      file_content) -> cv::Mat {
        return cv::imdecode(file_content,
        cv::IMREAD_COLOR);
    })
  | stdexec::let_value([=](const cv::Mat& img) {
      return tr_cartoonify(img, 
        blur_size, num_colors, block_size, diff);
    })
  | stdexec::then([=](const cv::Mat& img) {
      std::vector<unsigned char> 
        out_image_content;
      if (!cv::imencode(extension, img, 
          out_image_content)) {
        throw std::runtime_error
          ("cannot encode image");
      }
      return out_image_content;
    })
  | stdexec::continues_on(io_sched)
  | stdexec::then([=]
      (const std::vector<unsigned char>& bytes) {
      write_file(out_filename.c_str(), bytes);
    })
  | stdexec::then([=] { printf("Written %s\n",
      out_filename.c_str()); })
  | stdexec::then([&] { processed++; });
Listing 10

Once we retrieve the image, we apply the tr_cartoonify() transformation. However, instead of using the typical then() algorithm, we use let_value(). The then() algorithm is appropriate when the given functor returns a value, whereas let_value() is used when the functor returns a sender. Since tr_cartoonify() returns a sender, let_value() is required. The let_value() algorithm is highly versatile and serves as the monadic bind operation for senders.
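The difference between then() and let_value() can be illustrated with a toy model in which a "sender" is simply a callable that produces a value when invoked. This is an assumption made for the sketch and ignores error and stopped channels entirely; then_like, let_value_like and demo_then_vs_let_value are hypothetical names, not stdexec APIs.

```cpp
#include <utility>

// then-like: f returns a plain value, which becomes the result.
template <typename S, typename F>
auto then_like(S s, F f) {
  return [s = std::move(s), f = std::move(f)] { return f(s()); };
}

// let_value-like: f returns another lazy "sender", which is then
// started in turn, and its result becomes the overall result.
template <typename S, typename F>
auto let_value_like(S s, F f) {
  return [s = std::move(s), f = std::move(f)] { return f(s())(); };
}

int demo_then_vs_let_value() {
  auto just_20 = [] { return 20; };
  // A function returning a value composes with then_like.
  auto doubled = then_like(just_20, [](int v) { return v * 2; });
  // A function returning new lazy work composes with let_value_like.
  auto plus_two = let_value_like(doubled, [](int v) {
    return [v] { return v + 2; };
  });
  return plus_two();  // nothing has run before this call
}
```

Had we used then_like for the second step, the result would have been an unstarted callable instead of a number. The same mismatch is what makes let_value() necessary for tr_cartoonify().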

After completing the transformation, we encode the image back into a stream of JPEG bytes using the cv::imencode() function. This operation is performed on a CPU thread, as it is typically CPU-intensive. Next, we write the resulting byte stream to disk. Since this is an I/O operation, we transition to the scheduler dedicated to I/O tasks. Once the file writing is complete, we print a message to standard output (still on the I/O thread) and increment the counter for successfully processed images.

Undersubscription and oversubscription

On some modern computers, I/O operations may be fast and predominantly consume CPU resources. However, let’s assume that this is not the case. Specifically, let’s assume that both reading and writing image files are slow operations that do not heavily utilize CPU cycles. For the sake of discussion, we will assume that I/O accounts for 25% of the program’s total runtime.

If we were to add concurrency to the program without considering this, the CPU cores would spend significant time processing images only to go idle for approximately 25% of the time, waiting on I/O operations. This inefficiency could worsen if I/O operations on one thread interfere with I/O on another thread, leading to greater performance degradation as the level of concurrency increases.

A common solution to this problem is to create a pipeline where all I/O operations are handled on a single thread, while CPU-intensive operations are distributed across a thread pool sized to match the number of physical cores on the machine. To implement this, we use a scheduler obtained from a static_thread_pool (note that this is not proposed for standardization) dedicated to I/O tasks. This scheduler is distinct from the system scheduler, which is designed to match the available hardware resources.
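The idea of funnelling all I/O through one dedicated thread can be sketched with a small work queue. The class below is a hypothetical, standard-library-only illustration; it is not how exec::static_thread_pool is implemented, and it omits everything a production executor would need beyond ordered execution on a single worker.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical single-threaded executor: every submitted task runs on
// the same dedicated worker thread, in submission order.
class single_thread_executor {
public:
  single_thread_executor() : worker_([this] { run(); }) {}

  // Drains the remaining tasks, then stops the worker thread.
  ~single_thread_executor() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    ready_.notify_one();
    worker_.join();
  }

  void submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    ready_.notify_one();
  }

private:
  void run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock,
                    [this] { return stopping_ || !tasks_.empty(); });
        if (tasks_.empty())
          return;  // stopping and nothing left to do
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // run outside the lock so submit() is never blocked
    }
  }

  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<std::function<void()>> tasks_;
  bool stopping_ = false;
  std::thread worker_;  // declared last: started after the other members
};

// True if all tasks ran in order, on one thread other than the caller.
bool demo_single_io_thread() {
  std::vector<int> order;
  std::vector<std::thread::id> ids;
  {
    single_thread_executor io;
    for (int i = 0; i < 5; i++) {
      io.submit([&order, &ids, i] {
        order.push_back(i);
        ids.push_back(std::this_thread::get_id());
      });
    }
  }  // the destructor drains the queue and joins the worker
  if (order != std::vector<int>{0, 1, 2, 3, 4})
    return false;
  for (const auto& id : ids)
    if (id != ids.front() || id == std::this_thread::get_id())
      return false;
  return true;
}
```

Because only one thread ever touches the "I/O" tasks, they cannot interfere with each other, and the CPU-bound threads are free to keep computing while a read or write is in progress.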

If the target hardware has N physical cores, one might wonder why not use a thread pool with N + 1 threads. The reason lies in the risk of oversubscription: running more CPU-intensive tasks simultaneously than there are physical cores can lead to decreased performance due to excessive task switching.

A common misconception is that running two tasks, each requiring one second to complete, simultaneously on one core will somehow finish in one second. In reality, running them concurrently on the same core often takes longer than two seconds due to the overhead of context switching. Running such tasks sequentially is typically more efficient. I explored this concept in my ACCU 2023 talk [Teodorescu23]. To illustrate, imagine trying to read two books at the same time or a physician performing complex surgery while attending a hospital board meeting over the phone. Running two tasks on the same physical core involves frequent context switches, which are inherently expensive.

For optimal performance, the goal is to achieve near 100% CPU utilization across all cores for the entire program duration. If CPU utilization falls below 100%, we encounter undersubscription, where some cores remain idle despite work being available. Conversely, if workload exceeds 100% CPU utilization, excessive task switching occurs, and the processor spends valuable time managing context switches instead of executing critical tasks.

To address this, it is common practice to offload all I/O operations from CPU-intensive work and execute them on a dedicated execution engine.

Coroutines and senders

This example highlights another intriguing aspect of the senders/receivers framework: its interaction with coroutines. With minimal annotations to a coroutine type, coroutines can effectively behave as senders. This allows us to co_await a sender or use a coroutine object in place of a sender.

The stdexec library provides such a coroutine type, exec::task, which we use in our example for the process_files() coroutine. Within the coroutine, we co_await the result of reading the input file on the I/O execution context and also co_await the completion of all activities using scope.on_empty(). On the other end, in the main() function, we pass the coroutine object to the sync_wait() algorithm, demonstrating that coroutines can seamlessly integrate where senders are used.

In this case, process_files() begins execution on the main thread. After the first co_await, execution continues on the I/O thread. At the end of the coroutine, execution remains on the I/O thread. The final sync_wait() then switches the main execution path back to the main thread.

While writing this, I realized there is a bug in the code. I decided to leave the bug as is and explain it, as this may be more helpful for the reader. The issue is that we are destroying the io_pool object when exiting the scope of the coroutine, but execution may still be ongoing on one of its threads. Ideally, we should switch back to the main thread before destroying this pool. Alternatively, we could transfer control to one of the CPU threads, as the system scheduler guarantees the validity of its threads throughout the application’s lifetime, including before and after main().

Returning to the topic of coroutines, there is nothing that coroutines can achieve that cannot also be done with senders, and the reverse is true as well. However, using senders is generally more efficient. Despite this, I find coroutines useful in two specific scenarios:

  • Non-linear control flow: When logic involves loops or branches, expressing these flows using senders can be challenging due to the lack of standardized algorithms for such patterns. Even if such algorithms were standardized, expressing everything through expression composition would likely be more cumbersome than using traditional control structures.
  • Type erasure: Currently, there is no type-erased sender proposed for standardization. This means that every sender’s internal structure must be fully visible at the point of use. In contrast, coroutines naturally hide implementation details, making them a good choice for situations requiring type erasure.

At the time of writing, the task type used in this example has not been proposed for standardization. However, there is broad consensus that it is worth standardizing.

Takeaways

Following the article in the last Overload [Teodorescu24], which introduced the senders/receivers framework accepted into the working draft of the C++26 standard, this article explores several examples. The goal is to familiarize readers with writing programs using senders/receivers. Each of the three examples presented here aims to improve performance by employing multi-threading.

The examples demonstrate that adding multi-threading to applications does not have to be a daunting task. By thinking in terms of execution graphs, concurrent solutions can be expressed clearly and intuitively, avoiding the need for manual synchronization primitives, which are notoriously error-prone (see footnote 4).

While there are some challenges users may encounter when working with senders/receivers, they are relatively minor compared to the complexities of multi-threading with raw threads and locks. One important consideration is managing the lifetime of objects in relation to the threads accessing them. This article highlights a bug encountered by the author during implementation to emphasize this point. In contrast, manual multi-threading is typically far more difficult, as it requires reasoning about a larger number of objects, with much of the reasoning being non-local.

Another challenge users might face is handling the completion signals of senders. Certain transformations may create unexpected completion signals, forcing the user to address them. Improperly connected senders can result in long, cryptic compilation errors. In our case, we had to consolidate two types of error completions into a single type to resolve these issues.
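The consolidation idea can be sketched independently of the framework. The example below assumes, purely for illustration, that the two error types are std::error_code and std::exception_ptr; the alias AnyError and the function to_exception_ptr() are hypothetical names, not taken from the article's code or from stdexec.

```cpp
#include <exception>
#include <system_error>
#include <variant>

// Either of the two error representations assumed for this sketch.
using AnyError = std::variant<std::error_code, std::exception_ptr>;

// Funnels both representations into a single one, std::exception_ptr,
// so that downstream code has only one error type to handle.
inline std::exception_ptr to_exception_ptr(const AnyError& e) {
  if (const auto* ec = std::get_if<std::error_code>(&e)) {
    // Wrap the error code in the standard exception type that carries it.
    return std::make_exception_ptr(std::system_error(*ec));
  }
  // Already an exception_ptr: pass it through unchanged.
  return std::get<std::exception_ptr>(e);
}
```

In a senders pipeline, a conversion of this kind would sit in an error-handling step, so that everything after it sees a single error completion type.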

The examples presented here highlight several key strengths of the senders/receivers framework:

  • Structuredness. Senders/receivers impose a clear structure on an application’s concurrency. In well-structured code, concurrency is nested in such a way that concurrency concerns can be abstracted away by the enclosing construct (e.g., a function or coroutine). The framework also supports weakly-structured concurrency, where scopes do not fully nest but can be organized using dynamic scopes to encompass all computations. Both approaches are far superior to the unstructured methods of managing concurrency with raw threads and locks.
  • Local reasoning. Most concurrency reasoning can be confined to a local scope. For fully structured code, all reasoning remains local. In weakly-structured code, while concurrency concerns may extend beyond the current function, they are still constrained to a defined dynamic scope.
  • Safety. The reader might have noticed that the discussion about safety was minimal. This is because, when object lifetimes are properly managed, the framework inherently avoids safety issues. It eliminates concerns like data races and deadlocks, which are common in unstructured multithreading.
  • Performance. The senders/receivers framework can achieve zero abstraction cost. There are no unnecessary memory allocations, and no extra synchronization overhead is introduced. This makes it possible to build highly performant multi-threaded applications.

Together, these strengths make senders/receivers an excellent framework for writing multi-threaded code. While the syntax might feel less intuitive and diagnostics may sometimes be trickier, the framework offers a powerful and reliable way to build robust and efficient multi-threaded software.

The real question is how well this framework works for you, the reader. Is it as straightforward as the article suggests, or do you encounter challenges when applying it to your problems? I would love to hear your feedback and learn about your experiences using this approach.

References

[ExamplesCode] Lucian Radu Teodorescu, overload185_sr_examples, https://github.com/lucteo/overload185_sr_examples.

[OpenCV] OpenCV, OpenCV – Open Computer Vision Library, https://opencv.org/.

[P2079R5] Lucian Radu Teodorescu, Ruslan Arutyunyan, Lee Howes, Michael Voss, P2079R5: System execution context, 2024, https://wg21.link/P2079R5.

[P2300R10] Michał Dominiak, Georgy Evtushenko, Lewis Baker, Lucian Radu Teodorescu, Lee Howes, Kirk Shoop, Michael Garland, Eric Niebler, Bryce Adelstein Lelbach, P2300R10: std::execution, 2024, https://wg21.link/P2300R10.

[P3149R6] Ian Petersen, Jessica Wong, Ján Ondrušek, Kirk Shoop, Lee Howes, Lucian Radu Teodorescu, async_scope – Creating scopes for non-sequential concurrency, https://wg21.link/P3149R6.

[stdexec] NVIDIA, Senders – A Standard Model for Asynchronous Execution in C++, https://github.com/NVIDIA/stdexec.

[Teodorescu24] Lucian Radu Teodorescu, Senders/receivers: An Introduction, Overload 184, December 2024, https://accu.org/journals/overload/32/184/teodorescu/.

[Teodorescu23] Lucian Radu Teodorescu, ‘Concurrency Approaches: Past, Present, and Future’, ACCU Conference, 2023, https://www.youtube.com/watch?v=uSG240pJGPM.

[WG21Exec] WG21, ‘Execution control library’ in Working Draft Programming Languages – C++, https://eel.is/c++draft/#exec.

[Wikipedia] Wikipedia, Resource contention, https://en.wikipedia.org/wiki/Resource_contention.

Footnotes

  1. This may not be the optimal way to sort; the serial method presented here is a simplification of the concurrent version.
  2. The name proposed for standardization is counting_scope; however, we use async_scope here as this is the name currently used by the stdexec library. See [P3149R6].
  3. These assumptions are made to illustrate the thread-switching technique described. In practice, this approach may not always be worthwhile. Readers should measure performance before making similar assumptions.
  4. The lack of need for manual synchronization is discussed in [Teodorescu24]. The main idea is that we prefer structuring concurrency and explicitly encoding the dependencies between work items.

Lucian Radu Teodorescu has a PhD in programming languages and is a Staff Engineer at Garmin. He likes challenges; and understanding the essence of things (if there is one) constitutes the biggest challenge of all.





