Clean Shutdown

当前实现的问题之一是,它无法处理正常关机。如果由于某种原因,我们的 接收 循环中断了,则所有正在进行的任务都将遗弃在地上。更正确的关闭顺序为:

  1. 停止接受新客户
  2. 传递所有未解决的(pending)消息
  3. 退出程序

基于 channel 的体系结构,其干净关闭很容易,尽管它一开始可能看起来很神奇。在 Rust 中,一旦所有发送端(senders)都被丢弃,通道的接收端(receiver)一侧就会关闭。也就是说,一旦生产者退出,并丢弃他们的发送端,系统的其余部分就会自然关闭。在async_std中,这套形式转化为两个规则:

  1. 确保 channel 形成一个非循环图。
  2. 注意以正确的顺序等待,直到系统的中间层处理 pending 消息。

a-chat,我们已经有一个单向消息流:reader -> broker -> writer。但是,我们从不等待 broker 和 writers,这可能会导致某些消息丢失。让我们添加等待,到服务器:

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::{
  4. # io::{self, BufReader},
  5. # net::{TcpListener, TcpStream, ToSocketAddrs},
  6. # prelude::*,
  7. # task,
  8. # };
  9. # use futures::channel::mpsc;
  10. # use futures::SinkExt;
  11. # use std::{
  12. # collections::hash_map::{HashMap, Entry},
  13. # sync::Arc,
  14. # };
  15. #
  16. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  17. # type Sender<T> = mpsc::UnboundedSender<T>;
  18. # type Receiver<T> = mpsc::UnboundedReceiver<T>;
  19. #
  20. # fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  21. # where
  22. # F: Future<Output = Result<()>> + Send + 'static,
  23. # {
  24. # task::spawn(async move {
  25. # if let Err(e) = fut.await {
  26. # eprintln!("{}", e)
  27. # }
  28. # })
  29. # }
  30. #
  31. #
  32. # async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
  33. # let stream = Arc::new(stream); // 2
  34. # let reader = BufReader::new(&*stream);
  35. # let mut lines = reader.lines();
  36. #
  37. # let name = match lines.next().await {
  38. # None => Err("peer disconnected immediately")?,
  39. # Some(line) => line?,
  40. # };
  41. # broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
  42. # .unwrap();
  43. #
  44. # while let Some(line) = lines.next().await {
  45. # let line = line?;
  46. # let (dest, msg) = match line.find(':') {
  47. # None => continue,
  48. # Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
  49. # };
  50. # let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
  51. # let msg: String = msg.trim().to_string();
  52. #
  53. # broker.send(Event::Message { // 4
  54. # from: name.clone(),
  55. # to: dest,
  56. # msg,
  57. # }).await.unwrap();
  58. # }
  59. # Ok(())
  60. # }
  61. #
  62. # async fn connection_writer_loop(
  63. # mut messages: Receiver<String>,
  64. # stream: Arc<TcpStream>,
  65. # ) -> Result<()> {
  66. # let mut stream = &*stream;
  67. # while let Some(msg) = messages.next().await {
  68. # stream.write_all(msg.as_bytes()).await?;
  69. # }
  70. # Ok(())
  71. # }
  72. #
  73. # #[derive(Debug)]
  74. # enum Event {
  75. # NewPeer {
  76. # name: String,
  77. # stream: Arc<TcpStream>,
  78. # },
  79. # Message {
  80. # from: String,
  81. # to: Vec<String>,
  82. # msg: String,
  83. # },
  84. # }
  85. #
  86. # async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
  87. # let mut peers: HashMap<String, Sender<String>> = HashMap::new();
  88. #
  89. # while let Some(event) = events.next().await {
  90. # match event {
  91. # Event::Message { from, to, msg } => {
  92. # for addr in to {
  93. # if let Some(peer) = peers.get_mut(&addr) {
  94. # let msg = format!("from {}: {}\n", from, msg);
  95. # peer.send(msg).await?
  96. # }
  97. # }
  98. # }
  99. # Event::NewPeer { name, stream} => {
  100. # match peers.entry(name) {
  101. # Entry::Occupied(..) => (),
  102. # Entry::Vacant(entry) => {
  103. # let (client_sender, client_receiver) = mpsc::unbounded();
  104. # entry.insert(client_sender); // 4
  105. # spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
  106. # }
  107. # }
  108. # }
  109. # }
  110. # }
  111. # Ok(())
  112. # }
  113. #
  114. async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
  115. let listener = TcpListener::bind(addr).await?;
  116. let (broker_sender, broker_receiver) = mpsc::unbounded();
  117. let broker_handle = task::spawn(broker_loop(broker_receiver));
  118. let mut incoming = listener.incoming();
  119. while let Some(stream) = incoming.next().await {
  120. let stream = stream?;
  121. println!("Accepting from: {}", stream.peer_addr()?);
  122. spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
  123. }
  124. drop(broker_sender); // 1
  125. broker_handle.await?; // 5
  126. Ok(())
  127. }

