From 98b1634a92ae857eb19ce27a3146e118e424c1c5 Mon Sep 17 00:00:00 2001 From: "Peng Hailin," Date: Thu, 6 Apr 2023 22:17:28 +0800 Subject: [PATCH] Update Ch20 --- hello/src/lib.rs | 16 +++- hello/src/main.rs | 6 +- ...ect_Building_a_Multithreaded_Web_Server.md | 88 ++++++++++++++++++- 3 files changed, 102 insertions(+), 8 deletions(-) diff --git a/hello/src/lib.rs b/hello/src/lib.rs index 8d0d21a..78e7e53 100644 --- a/hello/src/lib.rs +++ b/hello/src/lib.rs @@ -33,7 +33,7 @@ impl ThreadPool { ThreadPool { workers, - sender: Some, + sender: Some(sender), } } @@ -69,11 +69,19 @@ struct Worker { impl Worker { fn new(id: usize, receiver: Arc>>) -> Worker { let thread = thread::spawn(move || loop { - let job = receiver.lock().unwrap().recv().unwrap(); + let message = receiver.lock().unwrap().recv(); - println! ("Worker {id} 获取到一项作业;执行中。"); + match message { + Ok(job) => { + println! ("Worker {id} 获取到一项作业;执行中。"); - job(); + job(); + } + Err(_) => { + println! ("Worker {id} 已断开链接;关闭中。"); + break; + } + } }); Worker { diff --git a/hello/src/main.rs b/hello/src/main.rs index 2945b87..b1fcbe2 100644 --- a/hello/src/main.rs +++ b/hello/src/main.rs @@ -12,13 +12,15 @@ fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); - for stream in listener.incoming() { + for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_conn(stream); }); } + + println! ("关闭中。"); } fn handle_conn(mut stream: TcpStream) { @@ -28,7 +30,7 @@ fn handle_conn(mut stream: TcpStream) { let (status_line, filename) = match &req_line[..] { "GET / HTTP/1.1" => ( "HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { - thread::sleep(Duration::from_secs(30)); + thread::sleep(Duration::from_secs(10)); ("HTTP/1.1 200 0K", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), diff --git a/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md b/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md index 0a0e8ce..d20ffb5 100644 --- a/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md +++ b/src/Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md @@ -1057,7 +1057,7 @@ impl Worker { **Implementing the `execute` method** -咱们来最终实现那个 `ThreadPool` 上的 `execute` 方法。咱们还将把 `Job` 从结构体,修改为保存着 `execute` 接收到闭包类型的特质对象的类型别名。正如第 19 章 [“使用类型别名创建类型同义词”](Ch19_Advanced_Features.md#creating-type-synonyms-with-type-aliases) 小节中曾讨论过的,类型别名实现了为易于使用而将长类型构造缩短。请看看下面清单 20-19. +咱们来最终实现那个 `ThreadPool` 上的 `execute` 方法。咱们还将把 `Job` 从结构体,修改为保存着 `execute` 接收到闭包类型的特质对象的类型别名。正如第 19 章 [“使用类型别名创建类型义词”](Ch19_Advanced_Features.md#creating-type-synonyms-with-type-aliases) 小节中曾讨论过的,类型别名实现了为易于使用而将长类型构造缩短。请看看下面清单 20-19. 文件名:`src/lib.rs` @@ -1380,4 +1380,88 @@ impl Drop for ThreadPool { *清单 20-23:在归拢那些 `worker` 线程前显式丢弃 `sender`* -丢弃 `sender` 就会关闭通道,这表明将不会有其余消息发出。 +丢弃 `sender` 就会关闭通道,这表明将不会有更多消息发出。当那发生时,在无限循环中那些 `worker` 所做的到 `recv` 的全部全部调用,就会返回错误。在下面清单 20-24 中,咱们修改了 `Worker` 的循环,来在那种情况下优雅有序地退出循环,这就意味着在 `ThreadPool` 的 `drop` 实现在那些线程上调用 `join` 时,他们将结束。 + + +文件名:`src/lib.rs` + +```rust +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let message = receiver.lock().unwrap().recv(); + + match message { + Ok(job) => { + println! ("Worker {id} 获取到一项作业;执行中。"); + + job(); + } + Err(_) => { + println! ("Worker {id} 已断开链接;关闭中。"); + break; + } + } + }); + + Worker { + id, + thread: Some(thread), + } + } +} +``` + +*清单 20-24:在 `recv` 返回错误时显式跳出循环* + +要看到运作中的代码,咱们就来把 `main` 修改为在有序关闭服务器钱,只接收两个请求,如下清单 20-25 中所示。 + +文件名:`src/main.rs` + +```rust +fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); + + for stream in listener.incoming().take(2) { + let stream = stream.unwrap(); + + pool.execute(|| { + handle_conn(stream); + }); + } + + println! ("关闭中。"); +} +``` + +*清单 20-25: 在服务两个请求后通过退出循环关闭服务器* + +咱们是不会想要真实世界的 web 服务器在仅服务两个请求后就关闭的。这段代码只演示了这种有序关闭与清理是在正常工作。 + +其中的 `take` 方法,是定义在 `Iterator` 特质中的,且将迭代限制到最多头两个项目。在 `main` 的结束处,`ThreadPool` 将超出作用域,而 `drop` 实现将运行。 + +请以 `cargo run` 启动服务器,并构造三个请求。第三个请求应会出错,而在终端里咱们应看到类似于下面这样的输出: + +```console +$ cargo run 16s + Finished dev [unoptimized + debuginfo] target(s) in 0.00s + Running `target/debug/hello` +Worker 0 获取到一项作业;执行中。 +关闭中。 +关闭 worker 0 +Worker 1 获取到一项作业;执行中。 +Worker 3 已断开链接;关闭中。 +Worker 2 已断开链接;关闭中。 +Worker 1 已断开链接;关闭中。 +Worker 0 已断开链接;关闭中。 +关闭 worker 1 +关闭 worker 2 +关闭 worker 3 +``` + +咱们可能会看到不同顺序的 `worker` 与消息打印出来。咱们能从这些消息,看出代码是如何工作的:`worker` `1` 与 `2` 获取到了头两个请求。服务器在第二个 TCP 连接之后,便停止了接收连接,而 `ThreadPool` 上的 `Drop` 实现,在 `worker` `2` 还没开始其作业前,便开始了执行。丢弃 `sender` 会断开全部 `worker` 并告诉他们要关闭。那些 `worker` 在他们断开连接时,都各自打印了一条消息,而随后线程池便调用了 `join` 来等待各个 `worker` 线程结束。 + +请注意这次特定执行的一个有趣方面:`ThreadPool` 弃用了 `sender`,而在有任何 `worker` 接收到错误前,咱们就尝试归拢了 `worker` `0`。`worker` `0` 还不曾从 `recv` 获取到一个错误,因此主线程就阻塞于等待 `worker` `0` 结束。与此同时,`worker` `1` 收到了一项作业,而随后全部线程都收到了错误。在 `worker` `0` 结束时,主线程就等待其余 `worker` 结束。而在那个时候,他们都已退出了他们的循环并停止了。 + +