From 18c2bf1bcfad6a3851dd609b6eedaf5c148b3423 Mon Sep 17 00:00:00 2001 From: "Peng Hailin," Date: Thu, 6 Apr 2023 22:31:23 +0800 Subject: [PATCH] Update Ch20 --- ...ect_Building_a_Multithreaded_Web_Server.md | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md b/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md index d20ffb5..2d0b1c7 100644 --- a/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md +++ b/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md @@ -1464,4 +1464,168 @@ Worker 0 已断开链接;关闭中。 请注意这次特定执行的一个有趣方面:`ThreadPool` 弃用了 `sender`,而在有任何 `worker` 接收到错误前,咱们就尝试归拢了 `worker` `0`。`worker` `0` 还不曾从 `recv` 获取到一个错误,因此主线程就阻塞于等待 `worker` `0` 结束。与此同时,`worker` `1` 收到了一项作业,而随后全部线程都收到了错误。在 `worker` `0` 结束时,主线程就等待其余 `worker` 结束。而在那个时候,他们都已退出了他们的循环并停止了。 +恭喜!咱们先进已经完成了咱们的项目;咱们有了一个运用线程池来异步响应的基本 web 服务器。咱们能够完成服务器有序关闭,这会清理掉线程池中的全部线程。 +以下是用于参考的全部代码: + +文件名:`src/main.rs` + +```rust +use hello::ThreadPool; + +use std::{ + fs, + thread, + io::{prelude::*, BufReader}, + net::{TcpListener, TcpStream}, + time::Duration, +}; + +fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); + + for stream in listener.incoming().take(2) { + let stream = stream.unwrap(); + + pool.execute(|| { + handle_conn(stream); + }); + } + + println! ("关闭中。"); +} + +fn handle_conn(mut stream: TcpStream) { + let buf_reader = BufReader::new(&mut stream); + let req_line = buf_reader.lines().next().unwrap().unwrap(); + + let (status_line, filename) = match &req_line[..] { + "GET / HTTP/1.1" => ( "HTTP/1.1 200 OK", "hello.html"), + "GET /sleep HTTP/1.1" => { + thread::sleep(Duration::from_secs(10)); + ("HTTP/1.1 200 0K", "hello.html") + } + _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), + }; + + let contents = fs::read_to_string(filename).unwrap(); + let length = contents.len(); + + let resp = + format! ("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); + + stream.write_all(resp.as_bytes()).unwrap(); +} +``` + + +文件名:`src/lib.rs` + +```rust +use std::{ + sync::{mpsc, Arc, Mutex}, + thread, +}; + +pub struct ThreadPool { + workers: Vec, + sender: Option>, +} + +type Job = Box; + +impl ThreadPool { + /// 创建出一个新的 ThreadPool。 + /// + /// 其中的 size 为线程池中线程的数目。 + /// + /// # 终止运行 + /// + /// 这个 `new` 函数将在 size 为零时终止运行。 + pub fn new(size: usize) -> ThreadPool { + assert! (size > 0); + + let (sender, receiver) = mpsc::channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, Arc::clone(&receiver))); + } + + ThreadPool { + workers, + sender: Some(sender), + } + } + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.as_ref().unwrap().send(job).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + drop(self.sender.take()); + + for worker in &mut self.workers { + println! ("关闭 worker {}", worker.id); + + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + } + } + } +} + +struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let message = receiver.lock().unwrap().recv(); + + match message { + Ok(job) => { + println! ("Worker {id} 获取到一项作业;执行中。"); + + job(); + } + Err(_) => { + println! ("Worker {id} 已断开链接;关闭中。"); + break; + } + } + }); + + Worker { + id, + thread: Some(thread), + } + } +} +``` + +这里咱们可以做更多事情!若咱们打算继续加强这个项目,下面是一些想法: + +- 给 `ThreadPool` 及其公开方法添加更多文档; +- 给这个库的功能添加测试; +- 把一些调用修改为 `unwrap`,以获得更多的错误处理鲁棒性; +- 运用 `ThreadPool` 来完成除服务 web 请求外的一些别的任务; +- 在 [crates.io](https://crates.io/) 上找到某个线程池代码箱,并用该代码箱实现一个类似的 web 服务器。随后将其 API 及鲁棒性,与咱们实现的线程池相比较。 + + +## 本章小结 + +干得好!咱们已经读完了这整本书!要感谢咱们加入到这次 Rust 之旅中来。咱们现在已经准备好实现咱们自己的 Rust 项目,以及帮助其他人的项目了。请记住有个别的热衷于就咱们在 Rust 道路上,所遇到的任何挑战而帮助咱们的 Rust 公民的热情社区。