还有向 broker 添加:

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::{
  4. # io::{self, BufReader},
  5. # net::{TcpListener, TcpStream, ToSocketAddrs},
  6. # prelude::*,
  7. # task,
  8. # };
  9. # use futures::channel::mpsc;
  10. # use futures::SinkExt;
  11. # use std::{
  12. # collections::hash_map::{HashMap, Entry},
  13. # sync::Arc,
  14. # };
  15. #
  16. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  17. # type Sender<T> = mpsc::UnboundedSender<T>;
  18. # type Receiver<T> = mpsc::UnboundedReceiver<T>;
  19. #
  20. # async fn connection_writer_loop(
  21. # mut messages: Receiver<String>,
  22. # stream: Arc<TcpStream>,
  23. # ) -> Result<()> {
  24. # let mut stream = &*stream;
  25. # while let Some(msg) = messages.next().await {
  26. # stream.write_all(msg.as_bytes()).await?;
  27. # }
  28. # Ok(())
  29. # }
  30. #
  31. # fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  32. # where
  33. # F: Future<Output = Result<()>> + Send + 'static,
  34. # {
  35. # task::spawn(async move {
  36. # if let Err(e) = fut.await {
  37. # eprintln!("{}", e)
  38. # }
  39. # })
  40. # }
  41. #
  42. # #[derive(Debug)]
  43. # enum Event {
  44. # NewPeer {
  45. # name: String,
  46. # stream: Arc<TcpStream>,
  47. # },
  48. # Message {
  49. # from: String,
  50. # to: Vec<String>,
  51. # msg: String,
  52. # },
  53. # }
  54. #
  55. async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
  56. let mut writers = Vec::new();
  57. let mut peers: HashMap<String, Sender<String>> = HashMap::new();
  58. while let Some(event) = events.next().await { // 2
  59. match event {
  60. Event::Message { from, to, msg } => {
  61. for addr in to {
  62. if let Some(peer) = peers.get_mut(&addr) {
  63. let msg = format!("from {}: {}\n", from, msg);
  64. peer.send(msg).await?
  65. }
  66. }
  67. }
  68. Event::NewPeer { name, stream} => {
  69. match peers.entry(name) {
  70. Entry::Occupied(..) => (),
  71. Entry::Vacant(entry) => {
  72. let (client_sender, client_receiver) = mpsc::unbounded();
  73. entry.insert(client_sender);
  74. let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream));
  75. writers.push(handle); // 4
  76. }
  77. }
  78. }
  79. }
  80. }
  81. drop(peers); // 3
  82. for writer in writers { // 4
  83. writer.await;
  84. }
  85. Ok(())
  86. }

注意一旦退出 accept 循环,所有 channel 都会发生下面情况:

  1. 首先,我们弃掉 main broker 的 sender。这样,当 readers 完成时, broker 的 channel sender 就没有了,且 channel 关闭了。
  2. 接下来, broker 退出while let Some(event) = events.next().await循环。
  3. 至关重要的是,在此阶段,我们弃掉了peers map。这会弃掉 writer 的 senders。
  4. 现在我们可以 join 所有 writers。
  5. 最后,我们 join broker ,这也保证所有 write 操作都已终止。