Clean Shutdown
当前实现的问题之一是,它无法处理正常关机。如果由于某种原因,我们的 接收 循环中断了,则所有正在进行的任务都将遗弃在地上。更正确的关闭顺序为:
- 停止接受新客户
- 传递所有未解决的(pending)消息
- 退出程序
基于 channel 的体系结构,其干净关闭很容易,尽管它一开始可能看起来很神奇。在 Rust 中,一旦所有发送端(senders)都被丢弃,通道的接收端(receiver)一侧就会关闭。也就是说,一旦生产者退出,并丢弃他们的发送端,系统的其余部分就会自然关闭。在async_std中,这套形式转化为两个规则:
- 确保 channel 形成一个非循环图。
- 注意以正确的顺序等待,直到系统的中间层处理 pending 消息。
在a-chat,我们已经有一个单向消息流:reader -> broker -> writer。但是,我们从不等待 broker 和 writers,这可能会导致某些消息丢失。让我们添加等待,到服务器:
# extern crate async_std;# extern crate futures;# use async_std::{# io::{self, BufReader},# net::{TcpListener, TcpStream, ToSocketAddrs},# prelude::*,# task,# };# use futures::channel::mpsc;# use futures::SinkExt;# use std::{# collections::hash_map::{HashMap, Entry},# sync::Arc,# };## type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;# type Sender<T> = mpsc::UnboundedSender<T>;# type Receiver<T> = mpsc::UnboundedReceiver<T>;## fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()># where# F: Future<Output = Result<()>> + Send + 'static,# {# task::spawn(async move {# if let Err(e) = fut.await {# eprintln!("{}", e)# }# })# }### async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {# let stream = Arc::new(stream); // 2# let reader = BufReader::new(&*stream);# let mut lines = reader.lines();## let name = match lines.next().await {# None => Err("peer disconnected immediately")?,# Some(line) => line?,# };# broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3# .unwrap();## while let Some(line) = lines.next().await {# let line = line?;# let (dest, msg) = match line.find(':') {# None => continue,# Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),# };# let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();# let msg: String = msg.trim().to_string();## broker.send(Event::Message { // 4# from: name.clone(),# to: dest,# msg,# }).await.unwrap();# }# Ok(())# }## async fn connection_writer_loop(# mut messages: Receiver<String>,# stream: Arc<TcpStream>,# ) -> Result<()> {# let mut stream = &*stream;# while let Some(msg) = messages.next().await {# stream.write_all(msg.as_bytes()).await?;# }# Ok(())# }## #[derive(Debug)]# enum Event {# NewPeer {# name: String,# stream: Arc<TcpStream>,# },# Message {# from: String,# to: Vec<String>,# msg: String,# },# }## async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {# let mut peers: HashMap<String, Sender<String>> = HashMap::new();## while let Some(event) = events.next().await {# match event {# Event::Message { from, to, msg } => {# for addr in to {# if let Some(peer) = peers.get_mut(&addr) {# let msg = format!("from {}: {}\n", from, msg);# peer.send(msg).await?# }# }# }# Event::NewPeer { name, stream} => {# match peers.entry(name) {# Entry::Occupied(..) => (),# Entry::Vacant(entry) => {# let (client_sender, client_receiver) = mpsc::unbounded();# entry.insert(client_sender); // 4# spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5# }# }# }# }# }# Ok(())# }#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {let listener = TcpListener::bind(addr).await?;let (broker_sender, broker_receiver) = mpsc::unbounded();let broker_handle = task::spawn(broker_loop(broker_receiver));let mut incoming = listener.incoming();while let Some(stream) = incoming.next().await {let stream = stream?;println!("Accepting from: {}", stream.peer_addr()?);spawn_and_log_error(connection_loop(broker_sender.clone(), stream));}drop(broker_sender); // 1broker_handle.await?; // 5Ok(())}
还有向 broker 添加:
# extern crate async_std;# extern crate futures;# use async_std::{# io::{self, BufReader},# net::{TcpListener, TcpStream, ToSocketAddrs},# prelude::*,# task,# };# use futures::channel::mpsc;# use futures::SinkExt;# use std::{# collections::hash_map::{HashMap, Entry},# sync::Arc,# };## type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;# type Sender<T> = mpsc::UnboundedSender<T>;# type Receiver<T> = mpsc::UnboundedReceiver<T>;## async fn connection_writer_loop(# mut messages: Receiver<String>,# stream: Arc<TcpStream>,# ) -> Result<()> {# let mut stream = &*stream;# while let Some(msg) = messages.next().await {# stream.write_all(msg.as_bytes()).await?;# }# Ok(())# }## fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()># where# F: Future<Output = Result<()>> + Send + 'static,# {# task::spawn(async move {# if let Err(e) = fut.await {# eprintln!("{}", e)# }# })# }## #[derive(Debug)]# enum Event {# NewPeer {# name: String,# stream: Arc<TcpStream>,# },# Message {# from: String,# to: Vec<String>,# msg: String,# },# }#async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {let mut writers = Vec::new();let mut peers: HashMap<String, Sender<String>> = HashMap::new();while let Some(event) = events.next().await { // 2match event {Event::Message { from, to, msg } => {for addr in to {if let Some(peer) = peers.get_mut(&addr) {let msg = format!("from {}: {}\n", from, msg);peer.send(msg).await?}}}Event::NewPeer { name, stream} => {match peers.entry(name) {Entry::Occupied(..) => (),Entry::Vacant(entry) => {let (client_sender, client_receiver) = mpsc::unbounded();entry.insert(client_sender);let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream));writers.push(handle); // 4}}}}}drop(peers); // 3for writer in writers { // 4writer.await;}Ok(())}
注意一旦退出 accept 循环,所有 channel 都会发生下面情况:
- 首先,我们弃掉 main broker 的 sender。这样,当 readers 完成时, broker 的 channel sender 就没有了,且 channel 关闭了。
- 接下来, broker 退出
while let Some(event) = events.next().await循环。 - 至关重要的是,在此阶段,我们弃掉了
peersmap。这会弃掉 writer 的 senders。 - 现在我们可以 join 所有 writers。
- 最后,我们 join broker ,这也保证所有 write 操作都已终止。
