Writing Senders

By Lucian Radu Teodorescu

Overload, 33(186):4-9, April 2025


Senders/receivers can be used to introduce concurrency. Lucian Radu Teodorescu describes how to implement senders.

In the December issue of Overload [Teodorescu24], we provided a gentle introduction to senders/receivers, arguing that it is easy to write programs with senders/receivers. Then, in the February issue [Teodorescu25a], we had an article that walked the reader through some examples showing how senders/receivers can be used to introduce concurrency in an application. Both of these articles focused on the end users of senders/receivers. This article focuses on the implementer’s side: what does it take to implement senders?

After a section explaining some details about the execution model of senders/receivers, we have three examples in which we build three different senders, in increasing order of complexity. The examples are purposely kept as simple as possible. We didn’t bother much about using std::move when we should, we didn’t consider noexcept functions in depth, we reduced the amount of metaprogramming we needed to do, we didn’t showcase the extra complications needed to implement the pipeable notation, and we didn’t delve into advanced topics like environments and cancellation. This is meant to be an introductory article for library implementers who are writing senders.

All the code is available on GitHub [Teodorescu25b]. While last time we used the stdexec library [stdexec], this time we are going to use the execution library that is part of the Beman project [Beman]. This shows that there are multiple valid implementations of the senders/receivers framework, and that there is relatively broad implementation experience.

Receivers and operation states

If people are just using frameworks based on std::execution, they mainly need to care about senders and schedulers. These are user-facing concepts. However, if people want to implement sender-ready abstractions, they also need to consider receivers and operation states – these are implementer-side concepts. As this article mainly focuses on the implementation of sender abstractions, we need to discuss these two concepts in more detail.

A receiver is defined in P2300 as “a callback that supports more than one channel” [P2300R10]. The proposal defines a concept for a receiver, unsurprisingly called receiver. To model this concept, a type needs to meet the following conditions:

  • It must be movable and copyable.
  • It must have an inner type alias named receiver_concept that is equal to receiver_t (or a derived type).
  • std::execution::get_env() must be callable on an object of this type (to retrieve the environment of the receiver).

A receiver is the object that receives the sender’s completion signal, i.e., one of set_value(), set_error(), or set_stopped(). As explained in the December 2024 issue [Teodorescu24], a sender may have different value completion types and different error completion types. For example, the same sender might sometimes complete with set_value(int, int), sometimes with set_value(double), sometimes with set_error(std::exception_ptr), sometimes with set_error(std::error_code), and sometimes with set_stopped(). This implies that a receiver must also be able to accept multiple types of completion signals.

The need for completion signatures is not directly visible in the receiver concept. There is another concept that the P2300 proposal defines, which includes the completion signatures for a receiver: receiver_of<Completions>. A type models this concept if it also models the receiver concept and provides functions to handle the completions indicated by Completions. More details on how these completions look will be covered in the example sections.

We say that a sender can be connected to a receiver if the receiver accepts at least the completion signals advertised by the sender. Formally, we can connect a sender s to a receiver r if std::execution::connect(s, r) is well-formed and returns an object of a type that models the operation_state concept. For a type to model this concept, the following requirements must be met:

  • It must have an inner type alias of type operation_state_t (or a type derived from it) that is named operation_state_concept.
  • std::execution::start() must be callable on a reference of this type.

If a sender describes an asynchronous task, an operation state object encapsulates the actual work, including the receiver’s role in the entire process. Executing start() for an operation state triggers the asynchronous operation. The lifetime of the asynchronous operation begins when start() is called and ends when a completion signal is delivered to the receiver; for operations that complete synchronously, this coincides with the duration of the start() call.

There are a few conditions that must apply to an operation state, in addition to the requirements encoded in the corresponding concept:

  • The object must not be destroyed during the lifetime of the asynchronous operation.
  • The object must not be copied or moved after it has been created by connecting a sender.

These requirements guarantee that implementations can safely use pointers to operation states during the asynchronous operation’s lifetime, as the objects remain valid.
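One way to make the second requirement hard to violate is to delete the copy operations of the operation state type, which also suppresses the implicit move operations. The following library-free sketch illustrates the idea. The names demo_receiver, pinned_op, make_op and run_pinned_op are hypothetical and are not part of std::execution or Beman, and the receiver uses a plain member function instead of the framework's customisation points.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a receiver; it only records the value it gets.
struct demo_receiver {
  int* out_;
  void set_value(int v) noexcept { *out_ = v; }
};

// Hypothetical operation state that cannot be copied or moved.
template <typename Receiver>
struct pinned_op {
  int value_;
  Receiver receiver_;

  pinned_op(int value, Receiver receiver)
    : value_(value), receiver_(std::move(receiver)) {}

  // Deleting the copy operations also suppresses the implicit moves.
  pinned_op(const pinned_op&) = delete;
  pinned_op& operator=(const pinned_op&) = delete;

  void start() noexcept { receiver_.set_value(value_); }
};

static_assert(!std::is_copy_constructible_v<pinned_op<demo_receiver>>);
static_assert(!std::is_move_constructible_v<pinned_op<demo_receiver>>);

// Guaranteed copy elision (C++17) still lets a factory return the object.
inline pinned_op<demo_receiver> make_op(int value, int* out) {
  return pinned_op<demo_receiver>{value, demo_receiver{out}};
}

inline int run_pinned_op() {
  int result = 0;
  auto op = make_op(42, &result);  // constructed in place, never moved
  assert(result == 0);             // nothing has happened before start()
  op.start();                      // the address of op is stable from here on
  return result;
}
```

Because the object is created directly in its final location, code inside start() could hand out a pointer to the operation state without worrying that the object will later be relocated.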

We’ve just provided technical details on what it means to be a receiver and an operation state, but we have not yet given such details on what it means to be a sender. Previous articles didn’t cover these details either, as they are not important for end users. A sender is a type that models (at least) the sender concept. A type models this concept if:

  • It is movable and copyable.
  • Either it has an inner type alias named sender_concept of type sender_t (or derived), or it is an awaitable (in a special way, compatible with senders).
  • std::execution::get_env() can be called on an object of this type (to retrieve the environment parameters of the sender).

In addition to the sender concept, the proposal also defines the sender_in concept (to check whether a sender can create an asynchronous operation in a given environment) and sender_to (to check whether the sender can be connected with a given receiver type).

The relationship between senders, receivers, and operation states is depicted in Figure 1.

Figure 1

A just example

Let’s try to implement a very basic sender. Probably the simplest sender in P2300 is the one created by just(). We will attempt to create a simplified version of this. More specifically, our sender will always complete with an int value.

Listing 1 shows the main implementation of our sender.

#include <beman/execution/execution.hpp>

namespace ex = beman::execution;

struct just_int_sender {
  // The data of the sender.
  int value_to_send_;

  // This is a sender type.
  using sender_concept = ex::sender_t;

  // This sender always completes with an 'int'
  // value.
  using completion_signatures = 
    ex::completion_signatures
    <ex::set_value_t(int)>;

  // No environment to provide.
  ex::empty_env get_env() const noexcept {
    return {}; 
  }

  // Connect to the given receiver, and produce
  // an operation state.
  template <ex::receiver Receiver>
  auto connect(Receiver receiver) noexcept {
    return detail::just_int_op{value_to_send_,
      receiver};
  }
};

auto just_int(int x) {
  return just_int_sender{x}; }
Listing 1

First, in all our examples, we include the execution.hpp header from the Beman libraries. This allows us to utilise and extend the std::execution framework. Additionally, we use ex as a shorthand for the beman::execution namespace, which is the implementation of what C++26 will provide in the std::execution namespace.

Similar to the algorithms in P2300, we provide an algorithm that can create our sender; the implementation is straightforward. The important part to focus on is the definition of just_int_sender, the actual sender type. Probably the most important aspect of this type, which is not directly visible, is that it models the ex::sender concept.

We define an inner type sender_concept that aliases ex::sender_t to explicitly indicate to the senders/receivers framework that this is intended to be a sender type. This is simply how the framework is designed to work.

Secondly, we define a completion_signatures inner type that specifies how the sender is expected to complete. In our case, we indicate that the sender can only complete with a set_value(int) signal. We will see more complex completion signature definitions later, but the reader can observe that the framework makes it straightforward to declare a sender’s completion signatures.

A sender needs to have an environment, but in our case, there is no environment to provide. So we simply provide an empty environment, which is literally defined as:

  struct empty_env {};

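The implementation of the operation state type is given in Listing 2. In a single translation unit, this code must appear before the sender from Listing 1, which refers to detail::just_int_op.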

namespace detail {
template <ex::receiver Receiver>
struct just_int_op {
  int value_to_send_;
  Receiver receiver_;

  // This is an operation-state type.
  using operation_state_concept = 
    ex::operation_state_t;

  // The actual work of the operation state.
  void start() noexcept {
    // No actual work, just send the value 
    // to the receiver.
    ex::set_value(std::move(receiver_),
      value_to_send_);
  }
};
}
Listing 2

Following the same pattern used for senders, an operation state must declare an inner type named operation_state_concept, which must be ex::operation_state_t (or a type derived from it). Besides this, the only other required operation for an operation state is start(). This is called to actually execute the asynchronous operation described by the sender, while also sending the completion signal to the receiver.

The reader should note that an operation state needs to store the receiver object connected to the sender so that it knows which object should receive the completion signal. The actual completion signal invocation is expressed as:

  ex::set_value(std::move(receiver_), 
  value_to_send_);

The set_value call is marked as noexcept, meaning the user does not need to check for exceptions (or potentially invoke set_error); this simplifies the entire process.

The reader should take a few moments to observe the interaction between the sender, the receiver, and the resulting operation state, and how the entire flow works. There are no advanced concepts or concurrency concerns in this example. Pretty easy, right?

With this sender, the user might write something like

  ex::sender auto work = just_int(13)
    | ex::then([](int x) {
        printf("Hello, world! Here is a received value: %d\n", x);
      });

to declare a sender that, when executed, will print a message containing the value 13.

And then, another example

Now, let’s discuss a slightly more complex example. This time, we want to implement a then sender – a simpler version of the sender with the same name in the P2300 proposal [P2300R10].

The main code for this sender is presented in Listing 3.

template <ex::sender Previous,
  std::invocable<int> Fun>
struct then_sender {
  Previous previous_;
  Fun f_;

  using sender_concept = ex::sender_t;
  ex::empty_env get_env() const noexcept {
    return {}; }

  using completion_signatures = 
    ex::completion_signatures<
      ex::set_value_t(int),
      ex::set_error_t(std::exception_ptr),
      ex::set_stopped_t()>;

  template <ex::receiver Receiver>
  auto connect(Receiver receiver) noexcept {
    return ex::connect(previous_,
      detail::then_receiver{f_, receiver});
  }
};

template <ex::sender Previous,
  std::invocable<int> Fun>
then_sender<Previous, Fun> then(Previous prev,
    Fun f) {
  return {prev, f};
}
Listing 3

Here, there are three key differences compared to the previous example: using a preceding sender, defining slightly more complex completion signatures, and implementing connect in a different way.

then is a sender adapter. That is, it takes a previous sender and chains some extra work on top of it to create a new sender. More specifically, in this case, it executes the given invocable after the previous sender completes. The invocable receives the value produced by the previous sender. In our case, the previous sender must complete with an int value if it completes successfully (as we will see shortly).

In P2300, all sender adapters have two equivalent forms: one that takes the previous sender as an argument and one that is pipeable. For example, ex::then(ex::just(), f) and ex::just() | ex::then(f) both generate the same sender value. To achieve the pipeable form, we need a different overload for the then function – one that returns an expression template that can be combined via operator | with another sender. For simplicity, we leave out the implementation of the pipeable sender. Implementing such a form can be a good exercise for the reader.
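As a starting point for that exercise, the sketch below shows the general shape of the technique on a deliberately tiny, library-free model. The types toy_just, toy_then and then_closure, and the run() member, are hypothetical stand-ins invented for this illustration; they are not senders and are not part of std::execution or Beman. The point is only how a one-argument overload can return a closure object that operator| later combines with whatever is on its left.

```cpp
#include <cassert>
#include <utility>

// Toy "sender": produces a value when run() is called.
struct toy_just {
  int value_;
  int run() const { return value_; }
};

// Toy adapter: runs the previous stage, then applies f_ to its result.
template <typename Previous, typename Fun>
struct toy_then {
  Previous previous_;
  Fun f_;
  auto run() const { return f_(previous_.run()); }
};

// Closure returned by the one-argument form; it remembers only f_.
template <typename Fun>
struct then_closure {
  Fun f_;
};

// Two-argument form: then(previous, f).
template <typename Previous, typename Fun>
toy_then<Previous, Fun> then(Previous prev, Fun f) {
  return {std::move(prev), std::move(f)};
}

// One-argument form: then(f) yields a closure to be used with operator|.
template <typename Fun>
then_closure<Fun> then(Fun f) {
  return {std::move(f)};
}

// previous | then(f) is defined to mean then(previous, f).
template <typename Previous, typename Fun>
auto operator|(Previous prev, then_closure<Fun> closure) {
  return then(std::move(prev), std::move(closure.f_));
}

inline int piped_result() {
  auto work = toy_just{20} | then([](int x) { return x + 1; })
                           | then([](int x) { return x * 2; });
  return work.run();
}

inline int nested_result() {
  auto work = then(then(toy_just{20}, [](int x) { return x + 1; }),
                   [](int x) { return x * 2; });
  return work.run();
}

// Both spellings describe the same computation.
inline void check_both_forms() {
  assert(piped_result() == nested_result());
}
```

A real implementation would additionally constrain operator| so that it only accepts senders on the left-hand side.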

Unlike our previous example, we now advertise multiple completion signatures: a successful completion with an int value, an error completion with an exception, and a stopped completion. The completion_signatures helper from the senders/receivers framework easily accommodates specifying multiple completion signatures.

In this example, we simply assume that the value completion signature needs to be int. However, more generic implementations might deduce this from the result of the given invocable. Furthermore, such generic implementations would likely ensure that the value completion signals of the previous sender match the type of the invocable and would also conditionally add support for error and stopped completions. This easily turns into a metaprogramming exercise. For simplicity, we leave these details out.
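To give a flavour of the first step, the value type could be computed with std::invoke_result_t from the standard <type_traits> header. The sketch below is library-free: value_signature is a hypothetical tag type invented here to stand in for an entry such as ex::set_value_t(T), and then_result_t and then_value_signature are hypothetical aliases; none of them is part of std::execution or Beman.

```cpp
#include <string>
#include <type_traits>

// Hypothetical stand-in for a completion signature entry such as
// set_value_t(T).
template <typename... Ts>
struct value_signature {};

// Result type of calling Fun with the int produced by the previous sender.
template <typename Fun>
using then_result_t = std::invoke_result_t<Fun, int>;

// Value completion that a deduced 'then' might advertise; a void result
// maps to a signature without arguments.
template <typename Fun>
using then_value_signature = std::conditional_t<
    std::is_void_v<then_result_t<Fun>>,
    value_signature<>,
    value_signature<then_result_t<Fun>>>;

// Three sample invocables with different result types.
inline double halve(int x) { return x / 2.0; }
inline std::string to_text(int x) { return std::to_string(x); }
inline void consume(int) {}

static_assert(std::is_same_v<then_value_signature<decltype(&halve)>,
                             value_signature<double>>);
static_assert(std::is_same_v<then_value_signature<decltype(&to_text)>,
                             value_signature<std::string>>);
static_assert(std::is_same_v<then_value_signature<decltype(&consume)>,
                             value_signature<>>);
```

The special case for void mirrors the fact that a value completion may carry no arguments at all, as in the set_value_t() signature used by the serializer example later in this article.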

Finally, we need to explain the implementation of the connect method. In this example, we take a different approach to implementing it and returning an appropriate operation state. Instead of defining the actual resulting operation state, we define the receiver that needs to be connected to the previous sender and place our logic inside that receiver. This intermediate receiver establishes the connection between the previous sender and the receiver connected to our sender.

The code for this receiver is shown in Listing 4. The implementation is relatively straightforward. First, we declare that this is a receiver type by using the receiver_concept inner type; this follows the same pattern as for senders and operation states. Then, in addition to storing the necessary data, we implement the methods that will receive the completion signals from the previous sender – in our case: set_value(), set_error(), and set_stopped(). All these methods must be marked as noexcept.

namespace detail {
template <ex::receiver Receiver, typename Fun>
struct then_receiver {
  Fun f_;
  Receiver receiver_;

  using receiver_concept = ex::receiver_t;

  void set_value(int value) noexcept {
    try {
      ex::set_value(std::move(receiver_),
        f_(value));
    } catch (...) {
      ex::set_error(std::move(receiver_),
        std::current_exception());
    }
  }

  void set_error(std::exception_ptr e) noexcept {
    ex::set_error(std::move(receiver_), e); }

  void set_stopped() noexcept { 
    ex::set_stopped(std::move(receiver_)); }
};
}
Listing 4

When connecting the previous sender to this receiver, the framework will check that all the advertised completion signals of the sender have a corresponding method in the receiver and that the types match. This means that if the previous sender successfully completes with a value that is not of type int, it cannot be connected to our sender.
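A much simplified, library-free approximation of such a check can be written with the classic detection idiom. The trait accepts_int_value and the three sample receiver types are hypothetical names invented for this illustration; the real framework performs its checks through the receiver_of concept and the completion signatures, and in C++20 a requires-expression would express the same test more directly.

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Primary template: R has no usable set_value(int).
template <typename R, typename = void>
struct accepts_int_value : std::false_type {};

// Chosen when r.set_value(int) is well-formed; the result is true only if
// that call is also noexcept.
template <typename R>
struct accepts_int_value<
    R, std::void_t<decltype(
           std::declval<R&>().set_value(std::declval<int>()))>>
  : std::bool_constant<noexcept(
        std::declval<R&>().set_value(std::declval<int>()))> {};

// Accepts an int and promises not to throw.
struct int_receiver {
  void set_value(int) noexcept {}
};

// Expects a std::string; an int cannot be converted to it.
struct string_receiver {
  void set_value(std::string) noexcept {}
};

// Accepts an int, but is not marked noexcept.
struct throwing_receiver {
  void set_value(int) {}
};

static_assert(accepts_int_value<int_receiver>::value);
static_assert(!accepts_int_value<string_receiver>::value);
static_assert(!accepts_int_value<throwing_receiver>::value);
```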

The set_error() and set_stopped() cases are straightforward: we simply forward the signal to the next receiver. The main logic is handled in the set_value() method. Here, after receiving the value from the previous sender, we call the given invocable and pass the result to the next receiver. Since the call to f_ can throw, we need to catch any exceptions and pass them as errors to the next receiver; this is a common pattern in implementing senders.
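The exception-to-error translation can be seen in isolation in the following library-free sketch. recording_receiver, call_and_forward, checked_half and run_checked_half are hypothetical names invented for this illustration, and the receiver uses plain member functions rather than the ex::set_value and ex::set_error customisation points.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <utility>

// Hypothetical receiver that records which channel was used.
struct recording_receiver {
  int* value_;
  std::exception_ptr* error_;

  void set_value(int v) noexcept { *value_ = v; }
  void set_error(std::exception_ptr e) noexcept { *error_ = std::move(e); }
};

// Calls f(input) and completes the receiver on exactly one channel.
// Since set_value() is noexcept, only f itself can throw inside the try.
template <typename Receiver, typename Fun>
void call_and_forward(Receiver receiver, Fun f, int input) noexcept {
  try {
    receiver.set_value(f(input));
  } catch (...) {
    receiver.set_error(std::current_exception());
  }
}

// Sample invocable that fails for odd inputs.
inline int checked_half(int x) {
  if (x % 2 != 0) throw std::invalid_argument("odd input");
  return x / 2;
}

// Returns the delivered value, or -1 if the error channel was used.
inline int run_checked_half(int input) {
  int value = -1;
  std::exception_ptr error;
  call_and_forward(recording_receiver{&value, &error}, &checked_half, input);
  assert(value == -1 || !error);  // the two channels are mutually exclusive
  return error ? -1 : value;
}
```

Because the surrounding function is noexcept, the caller never sees the exception; it only ever observes a completion on one of the receiver's channels.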

And that’s it. As this example demonstrates, implementing sender adapters is not particularly complicated. Most of the complexity arises from making the implementation generic: detecting the completion signals of the previous sender, ensuring they match the signature of the invocable, generating appropriate completion signatures, handling r-value objects, and so on.

A serializer example

In this section, we show an example of implementing a sender that addresses some concurrency concerns. We aim to implement a serializer context and a corresponding sender that ensures only one work item can be executed at a given time within the context. The context is similar to std::mutex, and the sender is similar to a std::lock_guard obtained from that mutex.

Having such a facility would allow users to migrate more easily to senders/receivers without significantly altering their business logic. In terms of resource usage, the serializer will typically be more efficient than a mutex: it won’t block any threads and will always ensure the optimal execution of work items. This is not a new idea (see, for example, [Intel] and [Kohlhoff23]), and my hope is that it will be standardised in the next release cycle (C++29).

Let’s first look at a usage example. The code in Listing 5, without such a serializer, will likely execute work1, work2, and work3 concurrently (work1, work2, and work3 are senders). Listing 6 shows how the code can be modified using our serializer to ensure that work1, work2, and work3 cannot be executed concurrently; at most, one of them will be executed at a given time.

ex::sender auto branch1 = ex::schedule(sched)
  | ex::let_value([]{ return work1; });
ex::sender auto branch2 = ex::schedule(sched)
  | ex::let_value([]{ return work2; });
ex::sender auto branch3 = ex::schedule(sched)
  | ex::let_value([]{ return work3; });
ex::sync_wait(ex::when_all(branch1, branch2,
  branch3));
Listing 5

serializer_context ctx;
ex::sender auto branch1 = 
  on_serializer(ex::schedule(sched), ctx, work1);
ex::sender auto branch2 = 
  on_serializer(ex::schedule(sched), ctx, work2);
ex::sender auto branch3 = 
  on_serializer(ex::schedule(sched), ctx, work3);
ex::sync_wait(ex::when_all(branch1, branch2,
  branch3));
Listing 6

Listing 7 shows a simple implementation of the serializer_context class. It has two public methods: one that is called when new work needs to be executed within the context, and one that notifies the context when the work is done.

#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

struct serializer_context {
  using continuation_t = std::function<void()>;

  // Called when new work needs to be enqueued
  void enqueue(continuation_t cont) {
    {
      std::lock_guard<std::mutex> 
        lock{bottleneck_};
      if (busy_) {
        // If we are busy, we need to enqueue 
        // the continuation
        to_run_.push_back(std::move(cont));
        return;
      }
      // We are free; mark ourselves as busy, 
      // and execute continuation inplace
      busy_ = true;
    }
    cont();
  }

  // Called when the work completes
  void on_done() {
    continuation_t cont;
    {
      std::lock_guard<std::mutex> 
        lock{bottleneck_};
      assert(busy_);
      if (to_run_.empty()) {
        // Nothing to run next, we are done
        busy_ = false;
        return;
      }
      // We have more work to do; extract the
      // first continuation
      cont = std::move(to_run_.front());
      to_run_.erase(to_run_.begin());
    }
    if (cont) {
      cont();
    }
  }

private:
  bool busy_{false};
  std::vector<continuation_t> to_run_;
  std::mutex bottleneck_;
};
Listing 7

When on_serializer wants to start some work, it may be that the context is already in the process of executing something. This means that the work needs to be delayed. Thus, we need to store the work for later. The way we do this is by encapsulating the work inside a std::function<void()> object.

When enqueuing work, we have two scenarios: one where there is no ongoing work, and one where work is already being executed on the serializer. In the first case, we execute the work immediately, and in the second, we store the work for later execution in a vector. Of course, we need to synchronise access to the vector and to the flag that indicates whether execution is currently in progress.

When a chunk of work has finished executing on the serializer, we again have two cases: one where there is no pending work on the serializer, and one where there is pending work. In the first case, we should simply exit, marking the serializer as not busy. In the second case, we need to start executing the next work item.
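Both pairs of cases can be traced with a single-threaded experiment. The sketch below repeats a condensed copy of the context from Listing 7 so that it is self-contained; trace_serializer is a hypothetical helper invented for this illustration, and the work items are plain lambdas rather than senders.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Condensed copy of the serializer_context from Listing 7.
struct serializer_context {
  using continuation_t = std::function<void()>;

  void enqueue(continuation_t cont) {
    {
      std::lock_guard<std::mutex> lock{bottleneck_};
      if (busy_) {
        to_run_.push_back(std::move(cont));  // busy: defer
        return;
      }
      busy_ = true;  // free: run inline below
    }
    cont();
  }

  void on_done() {
    continuation_t cont;
    {
      std::lock_guard<std::mutex> lock{bottleneck_};
      assert(busy_);
      if (to_run_.empty()) {
        busy_ = false;  // nothing pending: become free
        return;
      }
      cont = std::move(to_run_.front());
      to_run_.erase(to_run_.begin());
    }
    cont();  // run the next pending item outside the lock
  }

private:
  bool busy_{false};
  std::vector<continuation_t> to_run_;
  std::mutex bottleneck_;
};

// Hypothetical helper: records the order in which work items run.
inline std::string trace_serializer() {
  serializer_context ctx;
  std::string log;
  ctx.enqueue([&] { log += "A"; });  // context is free: runs immediately
  ctx.enqueue([&] { log += "B"; });  // A has not reported completion: deferred
  ctx.enqueue([&] { log += "C"; });  // deferred behind B
  log += "|";                        // marks the point before any on_done()
  ctx.on_done();                     // A is done: B runs now
  ctx.on_done();                     // B is done: C runs now
  ctx.on_done();                     // C is done: context becomes free again
  ctx.enqueue([&] { log += "D"; });  // free again: runs immediately
  ctx.on_done();                     // D is done: context is free
  return log;
}
```

In this trace, only A runs before the marker; B and C run later, in the order in which they were enqueued, and each of them starts only once the previous item has reported completion through on_done().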

The implication of this strategy is that one scheduler may queue up a significant amount of work to be executed, which is not always desirable. To address this issue, one might add a scheduler to the context and always execute pending work on this scheduler. We leave this as an exercise for the reader.

Moving on, the code that implements the actual sender is presented in Listing 8, and its intermediate receiver in Listing 9. The sender code doesn’t contain anything particularly noteworthy. It simply follows the same pattern we’ve seen before: sender_concept, completion_signatures, get_env(), and connect.

template <ex::sender Previous, ex::sender Work>
struct on_serializer_sender {
  Previous previous_;
  serializer_context& context_;
  Work work_;
  using sender_concept = ex::sender_t;
  using completion_signatures = 
    ex::completion_signatures<
      ex::set_value_t(),
      ex::set_error_t(std::exception_ptr),
      ex::set_stopped_t()>;
  ex::empty_env get_env() const noexcept { 
    return {}; }
  template <ex::receiver Receiver>
  auto connect(Receiver receiver) noexcept {
    return ex::connect(previous_, 
      detail::on_serializer_receiver{context_, 
      work_, receiver});
  }
};

template <ex::sender Previous, ex::sender Work>
on_serializer_sender<Previous, 
    Work> on_serializer(Previous prev, 
    serializer_context& ctx, Work work) {
  return {prev, ctx, work};
}
Listing 8

namespace detail {
template <ex::receiver Receiver, ex::sender Work>
struct on_serializer_receiver {
  serializer_context& context_;
  Work work_;
  Receiver receiver_;
  using receiver_concept = ex::receiver_t;
  void set_value() noexcept {
    context_.enqueue([this] {
      ex::sender auto work_and_done =
        work_ | ex::then([this] { 
          context_.on_done(); });
      auto op = ex::connect(work_and_done,
        std::move(receiver_));
      ex::start(op);
    });
  }

  void set_error(std::exception_ptr e) noexcept {
    ex::set_error(std::move(receiver_), e); }

  void set_stopped() noexcept { 
    ex::set_stopped(std::move(receiver_)); }
};
}
Listing 9

As in the previous example, the key logic happens in the set_value() method of the intermediate receiver. Here, we enqueue a lambda into the serializer context that must perform three actions: execute the given work, notify the context when the work is done, and signal the final receiver with a completion signal.

For the first two actions, we create a sender that will execute them; we name this sender work_and_done. This is a straightforward composition of the original work and an ex::then with a lambda that calls on_done on the context. Then, to cover the third action, we connect this sender to the final receiver object, storing the resulting operation state in the variable op. We then call start on this operation state to actually execute everything. To summarise, this will first execute the work_and_done sender, which will first execute the given work, then call on_done on the context, and finally trigger a completion signal to the final receiver.

The start() call will last as long as this operation needs to execute. This means that the lifetime of op is slightly longer than this operation. This is a common pattern when implementing code that manually runs operation states. It is also well aligned with the requirement we have for operation states, which forbids copying and moving operation state objects while the asynchronous operation is running (see above).

Depending on the workload of the context, this lambda might be executed immediately or deferred to a later execution. In both cases, we uphold the guarantees of the serializer and the expectations of senders.

Conclusions

In this article, we presented three examples to demonstrate that writing senders is not very complicated and can be done relatively easily by regular C++ engineers. There are a few concepts that need to be understood (we covered the basics in the first part of the article), but once one is familiar with those concepts, writing new senders is not difficult. Of course, senders that deal with complex concurrency concerns are harder to write, as they require extra care on the concurrency side, but this is inherent to the problem being solved.

Through these examples, we also reinforced the idea presented in the previous two articles ([Teodorescu24], [Teodorescu25a]) that the senders/receivers framework has great composability features.

If the first two articles in this series conveyed the message that senders are easy to use but probably harder to implement, this article aims to show that senders are also easy to implement. However, there is a caveat: while it is easy to implement regular senders, implementing fully generic senders can be more challenging. That said, most users will not need to implement fully generic senders. Thus, for most programmers, writing senders should be a straightforward task.

Senders/receivers are not complicated. People just need to spend some time getting acquainted with how they work. They are often compared to the introduction of iterators for generic programming: they may not be the first tool a novice programmer reaches for, but after some practice, their value is inestimable. I truly believe that the same description applies to senders/receivers.

References

[Beman] Dietmar Kühl and other Beman project contributors, execution, available at: https://github.com/bemanproject/execution

[Intel] Intel, ‘Local Serializer’ in Intel® oneAPI Threading Building Blocks Developer Guide and API Reference, available at: https://www.intel.com/content/www/us/en/docs/onetbb/developer-guide-api-reference/2021-12/local-serializer.html

[Kohlhoff23] Christopher M. Kohlhoff, ‘Strands: Use Threads Without Explicit Locking’, boost C++ Libraries, available at: https://www.boost.org/doc/libs/1_87_0/doc/html/boost_asio/overview/core/strands.html

[P2300R10] Michał Dominiak, Georgy Evtushenko, Lewis Baker, Lucian Radu Teodorescu, Lee Howes, Kirk Shoop, Michael Garland, Eric Niebler, Bryce Adelstein Lelbach, P2300R10: std::execution, 2024, available at: https://wg21.link/P2300R10

[stdexec] NVIDIA, ‘Senders - A Standard Model for Asynchronous Execution in C++’, available at: https://github.com/NVIDIA/stdexec

[Teodorescu24] Lucian Radu Teodorescu, ‘Senders/receivers: An Introduction’, Overload 184, December 2024, available at: https://accu.org/journals/overload/32/184/teodorescu/

[Teodorescu25a] Lucian Radu Teodorescu, ‘Using Senders/Receivers’, Overload 185, February 2025, available at: https://accu.org/journals/overload/33/185/teodorescu/

[Teodorescu25b] Lucian Radu Teodorescu, overload186_sr_examples (code for the article) available at: https://github.com/lucteo/overload186_sr_examples

Lucian Radu Teodorescu has a PhD in programming languages and is a Staff Engineer at Garmin. He likes challenges; and understanding the essence of things (if there is one) constitutes the biggest challenge of all.





