Update Ch20

This commit is contained in:
Peng Hailin, 2023-04-06 22:17:28 +08:00
parent 164c69bae8
commit 98b1634a92
3 changed files with 102 additions and 8 deletions

View File

@ -33,7 +33,7 @@ impl ThreadPool {
ThreadPool {
workers,
sender: Some<sender>,
sender: Some(sender),
}
}
@ -69,11 +69,19 @@ struct Worker {
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
let message = receiver.lock().unwrap().recv();
println! ("Worker {id} 获取到一项作业;执行中。");
match message {
Ok(job) => {
println! ("Worker {id} 获取到一项作业;执行中。");
job();
job();
}
Err(_) => {
println! ("Worker {id} 已断开链接;关闭中。");
break;
}
}
});
Worker {

View File

@ -12,13 +12,15 @@ fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_conn(stream);
});
}
println! ("关闭中。");
}
fn handle_conn(mut stream: TcpStream) {
@ -28,7 +30,7 @@ fn handle_conn(mut stream: TcpStream) {
let (status_line, filename) = match &req_line[..] {
"GET / HTTP/1.1" => ( "HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(30));
thread::sleep(Duration::from_secs(10));
("HTTP/1.1 200 0K", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),

View File

@ -1057,7 +1057,7 @@ impl Worker {
**Implementing the `execute` method**
咱们来最终实现那个 `ThreadPool` 上的 `execute` 方法。咱们还将把 `Job` 从结构体,修改为保存着 `execute` 接收到闭包类型的特质对象的类型别名。正如第 19 章 [“使用类型别名创建类型义词”](Ch19_Advanced_Features.md#creating-type-synonyms-with-type-aliases) 小节中曾讨论过的,类型别名实现了为易于使用而将长类型构造缩短。请看看下面清单 20-19.
咱们来最终实现那个 `ThreadPool` 上的 `execute` 方法。咱们还将把 `Job` 从结构体,修改为保存着 `execute` 接收到闭包类型的特质对象的类型别名。正如第 19 章 [“使用类型别名创建类型义词”](Ch19_Advanced_Features.md#creating-type-synonyms-with-type-aliases) 小节中曾讨论过的,类型别名实现了为易于使用而将长类型构造缩短。请看看下面清单 20-19.
文件名:`src/lib.rs`
@ -1380,4 +1380,88 @@ impl Drop for ThreadPool {
*清单 20-23在归拢那些 `worker` 线程前显式丢弃 `sender`*
丢弃 `sender` 就会关闭通道,这表明将不会有其余消息发出。
丢弃 `sender` 就会关闭通道,这表明将不会有更多消息发出。当那发生时,在无限循环中那些 `worker` 所做的到 `recv` 的全部全部调用,就会返回错误。在下面清单 20-24 中,咱们修改了 `Worker` 的循环,来在那种情况下优雅有序地退出循环,这就意味着在 `ThreadPool``drop` 实现在那些线程上调用 `join` 时,他们将结束。
文件名:`src/lib.rs`
```rust
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println! ("Worker {id} 获取到一项作业;执行中。");
job();
}
Err(_) => {
println! ("Worker {id} 已断开链接;关闭中。");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
```
*清单 20-24`recv` 返回错误时显式跳出循环*
要看到运作中的代码,咱们就来把 `main` 修改为在有序关闭服务器钱,只接收两个请求,如下清单 20-25 中所示。
文件名:`src/main.rs`
```rust
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_conn(stream);
});
}
println! ("关闭中。");
}
```
*清单 20-25 在服务两个请求后通过退出循环关闭服务器*
咱们是不会想要真实世界的 web 服务器在仅服务两个请求后就关闭的。这段代码只演示了这种有序关闭与清理是在正常工作。
其中的 `take` 方法,是定义在 `Iterator` 特质中的,且将迭代限制到最多头两个项目。在 `main` 的结束处,`ThreadPool` 将超出作用域,而 `drop` 实现将运行。
请以 `cargo run` 启动服务器,并构造三个请求。第三个请求应会出错,而在终端里咱们应看到类似于下面这样的输出:
```console
$ cargo run 16s
Finished dev [unoptimized + debuginfo] target(s) in 0.00s
Running `target/debug/hello`
Worker 0 获取到一项作业;执行中。
关闭中。
关闭 worker 0
Worker 1 获取到一项作业;执行中。
Worker 3 已断开链接;关闭中。
Worker 2 已断开链接;关闭中。
Worker 1 已断开链接;关闭中。
Worker 0 已断开链接;关闭中。
关闭 worker 1
关闭 worker 2
关闭 worker 3
```
咱们可能会看到不同顺序的 `worker` 与消息打印出来。咱们能从这些消息,看出代码是如何工作的:`worker` `1``2` 获取到了头两个请求。服务器在第二个 TCP 连接之后,便停止了接收连接,而 `ThreadPool` 上的 `Drop` 实现,在 `worker` `2` 还没开始其作业前,便开始了执行。丢弃 `sender` 会断开全部 `worker` 并告诉他们要关闭。那些 `worker` 在他们断开连接时,都各自打印了一条消息,而随后线程池便调用了 `join` 来等待各个 `worker` 线程结束。
请注意这次特定执行的一个有趣方面:`ThreadPool` 弃用了 `sender`,而在有任何 `worker` 接收到错误前,咱们就尝试归拢了 `worker` `0`。`worker` `0` 还不曾从 `recv` 获取到一个错误,因此主线程就阻塞于等待 `worker` `0` 结束。与此同时,`worker` `1` 收到了一项作业,而随后全部线程都收到了错误。在 `worker` `0` 结束时,主线程就等待其余 `worker` 结束。而在那个时候,他们都已退出了他们的循环并停止了。