基于单线程模拟慢请求
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration,};fn main() { // 监听地址: 127.0.0.1:7878 let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); // 建立连接 for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); }}fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); // let (status_line, filename) = if request_line == "GET / HTTP/1.1" { // ("HTTP/1.1 200 OK", "hello.html") // } else { // ("HTTP/1.1 404 NOT FOUND", "404.html") // }; let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap();}
如果我们连续访问 / 路径,那效果跟之前一样:立刻看到请求的页面。但假如先访问 /sleep ,接着在另一个页面访问 /,就会看到 / 的页面直到 5 秒后才会刷出来,验证了请求排队这个糟糕的事实。
至于如何解决,其实办法不少,本章我们来看看一个经典解决方案:线程池。
使用线程池改善吞吐
为每个请求生成一个线程
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); }}
这种实现下,依次访问 /sleep 和 / 就无需再等待,不错的开始。这显然不是我们的最终方案,原因在于它会生成无上限的线程数,最终导致资源耗尽。但它确实是一个好的起点。
限制创建线程的数量
我们需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。架构设计首先是设定最大线程数的上限,其次维护一个请求队列。池中的线程去队列中依次弹出请求并处理。这样就可以同时并发处理 N 个请求,其中 N 是线程数。
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration,};use multi_thread::ThreadPool;fn main() { // 监听地址: 127.0.0.1:7878 let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); // 创建一个包含 4 个线程的线程池 let pool = ThreadPool::new(4); // 建立连接 for stream in listener.incoming() { let stream = stream.unwrap(); // 分发执行请求 pool.execute(|| { handle_connection(stream); }); }}fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); // let (status_line, filename) = if request_line == "GET / HTTP/1.1" { // ("HTTP/1.1 200 OK", "hello.html") // } else { // ("HTTP/1.1 404 NOT FOUND", "404.html") // }; let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap();}
创建 src/lib.rs 文件并写入如下代码:
use std::{ sync::{mpsc, Arc, Mutex}, thread,};type Job = Box<dyn FnOnce() + Send + 'static>;pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>,}impl ThreadPool { // 初始化实例 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); }}// 创建一个 Worker 结构体,作为 ThreadPool 和任务线程联系的桥梁,它的任务是获得将要执行的代码,然后在具体的线程中去执行。想象一个场景:一个餐馆,Worker 等待顾客的点餐,然后将具体的点餐信息传递给厨房,感觉类似服务员,// 由于外部调用者无需知道 Worker 的存在,因此这里使用了私有的声明。struct Worker { id: usize, thread: thread::JoinHandle<()>,}impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } }}