Update Ch20

This commit is contained in:
Peng Hailin, 2023-04-06 22:31:23 +08:00
parent 98b1634a92
commit 18c2bf1bcf

View File

@ -1464,4 +1464,168 @@ Worker 0 已断开链接;关闭中。
请注意这次特定执行的一个有趣方面:`ThreadPool` 弃用了 `sender`,而在有任何 `worker` 接收到错误前,咱们就尝试归拢了 `worker` `0`。`worker` `0` 还不曾从 `recv` 获取到一个错误,因此主线程就阻塞于等待 `worker` `0` 结束。与此同时,`worker` `1` 收到了一项作业,而随后全部线程都收到了错误。在 `worker` `0` 结束时,主线程就等待其余 `worker` 结束。而在那个时候,他们都已退出了他们的循环并停止了。
恭喜!咱们先进已经完成了咱们的项目;咱们有了一个运用线程池来异步响应的基本 web 服务器。咱们能够完成服务器有序关闭,这会清理掉线程池中的全部线程。
以下是用于参考的全部代码:
文件名:`src/main.rs`
```rust
use hello::ThreadPool;
use std::{
fs,
thread,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_conn(stream);
});
}
println! ("关闭中。");
}
fn handle_conn(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let req_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &req_line[..] {
"GET / HTTP/1.1" => ( "HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(10));
("HTTP/1.1 200 0K", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let resp =
format! ("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(resp.as_bytes()).unwrap();
}
```
文件名:`src/lib.rs`
```rust
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// 创建出一个新的 ThreadPool。
///
/// 其中的 size 为线程池中线程的数目。
///
/// # 终止运行
///
/// 这个 `new` 函数将在 size 为零时终止运行。
pub fn new(size: usize) -> ThreadPool {
assert! (size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println! ("关闭 worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println! ("Worker {id} 获取到一项作业;执行中。");
job();
}
Err(_) => {
println! ("Worker {id} 已断开链接;关闭中。");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
```
这里咱们可以做更多事情!若咱们打算继续加强这个项目,下面是一些想法:
- 给 `ThreadPool` 及其公开方法添加更多文档;
- 给这个库的功能添加测试;
- 把一些调用修改为 `unwrap`,以获得更多的错误处理鲁棒性;
- 运用 `ThreadPool` 来完成除服务 web 请求外的一些别的任务;
- 在 [crates.io](https://crates.io/) 上找到某个线程池代码箱,并用该代码箱实现一个类似的 web 服务器。随后将其 API 及鲁棒性,与咱们实现的线程池相比较。
## 本章小结
干得好!咱们已经读完了这整本书!要感谢咱们加入到这次 Rust 之旅中来。咱们现在已经准备好实现咱们自己的 Rust 项目,以及帮助其他人的项目了。请记住有个别的热衷于就咱们在 Rust 道路上,所遇到的任何挑战而帮助咱们的 Rust 公民的热情社区。