A multi-producer, multi-consumer broadcast queue. Each sent value is seen by all consumers.

A Sender is used to broadcast values to all connected Receiver values. Sender handles can be cloned, so several tasks may send concurrently while receivers consume in parallel. Sender and Receiver are both Send and Sync as long as T is Send.

When a value is sent, all Receiver handles are notified and will receive the value. The value is stored once inside the channel and cloned on demand for each receiver. Once all receivers have received a clone of the value, the value is released from the channel.

A channel is created by calling channel, specifying the maximum number of messages the channel can retain at any given time.

New Receiver handles are created by calling Sender::subscribe. The returned Receiver will receive values sent after the call to subscribe.

This channel is also suitable for the single-producer multi-consumer use-case, where a single sender broadcasts values to many receivers.

§Lagging

As sent messages must be retained until all Receiver handles receive a clone, broadcast channels are susceptible to the “slow receiver” problem. In this case, all but one receiver are able to receive values at the rate they are sent. Because one receiver is stalled, the channel starts to fill up.

This broadcast channel implementation handles this case by setting a hard upper bound on the number of values the channel may retain at any given time. This upper bound is passed to the channel function as an argument.

If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released. This frees up space for the new value. Any receiver that has not yet seen the released value gets RecvError::Lagged from its next call to recv.

After RecvError::Lagged is returned, the lagging receiver’s position is moved to the oldest value still held by the channel, and the next call to recv returns that value.

This behavior enables a receiver to detect when it has lagged so far behind that data has been dropped. The caller may decide how to respond to this: either by aborting its task or by tolerating lost messages and resuming consumption of the channel.

§Closing

When all Sender handles have been dropped, no new values may be sent. At this point, the channel is “closed”. Once a receiver has received all values retained by the channel, the next call to recv will return with RecvError::Closed.

When a Receiver handle is dropped, any messages not read by the receiver will be marked as read. If this receiver was the only one not to have read that message, the message will be dropped at this point.

§Examples

Basic usage

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
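    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    // Each receiver gets its own clone of every value, in send order.
    let task1 = tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    let task2 = tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    // Wait for both tasks so main does not return before the assertions run.
    task1.await.unwrap();
    task2.await.unwrap();
}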

Handling lag

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

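    // The receiver lagged behind: with a capacity of 2, the oldest value (10)
    // was released, so the first recv reports one skipped message.
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Lagged(1))
    ));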

    // At this point, we can abort or continue with lost messages

    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}

Modules§

  • Broadcast error types

Structs§

Functions§

  • Creates a bounded, multi-producer, multi-consumer channel where each sent value is broadcast to all active receivers.