[Concurrent Servers: Part 4 - libuv][17]
============================================================

This is part 4 of a series of posts on writing concurrent network servers. In this part we're going to use libuv to rewrite our server once again, and also talk about handling time-consuming tasks in callbacks using a thread pool. Finally, we're going to look under the hood of libuv for a bit to study how it wraps blocking file-system operations with an asynchronous API.

All posts in the series:

* [Part 1 - Introduction][7]
* [Part 2 - Threads][8]
* [Part 3 - Event-driven][9]
* [Part 4 - libuv][10]

### Abstracting away event-driven loops with libuv

In [part 3][11], we've seen how similar select-based and epoll-based servers are, and I mentioned it's very tempting to abstract away the minor differences between them. Numerous libraries are already doing this, however, so in this part I'm going to pick one and use it. The library I'm picking is [libuv][12], which was originally designed to serve as the underlying portable platform layer for Node.js, and has since found use in additional projects. libuv is written in C, which makes it highly portable and very suitable for tying into high-level languages like JavaScript and Python.

While libuv has grown to be a fairly large framework for abstracting low-level platform details, it remains centered on the concept of an _event loop_. In our event-driven servers in part 3, the event loop was explicit in the main function; when using libuv, the loop is usually hidden inside the library itself, and user code just registers event handlers (as callback functions) and runs the loop. Furthermore, libuv will use the fastest event loop implementation for a given platform: for Linux this is epoll, etc.

![libuv loop](https://eli.thegreenplace.net/images/2017/libuvloop.png)

libuv supports multiple event loops, and thus an event loop is a first-class citizen within the library; it has a handle - uv_loop_t, and functions for creating/destroying/starting/stopping loops. That said, I will only use the "default" loop in this post, which libuv makes available via uv_default_loop(); multiple loops are mostly useful for multi-threaded event-driven servers, a more advanced topic I'll leave for future parts in the series.

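To make the loop-management functions concrete, here's a minimal sketch (not from the original post) of creating, running and closing an explicitly-initialized loop instead of relying on uv_default_loop():

```
#include <uv.h>

int main() {
  uv_loop_t loop;
  uv_loop_init(&loop);            // create a loop explicitly
  // ... register handles/callbacks on &loop here ...
  uv_run(&loop, UV_RUN_DEFAULT);  // returns once the loop has no more active work
  return uv_loop_close(&loop);    // release the loop's resources
}
```
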
### A concurrent server using libuv

To get a better feel for libuv, let's jump to our trusty protocol server that we've been vigorously reimplementing throughout the series. The structure of this server is going to be somewhat similar to the select and epoll-based servers of part 3, since it also relies on callbacks. The full [code sample is here][13]; we start with setting up the server socket bound to a local port:

```
int portnum = 9090;
if (argc >= 2) {
  portnum = atoi(argv[1]);
}
printf("Serving on port %d\n", portnum);

int rc;
uv_tcp_t server_stream;
if ((rc = uv_tcp_init(uv_default_loop(), &server_stream)) < 0) {
  die("uv_tcp_init failed: %s", uv_strerror(rc));
}

struct sockaddr_in server_address;
if ((rc = uv_ip4_addr("0.0.0.0", portnum, &server_address)) < 0) {
  die("uv_ip4_addr failed: %s", uv_strerror(rc));
}

if ((rc = uv_tcp_bind(&server_stream, (const struct sockaddr*)&server_address, 0)) < 0) {
  die("uv_tcp_bind failed: %s", uv_strerror(rc));
}
```

Fairly standard socket fare here, except that it's all wrapped in libuv APIs. In return we get a portable interface that should work on any platform libuv supports.

This code also demonstrates conscientious error handling; most libuv functions return an integer status, with a negative number meaning an error. In our server we treat these errors as fatal, but one may imagine a more graceful recovery.

Now that the socket is bound, it's time to listen on it. Here we run into our first callback registration:

```
// Listen on the socket for new peers to connect. When a new peer connects,
// the on_peer_connected callback will be invoked.
if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) {
  die("uv_listen failed: %s", uv_strerror(rc));
}
```

uv_listen registers a callback that the event loop will invoke when new peers connect to the socket. Our callback here is called on_peer_connected, and we'll examine it soon.

Finally, main runs the libuv loop until it's stopped (uv_run only returns when the loop has stopped or some error occurred).

```
// Run the libuv event loop.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);

// If uv_run returned, close the default loop before exiting.
return uv_loop_close(uv_default_loop());
```

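As an aside on "until it's stopped": any callback can ask the loop to wind down by calling uv_stop, after which uv_run returns once the current iteration completes. A minimal sketch (the handler name here is hypothetical; the kill-switch mentioned in the footnotes could be built on this):

```
// Stopping the loop from inside a callback makes uv_run return once the
// current loop iteration completes.
void on_shutdown_requested(uv_timer_t* timer) {
  uv_stop(uv_default_loop());
}
```
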
Note that only a single callback was registered by main prior to running the event loop; we'll soon see how additional callbacks are added. It's not a problem to add and remove callbacks throughout the runtime of the event loop - in fact, this is how most servers are expected to be written.

This is on_peer_connected, which handles new client connections to the server:

```
void on_peer_connected(uv_stream_t* server_stream, int status) {
  if (status < 0) {
    fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status));
    return;
  }

  // client will represent this peer; it's allocated on the heap and only
  // released when the client disconnects. The client holds a pointer to
  // peer_state_t in its data field; this peer state tracks the protocol state
  // with this client throughout interaction.
  uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client));
  int rc;
  if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) {
    die("uv_tcp_init failed: %s", uv_strerror(rc));
  }
  client->data = NULL;

  if (uv_accept(server_stream, (uv_stream_t*)client) == 0) {
    struct sockaddr_storage peername;
    int namelen = sizeof(peername);
    if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername,
                                 &namelen)) < 0) {
      die("uv_tcp_getpeername failed: %s", uv_strerror(rc));
    }
    report_peer_connected((const struct sockaddr_in*)&peername, namelen);

    // Initialize the peer state for a new client: we start by sending the peer
    // the initial '*' ack.
    peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate));
    peerstate->state = INITIAL_ACK;
    peerstate->sendbuf[0] = '*';
    peerstate->sendbuf_end = 1;
    peerstate->client = client;
    client->data = peerstate;

    // Enqueue the write request to send the ack; when it's done,
    // on_wrote_init_ack will be called. The peer state is passed to the write
    // request via the data pointer; the write request does not own this peer
    // state - it's owned by the client handle.
    uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
    uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req));
    req->data = peerstate;
    if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1,
                       on_wrote_init_ack)) < 0) {
      die("uv_write failed: %s", uv_strerror(rc));
    }
  } else {
    uv_close((uv_handle_t*)client, on_client_closed);
  }
}
```

This code is well commented, but there are a couple of important libuv idioms I'd like to highlight:

* Passing custom data into callbacks: since C has no closures, this can be challenging. libuv has a void* data field in all its handle types; these fields can be used to pass user data. For example, note how client->data is made to point to a peer_state_t structure so that the callbacks registered by uv_write and uv_read_start can know which peer data they're dealing with (see the short sketch after this list).

* Memory management: event-driven programming is much easier in languages with garbage collection, because callbacks usually run in a completely different stack frame from where they were registered, making stack-based memory management difficult. It's almost always necessary to pass heap-allocated data to libuv callbacks (except in main, which remains alive on the stack when all callbacks run), and to avoid leaks much care is required about when these data are safe to free(). This is something that comes with a bit of practice [[1]][6].

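Here's a short, self-contained sketch of the data-field idiom from the first bullet, using a timer handle and a hypothetical my_state type (not part of the server code):

```
#include <uv.h>

typedef struct {
  int ticks;
} my_state;

void on_tick(uv_timer_t* timer) {
  // Recover the user data that was attached to the handle at registration time.
  my_state* state = (my_state*)timer->data;
  state->ticks++;
}

int main() {
  uv_timer_t timer;
  my_state state = {0};
  uv_timer_init(uv_default_loop(), &timer);
  timer.data = &state;  // every libuv handle has a void* data field
  uv_timer_start(&timer, on_tick, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```
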
The peer state for this server is:

```
typedef struct {
  ProcessingState state;
  char sendbuf[SENDBUF_SIZE];
  int sendbuf_end;
  uv_tcp_t* client;
} peer_state_t;
```

It's fairly similar to the state in part 3; we no longer need sendptr, since uv_write will make sure to send the whole buffer it's given before invoking the "done writing" callback. We also keep a pointer to the client for other callbacks to use. Here's on_wrote_init_ack:

```
void on_wrote_init_ack(uv_write_t* req, int status) {
  if (status) {
    die("Write error: %s\n", uv_strerror(status));
  }
  peer_state_t* peerstate = (peer_state_t*)req->data;
  // Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
  // from this peer.
  peerstate->state = WAIT_FOR_MSG;
  peerstate->sendbuf_end = 0;

  int rc;
  if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer,
                          on_peer_read)) < 0) {
    die("uv_read_start failed: %s", uv_strerror(rc));
  }

  // Note: the write request doesn't own the peer state, hence we only free the
  // request itself, not the state.
  free(req);
}
```

Now that we know for sure that the initial '*' was sent to the peer, we start listening to incoming data from this peer by calling uv_read_start, which registers a callback (on_peer_read) that will be invoked by the event loop whenever new data is received on the socket from the client:

```
void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
  if (nread < 0) {
    if (nread != UV_EOF) {
      fprintf(stderr, "Read error: %s\n", uv_strerror(nread));
    }
    uv_close((uv_handle_t*)client, on_client_closed);
  } else if (nread == 0) {
    // From the documentation of uv_read_cb: nread might be 0, which does not
    // indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK
    // under read(2).
  } else {
    // nread > 0
    assert(buf->len >= nread);

    peer_state_t* peerstate = (peer_state_t*)client->data;
    if (peerstate->state == INITIAL_ACK) {
      // If the initial ack hasn't been sent for some reason, ignore whatever
      // the client sends in.
      free(buf->base);
      return;
    }

    // Run the protocol state machine.
    for (int i = 0; i < nread; ++i) {
      switch (peerstate->state) {
      case INITIAL_ACK:
        assert(0 && "can't reach here");
        break;
      case WAIT_FOR_MSG:
        if (buf->base[i] == '^') {
          peerstate->state = IN_MSG;
        }
        break;
      case IN_MSG:
        if (buf->base[i] == '$') {
          peerstate->state = WAIT_FOR_MSG;
        } else {
          assert(peerstate->sendbuf_end < SENDBUF_SIZE);
          peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
        }
        break;
      }
    }

    if (peerstate->sendbuf_end > 0) {
      // We have data to send. The write buffer will point to the buffer stored
      // in the peer state for this client.
      uv_buf_t writebuf =
          uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
      uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq));
      writereq->data = peerstate;
      int rc;
      if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1,
                         on_wrote_buf)) < 0) {
        die("uv_write failed: %s", uv_strerror(rc));
      }
    }
  }
  free(buf->base);
}
```

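The on_alloc_buffer callback passed to uv_read_start isn't shown above; its actual implementation is in the full code sample, but a minimal version might look like this (it pairs with the free(buf->base) calls in on_peer_read):

```
// Hand libuv a freshly allocated buffer of the suggested size; the read
// callback is responsible for freeing buf->base when it's done with it.
void on_alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->base = (char*)xmalloc(suggested_size);
  buf->len = suggested_size;
}
```
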
The runtime behavior of this server is very similar to the event-driven servers of part 3: all clients are handled concurrently in a single thread. Also similarly, a certain discipline has to be maintained in the server's code: the server's logic is implemented as an ensemble of callbacks, and long-running operations are a big no-no since they block the event loop. Let's explore this issue a bit further.

### Long-running operations in event-driven loops

The single-threaded nature of event-driven code makes it very susceptible to a common issue: long-running code blocks the entire loop. Consider this program:

```
void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```

It runs a libuv event loop with a single registered callback: on_timer, which is invoked by the loop every second. The callback reports a timestamp, and once in a while simulates some long-running task by sleeping for 3 seconds. Here's a sample run:

```
$ ./uv-timer-sleep-demo
on_timer [4840 ms]
on_timer [5842 ms]
on_timer [6843 ms]
on_timer [7844 ms]
Sleeping...
on_timer [11845 ms]
on_timer [12846 ms]
Sleeping...
on_timer [16847 ms]
on_timer [17849 ms]
on_timer [18850 ms]
...
```

on_timer dutifully fires every second, until the random sleep kicks in. At that point, on_timer is not invoked again until the sleep is over; in fact, _no other callbacks_ will be invoked in this time frame. The sleep call blocks the current thread, which is the only thread involved and is also the thread the event loop uses. When this thread is blocked, the event loop is blocked.

This example demonstrates why it's so important for callbacks never to block in event-driven code; the same applies equally to Node.js servers, client-side Javascript, most GUI programming frameworks, and many other asynchronous programming models.

But sometimes running time-consuming tasks is unavoidable. Not all tasks have asynchronous APIs; for example, we may be dealing with some library that only has a synchronous API, or just have to perform a potentially long computation. How can we combine such code with event-driven programming? Threads to the rescue!

### Threads for "converting" blocking calls into asynchronous calls

A thread pool can be used to turn blocking calls into asynchronous calls, by running alongside the event loop and posting events onto it when tasks are completed. Here's how it works, for a given blocking function do_work():

1. Instead of directly calling do_work() in a callback, we package it into a "task" and ask the thread pool to execute the task. We also register a callback for the loop to invoke when the task has finished; let's call it on_work_done().

2. At this point our callback can return and the event loop keeps spinning; at the same time, a thread in the pool is executing the task.

3. Once the task has finished executing, the main thread (the one running the event loop) is notified and on_work_done() is invoked by the event loop.

Let's see how this solves our previous timer/sleep example, using libuv's work scheduling API:

```
void on_after_work(uv_work_t* req, int status) {
  free(req);
}

void on_work(uv_work_t* req) {
  // "Work"
  if (random() % 5 == 0) {
    printf("Sleeping...\n");
    sleep(3);
  }
}

void on_timer(uv_timer_t* timer) {
  uint64_t timestamp = uv_hrtime();
  printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);

  uv_work_t* work_req = (uv_work_t*)malloc(sizeof(*work_req));
  uv_queue_work(uv_default_loop(), work_req, on_work, on_after_work);
}

int main(int argc, const char** argv) {
  uv_timer_t timer;
  uv_timer_init(uv_default_loop(), &timer);
  uv_timer_start(&timer, on_timer, 0, 1000);
  return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
```

Instead of calling sleep directly in on_timer, we enqueue a task, represented by a handle of type work_req [[2]][14], the function to run in the task (on_work) and the function to invoke once the task is completed (on_after_work). on_work is where the "work" (the blocking/time-consuming operation) happens. Note a crucial difference between the two callbacks passed into uv_queue_work: on_work runs in the thread pool, while on_after_work runs on the main thread which also runs the event loop - just like any other callback.

Let's see this version run:

```
$ ./uv-timer-work-demo
on_timer [89571 ms]
on_timer [90572 ms]
on_timer [91573 ms]
on_timer [92575 ms]
Sleeping...
on_timer [93576 ms]
on_timer [94577 ms]
Sleeping...
on_timer [95577 ms]
on_timer [96578 ms]
on_timer [97578 ms]
...
```

The timer ticks every second, even though the sleeping function is still invoked; sleeping is now done on a separate thread and doesn't block the event loop.

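Like handles, uv_work_t requests have a void* data field, which is the usual way to carry context from the queueing site to on_work and on_after_work. A hedged sketch of the pattern (task_ctx and the doubling "work" are made up for illustration; the primality-testing server below uses the same idea to pass context into its callbacks):

```
typedef struct {
  int input;   // filled in before queueing the work
  int result;  // produced by on_work, consumed by on_after_work
} task_ctx;

void on_work(uv_work_t* req) {
  task_ctx* ctx = (task_ctx*)req->data;
  ctx->result = ctx->input * 2;  // stand-in for the real blocking work
}

void on_after_work(uv_work_t* req, int status) {
  task_ctx* ctx = (task_ctx*)req->data;
  printf("result = %d\n", ctx->result);  // back on the event-loop thread
  free(ctx);
  free(req);
}

void queue_task(int input) {
  task_ctx* ctx = (task_ctx*)malloc(sizeof(*ctx));
  ctx->input = input;
  uv_work_t* req = (uv_work_t*)malloc(sizeof(*req));
  req->data = ctx;
  uv_queue_work(uv_default_loop(), req, on_work, on_after_work);
}
```
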
### A primality-testing server, with exercises

Since sleep isn't a very exciting way to simulate work, I've prepared a more comprehensive example - a server that accepts numbers from clients over a socket, checks whether these numbers are prime and sends back either "prime" or "composite". The full [code for this server is here][15] - I won't post it here since it's long, but will rather give readers the opportunity to explore it on their own with a couple of exercises.

The server deliberately uses a naive primality test algorithm, so for large primes it can take quite a while to return an answer. On my machine it takes ~5 seconds to compute the answer for 2305843009213693951, but YMMV.

Exercise 1: the server has a setting (via an environment variable named MODE) to either run the primality test in the socket callback (meaning on the main thread) or in the libuv work queue. Play with this setting to observe the server's behavior when multiple clients are connecting simultaneously. In blocking mode, the server will not answer other clients while it's computing a big task; in non-blocking mode it will.

Exercise 2: libuv has a default thread-pool size, and it can be configured via an environment variable. Can you use multiple clients to discover experimentally what the default size is? Having found the default thread-pool size, play with different settings to see how it affects the server's responsiveness under heavy load.

### Non-blocking file-system operations using work queues

Delegating potentially-blocking operations to a thread pool isn't useful just for silly demos and CPU-intensive computations; libuv itself makes heavy use of this capability in its file-system APIs. This way, libuv accomplishes the superpower of exposing the file system with an asynchronous API, in a portable way.

Let's take uv_fs_read(), for example. This function reads from a file (represented by a uv_fs_t handle) into a buffer [[3]][16], and invokes a callback when the reading is completed. That is, uv_fs_read() always returns immediately, even if the file sits on an NFS-like system and it may take a while for the data to get to the buffer. In other words, this API is asynchronous in the way other libuv APIs are. How does this work?

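From the caller's side, using the asynchronous API might look roughly like this - a sketch, not from the post, assuming fd is an already-open uv_file and that the request and buffer outlive the call:

```
#include <stdio.h>
#include <uv.h>

static char readbuf[1024];
static uv_fs_t read_req;

void on_read(uv_fs_t* req) {
  if (req->result < 0) {
    fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
  } else {
    printf("Read %zd bytes\n", req->result);  // data is now in readbuf
  }
  uv_fs_req_cleanup(req);
}

void start_read(uv_file fd) {
  uv_buf_t iov = uv_buf_init(readbuf, sizeof(readbuf));
  // Returns immediately; on_read fires later on the event-loop thread.
  uv_fs_read(uv_default_loop(), &read_req, fd, &iov, 1, -1, on_read);
}
```
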
At this point we're going to look under the hood of libuv; the internals are actually fairly straightforward, and it's a good exercise. Being a portable library, libuv has different implementations of many of its functions for Windows and Unix systems. We're going to be looking at src/unix/fs.c in the libuv source tree.

The code for uv_fs_read is:

```
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
               uv_file file,
               const uv_buf_t bufs[],
               unsigned int nbufs,
               int64_t off,
               uv_fs_cb cb) {
  if (bufs == NULL || nbufs == 0)
    return -EINVAL;

  INIT(READ);
  req->file = file;

  req->nbufs = nbufs;
  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(*bufs));

  if (req->bufs == NULL) {
    if (cb != NULL)
      uv__req_unregister(loop, req);
    return -ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

  req->off = off;
  POST;
}
```

It may seem puzzling at first, because it defers the real work to the INIT and POST macros, with some local variable setup for POST. This is done to avoid too much code duplication within the file.

The INIT macro is:

```
#define INIT(subtype)                                                         \
  do {                                                                        \
    req->type = UV_FS;                                                        \
    if (cb != NULL)                                                           \
      uv__req_init(loop, req, UV_FS);                                         \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)
```

It sets up the request, and most importantly sets the req->fs_type field to the actual FS request type. Since uv_fs_read invokes INIT(READ), it means req->fs_type gets assigned the constant UV_FS_READ.

The POST macro is:

```
#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)
```

What it does depends on whether the callback is NULL. In libuv file-system APIs, a NULL callback means we actually want to perform the operation _synchronously_. In this case POST invokes uv__fs_work directly (we'll get to what this function does in just a bit), whereas for a non-NULL callback, it submits uv__fs_work as a work item to the work queue (which is the thread pool), and registers uv__fs_done as the callback; that function does a bit of book-keeping and invokes the user-provided callback.

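To make the NULL-callback path concrete, here's a hedged sketch of a synchronous read (again assuming an already-open uv_file fd; the return value is simply req.result, the number of bytes read or a negative error code):

```
ssize_t read_sync(uv_file fd, char* buf, size_t len) {
  uv_fs_t req;
  uv_buf_t iov = uv_buf_init(buf, len);
  // A NULL callback selects the synchronous branch of POST: uv__fs_work runs
  // on the calling thread and the result is returned directly.
  int nread = uv_fs_read(uv_default_loop(), &req, fd, &iov, 1, -1, NULL);
  uv_fs_req_cleanup(&req);
  return nread;
}
```
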
If we look at the code of uv__fs_work, we'll see it uses more macros to route work to the actual file-system call as needed. In our case, for UV_FS_READ the call will be made to uv__fs_read, which (at last!) does the reading using regular POSIX APIs. This function can be safely implemented in a _blocking_ manner, since it's placed on a thread pool when called through the asynchronous API.

In Node.js, the fs.readFile function is mapped to uv_fs_read. Thus, reading files can be done in a non-blocking fashion even though the underlying file-system API is blocking.

* * *

[[1]][1] To ensure that this server doesn't leak memory, I ran it under Valgrind with the leak checker enabled. Since servers are often designed to run forever, this was a bit challenging; to overcome this issue I've added a "kill switch" to the server - a special sequence received from a client makes it stop the event loop and exit. The code for this is in the on_wrote_buf handler.

[[2]][2] Here we don't use work_req for much; the primality testing server discussed next will show how it's used to pass context information into the callback.

[[3]][3] uv_fs_read() provides a generalized API similar to the preadv Linux system call: it takes multiple buffers which it fills in order, and supports an offset into the file. We can ignore these features for the sake of our discussion.

--------------------------------------------------------------------------------

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/

Author: [Eli Bendersky][a]
Translator: [qhwdw](https://github.com/qhwdw)
Proofreader: [校对者ID](https://github.com/校对者ID)

This article was originally compiled by [LCTT](https://github.com/LCTT/TranslateProject) and is proudly presented by [Linux中国](https://linux.cn/).

[a]:https://eli.thegreenplace.net/
[1]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id1
[2]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id2
[3]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id3
[4]:https://eli.thegreenplace.net/tag/concurrency
[5]:https://eli.thegreenplace.net/tag/c-c
[6]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id4
[7]:http://eli.thegreenplace.net/2017/concurrent-servers-part-1-introduction/
[8]:http://eli.thegreenplace.net/2017/concurrent-servers-part-2-threads/
[9]:http://eli.thegreenplace.net/2017/concurrent-servers-part-3-event-driven/
[10]:http://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/
[11]:http://eli.thegreenplace.net/2017/concurrent-servers-part-3-event-driven/
[12]:http://libuv.org/
[13]:https://github.com/eliben/code-for-blog/blob/master/2017/async-socket-server/uv-server.c
[14]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id5
[15]:https://github.com/eliben/code-for-blog/blob/master/2017/async-socket-server/uv-isprime-server.c
[16]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/#id6
[17]:https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/