Clean Shutdown
One of the problems of the current implementation is that it doesn't handle graceful shutdown. If we break from the accept loop for some reason, all in-flight tasks are just dropped on the floor.
We will intercept Ctrl-C.
A more correct shutdown sequence would be:
- Stop accepting new clients
- Notify the readers we're not accepting new messages
- Deliver all pending messages
- Exit the process
A clean shutdown in a channel-based architecture is easy, although it can appear to be a magic trick at first.
In Rust, the receiver side of a channel is closed as soon as all senders are dropped.
That is, as soon as producers exit and drop their senders, the rest of the system shuts down naturally.
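To make the rule concrete, here is a minimal sketch that uses std::sync::mpsc and plain threads instead of tokio, so it runs without any dependencies. The function name drain_until_closed is hypothetical. tokio's mpsc channels follow the same rule: recv().await returns None once every sender is gone.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` messages,
/// then counts everything the receiver sees before the channel closes.
pub fn drain_until_closed(producers: usize, per_producer: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();

    for id in 0..producers {
        let tx = tx.clone(); // each producer owns its own sender
        thread::spawn(move || {
            for n in 0..per_producer {
                // The receiver outlives every producer here, so send cannot fail.
                tx.send(format!("producer {id}, message {n}")).unwrap();
            }
            // `tx` is dropped here, when the closure returns.
        });
    }

    // Drop the original sender; otherwise the loop below would never end.
    drop(tx);

    // Iteration stops only once every sender is gone and the queue is empty.
    let mut received = 0;
    for _msg in rx {
        received += 1;
    }
    received
}
```

Note that nothing tells the receiver to stop. It stops because the last sender disappeared, which is exactly the property the shutdown sequence relies on.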
In tokio, this translates to two rules:
- Make sure that channels form an acyclic graph.
- Take care to wait, in the correct order, until intermediate layers of the system process pending messages.
In a-chat, we already have a unidirectional flow of messages: reader -> broker -> writer.
However, we never wait for the broker and writers, which might cause some messages to get dropped.
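The effect of waiting in the right order can be sketched with a three-stage pipeline. This is an illustration under assumptions, not a-chat's code: it uses std threads and channels rather than tokio tasks, and run_pipeline and the "routed: " prefix are made up for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Pushes `lines` through reader -> broker -> writer and returns what the
/// writer delivered. Each stage stops when its upstream channel closes.
pub fn run_pipeline(lines: Vec<String>) -> Vec<String> {
    let (to_broker, broker_rx) = mpsc::channel::<String>();
    let (to_writer, writer_rx) = mpsc::channel::<String>();

    // Reader: forwards input, then drops `to_broker` when it returns.
    let reader = thread::spawn(move || {
        for line in lines {
            if to_broker.send(line).is_err() {
                break; // the broker is gone, nothing left to do
            }
        }
    });

    // Broker: runs until the reader's sender is dropped, then drops `to_writer`.
    let broker = thread::spawn(move || {
        for msg in broker_rx {
            if to_writer.send(format!("routed: {msg}")).is_err() {
                break;
            }
        }
    });

    // Writer: drains its queue until the broker's sender is dropped.
    let writer = thread::spawn(move || {
        let mut delivered = Vec::new();
        for msg in writer_rx {
            delivered.push(msg);
        }
        delivered
    });

    // Join in the direction of the message flow so no stage is abandoned
    // while it still has pending messages.
    reader.join().unwrap();
    broker.join().unwrap();
    writer.join().unwrap()
}
```

Because the channels form a straight line with no cycles, every stage is guaranteed to see its input close eventually. Returning without the final join would be the equivalent of the current a-chat behaviour: the writer could be cut off with messages still queued.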
We also need to notify all readers that we are going to stop accepting messages. Here, we use tokio::sync::Notify.
Let's first add the notification feature to the readers.
We have to start using select! here so that each reader can wait for either the next line or the shutdown notification.
async fn connection_loop(broker: Sender<Event>, stream: TcpStream, shutdown: Arc<Notify>) -> Result<()> {
    // ...
    loop {
        tokio::select! {
            // A new line arrived: split it into destinations and message, then forward it.
            Ok(Some(line)) = lines.next_line() => {
                let (dest, msg) = match line.split_once(':') {
                    None => continue,
                    Some((dest, msg)) => (dest, msg.trim()),
                };
                let dest: Vec<String> = dest
                    .split(',')
                    .map(|name| name.trim().to_string())
                    .collect();
                let msg: String = msg.to_string();
                broker
                    .send(Event::Message {
                        from: name.clone(),
                        to: dest,
                        msg,
                    })
                    .unwrap();
            },
            // The server is shutting down: stop reading.
            _ = shutdown.notified() => break,
        }
    }
    Ok(())
}
Let's add Ctrl-C handling and waiting to the server.
And to the broker:
Notice what happens with all of the channels once we exit the accept loop:
- We notify all readers to stop accepting messages.
- We drop the main broker's sender. That way when the readers are done, there's no sender for the broker's channel, and the channel closes.
- Next, the broker exits the while let Some(event) = events.next().await loop.
- It's crucial that, at this stage, we drop the peers map. This drops the writers' senders.
- Tokio will automatically wait for all finishing futures.
- Finally, we join the broker, which also guarantees that all the writes have terminated.
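The broker's part of this sequence can be sketched as follows. This is a simplified stand-in rather than the chapter's broker: it uses std threads and channels instead of tokio tasks, the Event enum is a hypothetical cut-down version, and each "writer" collects its messages in a Vec instead of writing to a socket.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical event type, loosely modelled on the chapter's `Event`.
pub enum Event {
    NewPeer { name: String },
    Message { to: String, msg: String },
}

/// Runs the broker until every event sender is gone, then shuts the writers
/// down. Returns the messages each writer delivered, keyed by peer name.
pub fn broker_loop(events: Receiver<Event>) -> HashMap<String, Vec<String>> {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();
    let mut writers: Vec<(String, JoinHandle<Vec<String>>)> = Vec::new();

    // Ends once all senders for `events` have been dropped.
    for event in events {
        match event {
            Event::NewPeer { name } => {
                let (tx, rx) = mpsc::channel::<String>();
                // Stand-in writer: collects messages until its channel closes.
                let handle = thread::spawn(move || rx.into_iter().collect::<Vec<String>>());
                peers.insert(name.clone(), tx);
                writers.push((name, handle));
            }
            Event::Message { to, msg } => {
                if let Some(peer) = peers.get(&to) {
                    // Ignore the error if the writer has already gone away.
                    let _ = peer.send(msg);
                }
            }
        }
    }

    // Dropping the map drops every writer's sender, closing the writers' channels.
    drop(peers);

    // Each writer drains its queue and exits, so these joins cannot hang.
    writers
        .into_iter()
        .map(|(name, handle)| (name, handle.join().unwrap()))
        .collect()
}
```

If the drop(peers) line were removed, the map would still be alive while the joins run, the writers' channels would stay open, and the joins would block forever. That is why dropping the map before waiting is the crucial step.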