@slientdawn-zz
This commit is contained in:
Xingyu Wang 2020-08-08 20:21:41 +08:00
parent 88a1fe5633
commit 784e69eee2

View File

@ -1,6 +1,6 @@
[#]: collector: (lujun9972) [#]: collector: (lujun9972)
[#]: translator: (silentdawn-zz) [#]: translator: (silentdawn-zz)
[#]: reviewer: ( ) [#]: reviewer: (wxy)
[#]: publisher: ( ) [#]: publisher: ( )
[#]: url: ( ) [#]: url: ( )
[#]: subject: (Share data between C and Python with this messaging library) [#]: subject: (Share data between C and Python with this messaging library)
@ -9,21 +9,21 @@
使用 ZeroMQ 消息库在 C 和 Python 间共享数据 使用 ZeroMQ 消息库在 C 和 Python 间共享数据
====== ======
ZeroMQ 是一个快速灵活的消息库,用于数据收集和不同语言间的数据共享。
![Chat via email][1]
作为软件工程师,我有多次在要求完成指定任务时感到毛骨悚然的经历。其中一次是要写个接口,用于控制一些新的底层硬件间的对接,接口实现形式分别是 C 和 云底层组件,而后者主要是 Python > ZeroMQ 是一个快速灵活的消息库,用于数据收集和不同编程语言间的数据共享。
实现的方式之一是 [用 C 写扩展模块][2]Python 支持 C 扩展的调用。快速浏览文档后发现,这需要编写大量的 C 代码。这样做的话,在有些情况下效果还不错,但不是我想要的方式。另一种方式就是将两个任务放在不同的进程中,以使用 [ZeroMQ 消息库][3] 在进程间交换信息的方式实现数据的共享。 ![](https://img.linux.net.cn/data/attachment/album/202008/08/202106uale11l1qf11slzw.jpg)
在引入 ZeroMQ 之前,我选择了编写扩展的方式,试图解决这个场景的需求。这种方式不算太差,但非常复杂和费时。现在为了避免那些问题,我细分出一个系统作为独立的进程运行,专门用于交换通过 [通信套接字][4] 发送的消息所承载的数据。这样,不同的编程语言可以共存,每个进程也变简单了,同时也容易调试 作为软件工程师,我有多次在要求完成指定任务时感到浑身一冷的经历。其中有一次,我必须在一些新的硬件基础设施和云基础设施之间写一个接口,这些硬件需要 C 语言,而云基础设施主要是用 Python
ZeroMQ 提供了更简单的进程: 实现的方式之一是 [用 C 写扩展模块][2]Python 支持 C 扩展的调用。快速浏览文档后发现,这需要编写大量的 C 代码。这样做的话,在有些情况下效果还不错,但不是我喜欢的方式。另一种方式就是将两个任务放在不同的进程中,并使用 [ZeroMQ 消息库][3] 在两者之间交换消息。
1. 编写一小组 C 代码,从硬件读取数据,发送到它能作用到的消息中。 在发现 ZeroMQ 之前,遇到这种类型的情况时,我选择了编写扩展的方式。这种方式不算太差,但非常费时费力。如今,为了避免那些问题,我将一个系统细分为独立的进程,通过 [通信套接字][4] 发送消息来交换信息。这样,不同的编程语言可以共存,每个进程也变简单了,同时也容易调试。
2. 使用 Python 编写接口,实现新旧基础组件的对接。
ZeroMQ 提供了一个更简单的过程:
1. 编写一小段 C 代码,从硬件读取数据,然后把发现的东西作为消息发送出去。
2. 使用 Python 编写接口,实现新旧基础设施之间的对接。
[Pieter Hintjens][5] 是 ZeroMQ 项目发起者之一,他是个拥有 [有趣视角和作品][6] 的非凡人物。 [Pieter Hintjens][5] 是 ZeroMQ 项目发起者之一,他是个拥有 [有趣视角和作品][6] 的非凡人物。
@ -31,23 +31,19 @@ ZeroMQ 提供了更简单的进程:
本教程中,需要: 本教程中,需要:
* 一个 C 编译器(例如 [GCC][7] 或 [Clang][8] * 一个 C 编译器(例如 [GCC][7] 或 [Clang][8]
* [**libzmq** 库][9] * [libzmq 库][9]
* [Python 3][10] * [Python 3][10]
* [ZeroMQ 的 Python 封装][11] * [ZeroMQ 的 Python 封装][11]
Fedora 系统上的安装方法: Fedora 系统上的安装方法:
``` ```
$ dnf install clang zeromq zeromq-devel python3 python3-zmq $ dnf install clang zeromq zeromq-devel python3 python3-zmq
``` ```
Debian 和 Ubuntu 系统上的安装方法: Debian 和 Ubuntu 系统上的安装方法:
``` ```
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq $ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
``` ```
@ -58,37 +54,34 @@ $ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
因为这里针对的是个设想的场景,本教程虚构了包含两个函数的操作库: 因为这里针对的是个设想的场景,本教程虚构了包含两个函数的操作库:
* **fancyhw_init()** 用来初始化(设想的)硬件 * `fancyhw_init()` 用来初始化(设想的)硬件
* **fancyhw_read_val()** 用于返回从硬件读取的数据 * `fancyhw_read_val()` 用于返回从硬件读取的数据
将库的完整代码保存到文件 **libfancyhw.h** 中:
将库的完整代码保存到文件 `libfancyhw.h` 中:
``` ```
#ifndef LIBFANCYHW_H #ifndef LIBFANCYHW_H
#define LIBFANCYHW_H #define LIBFANCYHW_H
#include &lt;stdlib.h&gt; #include <stdlib.h>
#include &lt;stdint.h&gt; #include <stdint.h>
// This is the fictitious hardware interfacing library // This is the fictitious hardware interfacing library
void fancyhw_init(unsigned int init_param) void fancyhw_init(unsigned int init_param)
{ {
    [srand][12](init_param); srand(init_param);
} }
int16_t fancyhw_read_val(void) int16_t fancyhw_read_val(void)
{ {
    return (int16_t)[rand][13](); return (int16_t)rand();
} }
#endif #endif
``` ```
这个库可以模拟你要在不同语言实现的组件间交换的数据,中间有随机数发生器。 这个库可以模拟你要在不同语言实现的组件间交换的数据,中间有随机数发生器。
### 设计 C 接口 ### 设计 C 接口
@ -98,26 +91,24 @@ int16_t fancyhw_read_val(void)
开始先加载必要的库(每个库的作用见代码注释): 开始先加载必要的库(每个库的作用见代码注释):
``` ```
// For printf() // For printf()
#include &lt;stdio.h&gt; #include <stdio.h>
// For EXIT_* // For EXIT_*
#include &lt;stdlib.h&gt; #include <stdlib.h>
// For memcpy() // For memcpy()
#include &lt;string.h&gt; #include <string.h>
// For sleep() // For sleep()
#include &lt;unistd.h&gt; #include <unistd.h>
#include &lt;zmq.h&gt; #include <zmq.h>
#include "libfancyhw.h" #include "libfancyhw.h"
``` ```
#### 必要的参数 #### 必要的参数
定义 **main** 函数和后续过程中必要的参数: 定义 `main` 函数和后续过程中必要的参数:
``` ```
int main(void) int main(void)
@ -134,27 +125,24 @@ int main(void)
所有的库都需要初始化。虚构的那个只需要一个参数: 所有的库都需要初始化。虚构的那个只需要一个参数:
``` ```
fancyhw_init(INIT_PARAM); fancyhw_init(INIT_PARAM);
``` ```
ZeroMQ 库需要实打实的初始化。首先,定义对象 **context**,它是用来管理全部的套接字的: ZeroMQ 库需要实打实的初始化。首先,定义对象 `context`,它是用来管理全部的套接字的:
``` ```
void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
if (!context) if (!context)
{ {
    [printf][14]("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
    return EXIT_FAILURE; return EXIT_FAILURE;
} }
``` ```
之后定义用来发送数据的套接字。ZeroMQ 支持若干种套接字,各有其用。使用 **publish** 套接字(也叫 **PUB** 套接字),可以复制消息并分发到多个接收端。这使得你可以让多个接收端接收同一个消息。没有接收者的消息将被丢弃(即不会入消息队列)。用法如下: 之后定义用来发送数据的套接字。ZeroMQ 支持若干种套接字,各有其用。使用 `publish` 套接字(也叫 `PUB` 套接字),可以复制消息并分发到多个接收端。这使得你可以让多个接收端接收同一个消息。没有接收者的消息将被丢弃(即不会入消息队列)。用法如下:
``` ```
void *data_socket = zmq_socket(context, ZMQ_PUB); void *data_socket = zmq_socket(context, ZMQ_PUB);
@ -162,125 +150,116 @@ void *data_socket = zmq_socket(context, ZMQ_PUB);
套接字需要绑定到一个具体的地址,这样客户端就知道要连接哪里了。本例中,使用了 [TCP 传输层][15](当然也有 [其它选项][16],但 TCP 是不错的默认选择): 套接字需要绑定到一个具体的地址,这样客户端就知道要连接哪里了。本例中,使用了 [TCP 传输层][15](当然也有 [其它选项][16],但 TCP 是不错的默认选择):
``` ```
const int rb = zmq_bind(data_socket, "tcp://*:5555"); const int rb = zmq_bind(data_socket, "tcp://*:5555");
if (rb != 0) if (rb != 0)
{ {
    [printf][14]("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
    return EXIT_FAILURE; return EXIT_FAILURE;
} }
``` ```
下一步, 计算一些后续要用到的值。 注意下面代码中的 **TOPIC**,因为 **PUB** 套接字发送的消息需要绑定一个 topic。topic 用于供接收者过滤消息: 下一步, 计算一些后续要用到的值。 注意下面代码中的 `TOPIC`,因为 `PUB` 套接字发送的消息需要绑定一个主题。主题用于供接收者过滤消息:
``` ```
const size_t topic_size = [strlen][17](TOPIC); const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
[printf][14]("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size); printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
``` ```
#### 发送消息 #### 发送消息
启动一个 **重复** 发送消息的循环: 启动一个发送消息的循环,循环 `REPETITIONS` 次:
``` ```
for (unsigned int i = 0; i &lt; REPETITIONS; i++) for (unsigned int i = 0; i < REPETITIONS; i++)
{ {
    ... ...
``` ```
发送消息前,先填充一个长度为 **PACKET_SIZE** 的缓冲区。本库提供的是 16 bits 有符号整数。因为 C 语言中 **int** 类型占用空间大小与平台相关,不是确定的值,所以要使用指定宽度的 **int** 变量: 发送消息前,先填充一个长度为 `PACKET_SIZE` 的缓冲区。本库提供的是 16 个位的有符号整数。因为 C 语言中 `int` 类型占用空间大小与平台相关,不是确定的值,所以要使用指定宽度的 `int` 变量:
``` ```
int16_t buffer[PACKET_SIZE]; int16_t buffer[PACKET_SIZE];
for (unsigned int j = 0; j &lt; PACKET_SIZE; j++) for (unsigned int j = 0; j < PACKET_SIZE; j++)
{ {
    buffer[j] = fancyhw_read_val(); buffer[j] = fancyhw_read_val();
} }
[printf][14]("Read %u data values\n", PACKET_SIZE); printf("Read %u data values\n", PACKET_SIZE);
``` ```
消息的准备和发送中,第一步是创建 ZeroMQ 消息,为消息分配必要的内存空间。空白的消息是用于封装要发送的数据的: 消息的准备和发送的第一步是创建 ZeroMQ 消息,为消息分配必要的内存空间。空白的消息是用于封装要发送的数据的:
``` ```
zmq_msg_t envelope; zmq_msg_t envelope;
const int rmi = zmq_msg_init_size(&amp;envelope, envelope_size); const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0) if (rmi != 0)
{ {
    [printf][14]("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
    zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
    break; break;
} }
``` ```
现在内存空间已分配,数据保存在 ZeroMQ 消息“envelope”中。函数 **zmq_msg_data()** 返回一个指向封装数据缓存区顶端的指针。第一部分是 topic之后是一个空格最后是二进制数。topic 和二进制数据之间的分隔符采用空格字符。需要遍历缓存区的话,使用 cast 和 [指针算法][18]。(感谢 C 语言,让事情变得直截了当。)做法如下: 现在内存空间已分配,数据保存在 ZeroMQ 消息 “信封”中。函数 `zmq_msg_data()` 返回一个指向封装数据缓存区顶端的指针。第一部分是主题,之后是一个空格,最后是二进制数。主题和二进制数据之间的分隔符采用空格字符。需要遍历缓存区的话,使用类型转换和 [指针算法][18]。(感谢 C 语言,让事情变得直截了当。)做法如下:
``` ```
[memcpy][19](zmq_msg_data(&amp;envelope), TOPIC, topic_size); memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
[memcpy][19]((void*)((char*)zmq_msg_data(&amp;envelope) + topic_size), " ", 1); memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
[memcpy][19]((void*)((char*)zmq_msg_data(&amp;envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t)); memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))
``` ```
通过 **data_socket** 发送消息: 通过 `data_socket` 发送消息:
``` ```
const size_t rs = zmq_msg_send(&amp;envelope, data_socket, 0); const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size) if (rs != envelope_size)
{ {
    [printf][14]("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
    zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
    break; break;
} }
``` ```
使用数据之前要先解取封装: 使用数据之前要先解除封装:
``` ```
zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
[printf][14]("Message sent; i: %u, topic: %s\n", i, TOPIC); printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
``` ```
#### 清 #### 清
C 语言不提供 [垃圾收集][20] 功能,用完之后记得要自己扫尾。发送消息之后结束程序之前,需要运行扫尾代码,释放分配的内存: C 语言不提供 [垃圾收集][20] 功能,用完之后记得要自己扫尾。发送消息之后结束程序之前,需要运行扫尾代码,释放分配的内存:
``` ```
const int rc = zmq_close(data_socket); const int rc = zmq_close(data_socket);
if (rc != 0) if (rc != 0)
{ {
    [printf][14]("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
    return EXIT_FAILURE; return EXIT_FAILURE;
} }
const int rd = zmq_ctx_destroy(context); const int rd = zmq_ctx_destroy(context);
if (rd != 0) if (rd != 0)
{ {
    [printf][14]("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno)); printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
    return EXIT_FAILURE; return EXIT_FAILURE;
} }
return EXIT_SUCCESS; return EXIT_SUCCESS;
@ -288,137 +267,134 @@ return EXIT_SUCCESS;
#### 完整 C 代码 #### 完整 C 代码
保存下面完整的接口代码到本地名为 **hw_interface.c** 的文件: 保存下面完整的接口代码到本地名为 `hw_interface.c` 的文件:
``` ```
// For printf() // For printf()
#include &lt;stdio.h&gt; #include <stdio.h>
// For EXIT_* // For EXIT_*
#include &lt;stdlib.h&gt; #include <stdlib.h>
// For memcpy() // For memcpy()
#include &lt;string.h&gt; #include <string.h>
// For sleep() // For sleep()
#include &lt;unistd.h&gt; #include <unistd.h>
#include &lt;zmq.h&gt; #include <zmq.h>
#include "libfancyhw.h" #include "libfancyhw.h"
int main(void) int main(void)
{ {
    const unsigned int INIT_PARAM = 12345; const unsigned int INIT_PARAM = 12345;
    const unsigned int REPETITIONS = 10; const unsigned int REPETITIONS = 10;
    const unsigned int PACKET_SIZE = 16; const unsigned int PACKET_SIZE = 16;
    const char *TOPIC = "fancyhw_data"; const char *TOPIC = "fancyhw_data";
    fancyhw_init(INIT_PARAM); fancyhw_init(INIT_PARAM);
    void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
    if (!context) if (!context)
    { {
        [printf][14]("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
        return EXIT_FAILURE; return EXIT_FAILURE;
    } }
    void *data_socket = zmq_socket(context, ZMQ_PUB); void *data_socket = zmq_socket(context, ZMQ_PUB);
    const int rb = zmq_bind(data_socket, "tcp://*:5555"); const int rb = zmq_bind(data_socket, "tcp://*:5555");
    if (rb != 0) if (rb != 0)
    { {
        [printf][14]("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
        return EXIT_FAILURE; return EXIT_FAILURE;
    } }
    const size_t topic_size = [strlen][17](TOPIC); const size_t topic_size = strlen(TOPIC);
    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
    [printf][14]("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size); printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
    for (unsigned int i = 0; i &lt; REPETITIONS; i++) for (unsigned int i = 0; i < REPETITIONS; i++)
    { {
        int16_t buffer[PACKET_SIZE]; int16_t buffer[PACKET_SIZE];
        for (unsigned int j = 0; j &lt; PACKET_SIZE; j++) for (unsigned int j = 0; j < PACKET_SIZE; j++)
        { {
            buffer[j] = fancyhw_read_val(); buffer[j] = fancyhw_read_val();
        } }
        [printf][14]("Read %u data values\n", PACKET_SIZE); printf("Read %u data values\n", PACKET_SIZE);
        zmq_msg_t envelope; zmq_msg_t envelope;
   
        const int rmi = zmq_msg_init_size(&amp;envelope, envelope_size); const int rmi = zmq_msg_init_size(&envelope, envelope_size);
        if (rmi != 0) if (rmi != 0)
        { {
            [printf][14]("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
   
            zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
   
            break; break;
        } }
       
        [memcpy][19](zmq_msg_data(&amp;envelope), TOPIC, topic_size); memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
        [memcpy][19]((void*)((char*)zmq_msg_data(&amp;envelope) + topic_size), " ", 1); memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
        [memcpy][19]((void*)((char*)zmq_msg_data(&amp;envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t)); memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
   
        const size_t rs = zmq_msg_send(&amp;envelope, data_socket, 0); const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
        if (rs != envelope_size) if (rs != envelope_size)
        { {
            [printf][14]("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
   
            zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
   
            break; break;
        } }
   
        zmq_msg_close(&amp;envelope); zmq_msg_close(&envelope);
        [printf][14]("Message sent; i: %u, topic: %s\n", i, TOPIC); printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
        sleep(1); sleep(1);
    } }
    const int rc = zmq_close(data_socket); const int rc = zmq_close(data_socket);
    if (rc != 0) if (rc != 0)
    { {
        [printf][14]("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno)); printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
        return EXIT_FAILURE; return EXIT_FAILURE;
    } }
    const int rd = zmq_ctx_destroy(context); const int rd = zmq_ctx_destroy(context);
    if (rd != 0) if (rd != 0)
    { {
        [printf][14]("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno)); printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
        return EXIT_FAILURE; return EXIT_FAILURE;
    } }
    return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
``` ```
用如下命令编译: 用如下命令编译:
``` ```
$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface $ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
``` ```
如果没有编译错误你就可以运行这个接口了。贴心的是ZeroMQ **PUB** 套接字可以在没有任何应用发送或接受数据的状态下运行,这简化了使用复杂度,因为这样不限制进程启动的次序。 如果没有编译错误你就可以运行这个接口了。贴心的是ZeroMQ `PUB` 套接字可以在没有任何应用发送或接受数据的状态下运行,这简化了使用复杂度,因为这样不限制进程启动的次序。
运行该接口: 运行该接口:
``` ```
$ ./hw_interface $ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45 Topic: fancyhw_data; topic size: 12; Envelope size: 45
@ -435,22 +411,20 @@ Read 16 data values
### 编写 Python 数据处理器 ### 编写 Python 数据处理器
现在已经准备好从 C 程序向 Python 应用送数据了。 现在已经准备好从 C 程序向 Python 应用送数据了。
#### 库 #### 库
需要两个库帮助实现数据传输。首先是 ZeroMQ 的 Python 封装: 需要两个库帮助实现数据传输。首先是 ZeroMQ 的 Python 封装:
``` ```
$ python3 -m pip install zmq $ python3 -m pip install zmq
``` ```
另一个就是 [**struct** 库][21],用于解码二进制数据。这个库是 Python 标准库的一部分,所以不需要使用 **pip** 命令安装。 另一个就是 [struct 库][21],用于解码二进制数据。这个库是 Python 标准库的一部分,所以不需要使用 `pip` 命令安装。
Python 程序的第一部分是导入这些库: Python 程序的第一部分是导入这些库:
``` ```
import zmq import zmq
import struct import struct
@ -458,8 +432,7 @@ import struct
#### 重要参数 #### 重要参数
使用 ZeroMQ 时,只能向常量 **TOPIC** 定义相同的接收端发送消息: 使用 ZeroMQ 时,只能向常量 `TOPIC` 定义相同的接收端发送消息:
``` ```
topic = "fancyhw_data".encode('ascii') topic = "fancyhw_data".encode('ascii')
@ -469,8 +442,7 @@ print("Reading messages with topic: {}".format(topic))
#### 初始化 #### 初始化
下一步,初始化上下文和套接字。使用 **subscribe** 套接字(也称为 **SUB** 套接字),它是 **PUB** 套接字的天生好友。这个套接字发送时也需要匹配 topic。 下一步,初始化上下文和套接字。使用 `subscribe` 套接字(也称为 `SUB` 套接字),它是 `PUB` 套接字的天生伴侣。这个套接字发送时也需要匹配主题。
``` ```
with zmq.Context() as context: with zmq.Context() as context:
@ -486,8 +458,7 @@ with zmq.Context() as context:
#### 接收消息 #### 接收消息
启动一个无限循环,等待接收发送到 SUB 套接字的新消息。这个循环会在你按下 **Ctrl+C** 组合键或者内部发生错误时终止: 启动一个无限循环,等待接收发送到 `SUB` 套接字的新消息。这个循环会在你按下 `Ctrl+C` 组合键或者内部发生错误时终止:
``` ```
    try:     try:
@ -502,7 +473,7 @@ with zmq.Context() as context:
        socket.close()         socket.close()
``` ```
这个循环等待 **recv()** 方法获取的新消息,然后将接收到的内容从第一个空格字符处分割开,从而得到 topic 这个循环等待 `recv()` 方法获取的新消息,然后将接收到的内容从第一个空格字符处分割开,从而得到主题
``` ```
@ -511,8 +482,7 @@ binary_topic, data_buffer = socket.recv().split(b' ', 1)
#### 解码消息 #### 解码消息
Python 此时尚不知道 topic 是个字符串,使用标准 ASCII 编解码器进行解码: Python 此时尚不知道主题是个字符串,使用标准 ASCII 编解码器进行解码:
``` ```
topic = binary_topic.decode(encoding = 'ascii') topic = binary_topic.decode(encoding = 'ascii')
@ -521,8 +491,7 @@ print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic)) print("\ttopic: '{}'".format(topic))
``` ```
下一步就是使用 **struct** 库读取二进制数据,它可以将二进制数据段转换为明确的数值。首先,计算数据包中数值的组数。本例中使用的 16 bit 有符号整数对应的是 **struct** [格式字符][22] 中的“h” 下一步就是使用 `struct` 库读取二进制数据,它可以将二进制数据段转换为明确的数值。首先,计算数据包中数值的组数。本例中使用的 16 个位的有符号整数对应的是 `struct` [格式字符][22] 中的 `h`
``` ```
packet_size = len(data_buffer) // struct.calcsize("h") packet_size = len(data_buffer) // struct.calcsize("h")
@ -530,8 +499,7 @@ packet_size = len(data_buffer) // struct.calcsize("h")
print("\tpacket size: {:d}".format(packet_size)) print("\tpacket size: {:d}".format(packet_size))
``` ```
知道数据包中有多少组数据后,就可以通过构建一个包含数据组数和数据类型的字符串,来定义格式了(比如“**16h**”): 知道数据包中有多少组数据后,就可以通过构建一个包含数据组数和数据类型的字符串,来定义格式了(比如“`16h`”):
``` ```
struct_format = "{:d}h".format(packet_size) struct_format = "{:d}h".format(packet_size)
@ -539,7 +507,6 @@ struct_format = "{:d}h".format(packet_size)
将二进制数据串转换为可直接打印的一系列数字: 将二进制数据串转换为可直接打印的一系列数字:
``` ```
data = struct.unpack(struct_format, data_buffer) data = struct.unpack(struct_format, data_buffer)
@ -548,8 +515,7 @@ print("\tdata: {}".format(data))
#### 完整 Python 代码 #### 完整 Python 代码
下面是 Python 实现的完整的接收器: 下面是 Python 实现的完整的接收端:
``` ```
#! /usr/bin/env python3 #! /usr/bin/env python3
@ -562,46 +528,45 @@ topic = "fancyhw_data".encode('ascii')
print("Reading messages with topic: {}".format(topic)) print("Reading messages with topic: {}".format(topic))
with zmq.Context() as context: with zmq.Context() as context:
    socket = context.socket(zmq.SUB) socket = context.socket(zmq.SUB)
    socket.connect("tcp://127.0.0.1:5555") socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt(zmq.SUBSCRIBE, topic) socket.setsockopt(zmq.SUBSCRIBE, topic)
    i = 0 i = 0
    try: try:
        while True: while True:
            binary_topic, data_buffer = socket.recv().split(b' ', 1) binary_topic, data_buffer = socket.recv().split(b' ', 1)
            topic = binary_topic.decode(encoding = 'ascii') topic = binary_topic.decode(encoding = 'ascii')
            print("Message {:d}:".format(i)) print("Message {:d}:".format(i))
            print("\ttopic: '{}'".format(topic)) print("\ttopic: '{}'".format(topic))
            packet_size = len(data_buffer) // struct.calcsize("h") packet_size = len(data_buffer) // struct.calcsize("h")
            print("\tpacket size: {:d}".format(packet_size)) print("\tpacket size: {:d}".format(packet_size))
            struct_format = "{:d}h".format(packet_size) struct_format = "{:d}h".format(packet_size)
            data = struct.unpack(struct_format, data_buffer) data = struct.unpack(struct_format, data_buffer)
            print("\tdata: {}".format(data)) print("\tdata: {}".format(data))
            i += 1 i += 1
    except KeyboardInterrupt: except KeyboardInterrupt:
        socket.close() socket.close()
    except Exception as error: except Exception as error:
        print("ERROR: {}".format(error)) print("ERROR: {}".format(error))
        socket.close() socket.close()
``` ```
将上面的内容保存到名为 **online_analysis.py** 的文件。Python 代码不需要编译,你可以直接运行它。 将上面的内容保存到名为 `online_analysis.py` 的文件。Python 代码不需要编译,你可以直接运行它。
运行输出如下: 运行输出如下:
``` ```
$ ./online_analysis.py $ ./online_analysis.py
Reading messages with topic: b'fancyhw_data' Reading messages with topic: b'fancyhw_data'
@ -619,7 +584,7 @@ Message 1:
### 小结 ### 小结
本教程介绍了一种新方式,实现从基于 C 的硬件接口收集数据,并分发到基于 Python 的基础组件的功能。借此可以获取数据供后续分析,或者转送到任意数量的接收端去。它采用了一个消息库实现数据在发送者和处理者之间的传送,来取代同样功能规模庞大的软件。 本教程介绍了一种新方式,实现从基于 C 的硬件接口收集数据,并分发到基于 Python 的基础设施的功能。借此可以获取数据供后续分析,或者转送到任意数量的接收端去。它采用了一个消息库实现数据在发送者和处理者之间的传送,来取代同样功能规模庞大的软件。
本教程还引出了我称之为“软件粒度”的概念,换言之,就是将软件细分为更小的部分。这种做法的优点之一就是,使得同时采用不同的编程语言实现最简接口作为不同部分之间沟通的组件成为可能。 本教程还引出了我称之为“软件粒度”的概念,换言之,就是将软件细分为更小的部分。这种做法的优点之一就是,使得同时采用不同的编程语言实现最简接口作为不同部分之间沟通的组件成为可能。
@ -632,7 +597,7 @@ via: https://opensource.com/article/20/3/zeromq-c-python
作者:[Cristiano L. Fontana][a] 作者:[Cristiano L. Fontana][a]
选题:[lujun9972][b] 选题:[lujun9972][b]
译者:[silentdawn-zz](https://github.com/译者ID) 译者:[silentdawn-zz](https://github.com/译者ID)
校对:[校对者ID](https://github.com/校对者ID) 校对:[wxy](https://github.com/wxy)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出 本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出