diff --git a/README.md b/README.md index 33c83b1..80ed12a 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,7 @@ * [本章小结](ch10.md#本章小结) * [第十一章:流处理](ch11.md) * [传递事件流](ch11.md#传递事件流) - * [流与数据库](ch11.md#流与数据库) + * [数据库与流](ch11.md#数据库与流) * [流处理](ch11.md#流处理) * [本章小结](ch11.md#本章小结) * [第十二章:数据系统的未来](ch12.md) diff --git a/ch10.md b/ch10.md index 9e62c46..092aded 100644 --- a/ch10.md +++ b/ch10.md @@ -293,7 +293,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5 ​ 当我们在批处理的语境中讨论连接时,我们指的是在数据集中解析某种关联的全量存在。 例如我们假设一个作业是同时处理所有用户的数据,而非仅仅是为某个特定用户查找数据(而这能通过索引更高效地完成)。 -#### 示例:分析用户活动事件 +#### 示例:用户活动事件分析 ​ [图10-2](img/fig10-2.png)给出了一个批处理作业中连接的典型例子。左侧是事件日志,描述登录用户在网站上做的事情(称为**活动事件(activity events)**或**点击流数据(clickstream data)**),右侧是用户数据库。 你可以将此示例看作是星型模式的一部分(参阅“[星型和雪花型:分析的模式](ch3.md#星型和雪花型:分析的模式)”):事件日志是事实表,用户数据库是其中的一个维度。 @@ -568,7 +568,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5 - 另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区散列连接的工作,因为构建散列表还是会把顺序随机打乱。 - 对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区。 -​ 这种类型的处理引擎是基于像Dryad 【67】和Nephele 【68】这样的研究系统,与MapReduce模型相比,它有几个优点: +​ 这种类型的处理引擎是基于像Dryad【67】和Nephele【68】这样的研究系统,与MapReduce模型相比,它有几个优点: - 排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个Map和Reduce阶段之间出现。 - 没有不必要的Map任务,因为Mapper所做的工作通常可以合并到前面的Reduce算子中(因为Mapper不会更改数据集的分区)。 diff --git a/ch11.md b/ch11.md index 582fa7a..1871880 100644 --- a/ch11.md +++ b/ch11.md @@ -10,7 +10,7 @@ [TOC] -​ 在[第10章](ch10.md)中,我们讨论了批处理技术,它读取一组文件作为输入,并生成一组新的文件作为输出。输出是**衍生数据(derived data)**的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来建立搜索索引,推荐系统,做分析等等。 +​ 在[第10章](ch10.md)中,我们讨论了批处理技术,它读取一组文件作为输入,并生成一组新的文件作为输出。输出是**衍生数据(derived data)**的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来建立搜索索引、推荐系统、做分析等等。 ​ 然而,在[第10章](ch10.md)中仍然有一个很大的假设:即输入是有界的,即已知和有限的大小,所以批处理知道它何时完成输入的读取。例如,MapReduce核心的排序操作必须读取其全部输入,然后才能开始生成输出:可能发生这种情况:最后一条输入记录具有最小的键,因此需要第一个被输出,所以提早开始输出是不可行的。 @@ -19,19 +19,19 @@ ​ 日常批处理中的问题是,输入的变更只会在一天之后的输出中反映出来,这对于许多急躁的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理 —— 比如说,在每秒钟的末尾 —— 或者甚至更连续一些,完全抛开固定的时间切片,当事件发生时就立即进行处理,这就是**流处理(stream processing)**背后的想法。 ​ 一般来说,“流”是指随着时间的推移逐渐可用的数据。这个概念出现在很多地方:Unix的stdin和stdout,编程语言(惰性列表)【2】,文件系统API(如Java的`FileInputStream`),TCP连接,通过互联网传送音频和视频等等。 -​ 在本章中,我们将把**事件流(event stream)**视为一种数据管理机制:无界限,增量处理,与[上一章](ch10.md)中批量数据相对应。我们将首先讨论怎样表示、存储、通过网络传输流。在“[数据库和流](#数据库和流)”中,我们将研究流和数据库之间的关系。最后在“[流处理](#流处理)”中,我们将研究连续处理这些流的方法和工具,以及它们用于应用构建的方式。 +​ 在本章中,我们将把**事件流(event stream)**视为一种数据管理机制:无界限,增量处理,与上一章中的批量数据相对应。我们将首先讨论怎样表示、存储、通过网络传输流。在“[数据库与流](#数据库与流)”中,我们将研究流和数据库之间的关系。最后在“[流处理](#流处理)”中,我们将研究连续处理这些流的方法和工具,以及它们用于应用构建的方式。 ## 传递事件流 ​ 在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。流处理领域中的等价物看上去是什么样子的? -​ 当输入是一个文件(一个字节序列),第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被叫做 **事件(event)** ,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自时钟的时间戳,以指明事件发生的时间(参见“[单调钟与时钟](ch8.md#单调钟与时钟)”)。 +​ 当输入是一个文件(一个字节序列),第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被叫做 **事件(event)** ,但它本质上是一样的:一个小的、自包含的、不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自日历时钟的时间戳,以指明事件发生的时间(参见“[单调钟与日历时钟](ch8.md#单调钟与日历时钟)”)。 -​ 例如,发生的事件可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如对温度传感器或CPU利用率的周期性测量。在“[使用Unix工具进行批处理](ch10.md#使用Unix工具进行批处理)”的示例中,Web服务器日志的每一行都是一个事件。 +​ 例如,发生的事件可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如对温度传感器或CPU利用率的周期性测量。在“[使用Unix工具的批处理](ch10.md#使用Unix工具的批处理)”的示例中,Web服务器日志的每一行都是一个事件。 -​ 事件可能被编码为文本字符串或JSON,或者某种二进制编码,如[第4章](ch4.md)所述。这种编码允许你存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许你通过网络将事件发送到另一个节点以进行处理。 +​ 事件可能被编码为文本字符串或JSON,或者某种二进制编码,如[第4章](ch4.md)所述。这种编码允许你存储一个事件,例如将其追加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许你通过网络将事件发送到另一个节点以进行处理。 ​ 在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流处理术语中,一个事件由 **生产者(producer)** (也称为 **发布者(publisher)** 或 **发送者(sender)** )生成一次,然后可能由多个 **消费者(consumer)** ( **订阅者(subscribers)** 或 **接收者(recipients)** )进行处理【3】。在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个 **主题(topic)** 或 **流(stream)** 。 @@ -39,24 +39,24 @@ ​ 但当我们想要进行低延迟的连续处理时,如果数据存储不是为这种用途专门设计的,那么轮询开销就会很大。轮询的越频繁,能返回新事件的请求比例就越低,而额外开销也就越高。相比之下,最好能在新事件出现时直接通知消费者。 -​ 数据库在传统上对这种通知机制支持的并不好,关系型数据库通常有 **触发器(trigger)** ,它们可以对变化作出反应(如,插入表中的一行),但是它们的功能非常有限,并且在数据库设计中有些后顾之忧【4,5】。相应的是,已经开发了专门的工具来提供事件通知。 +​ 数据库在传统上对这种通知机制支持的并不好,关系型数据库通常有 **触发器(trigger)** ,它们可以对变化(如,插入表中的一行)作出反应,但是它们的功能非常有限,并且在数据库设计中有些后顾之忧【4,5】。相应的是,已经开发了专门的工具来提供事件通知。 -### 消息系统 +### 消息传递系统 -​ 向消费者通知新事件的常用方式是使用**消息传递系统(messaging system)**:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在“[消息传递中的数据流](ch4.md#消息传递中的数据流)”中介绍了这些系统,但现在我们将详细介绍这些系统。 +​ 向消费者通知新事件的常用方式是使用**消息传递系统(messaging system)**:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在“[消息传递中的数据流](ch4.md#消息传递中的数据流)”中谈到了这些系统,但现在我们将详细介绍这些系统。 -​ 像生产者和消费者之间的Unix管道或TCP连接这样的直接信道,是实现消息传递系统的简单方法。但是,大多数消息传递系统都在这一基本模型上进行扩展。特别的是,Unix管道和TCP将恰好一个发送者与恰好一个接收者连接,而一个消息传递系统允许多个生产者节点将消息发送到同一个主题,并允许多个消费者节点接收主题中的消息。 +​ 像生产者和消费者之间的Unix管道或TCP连接这样的直接信道,是实现消息传递系统的简单方法。但是,大多数消息传递系统都在这一基本模型上进行了扩展。特别的是,Unix管道和TCP将恰好一个发送者与恰好一个接收者连接,而一个消息传递系统允许多个生产者节点将消息发送到同一个主题,并允许多个消费者节点接收主题中的消息。 ​ 在这个**发布/订阅**模式中,不同的系统采取各种各样的方法,并没有针对所有目的的通用答案。为了区分这些系统,问一下这两个问题会特别有帮助: -1. **如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?**一般来说,有三种选择:系统可以丢掉消息,将消息放入缓冲队列,或使用**背压(backpressure)**(也称为**流量控制(flow control);**即阻塞生产者,以免其发送更多的消息)。例如Unix管道和TCP使用背压:它们有一个固定大小的小缓冲区,如果填满,发送者会被阻塞,直到接收者从缓冲区中取出数据(参见“[网络拥塞和排队](ch8.md#网络拥塞和排队)”)。 +1. **如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?**一般来说,有三种选择:系统可以丢掉消息,将消息放入缓冲队列,或使用**背压(backpressure)**(也称为**流量控制(flow control)**;即阻塞生产者,以免其发送更多的消息)。例如Unix管道和TCP就使用了背压:它们有一个固定大小的小缓冲区,如果填满,发送者会被阻塞,直到接收者从缓冲区中取出数据(参见“[网络拥塞和排队](ch8.md#网络拥塞和排队)”)。 如果消息被缓存在队列中,那么理解队列增长会发生什么是很重要的。当队列装不进内存时系统会崩溃吗?还是将消息写入磁盘?如果是这样,磁盘访问又会如何影响消息传递系统的性能【6】? -2. **如果节点崩溃或暂时脱机,会发生什么情况? —— 是否会有消息丢失?**与数据库一样,持久性可能需要写入磁盘**和/或**复制的某种组合(参阅“[复制和持久性](ch7.md#复制和持久性)”),这是有代价的。如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。 +2. **如果节点崩溃或暂时脱机,会发生什么情况? —— 是否会有消息丢失?**与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(参阅“[复制和持久性](ch7.md#复制和持久性)”),这是有代价的。如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。 -是否可以接受消息丢失取决于应用。例如,对于周期传输的传感器读数和指标,偶尔丢失的数据点可能并不重要,因为更新的值会在短时间内发出。但要注意,如果大量的消息被丢弃,可能无法立刻意识到指标已经不正确了【7】。如果你正在对事件计数,那么更重要的是它们能够可靠送达,因为每个丢失的消息都意味着使计数器的错误扩大。 +​ 是否可以接受消息丢失取决于应用。例如,对于周期传输的传感器读数和指标,偶尔丢失的数据点可能并不重要,因为更新的值会在短时间内发出。但要注意,如果大量的消息被丢弃,可能无法立刻意识到指标已经不正确了【7】。如果你正在对事件计数,那么它们能够可靠送达是更重要的,因为每个丢失的消息都意味着使计数器的错误扩大。 ​ 我们在[第10章](ch10.md)中探讨的批处理系统的一个很好的特性是,它们提供了强大的可靠性保证:失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障一样,这有助于简化编程模型。在本章的后面,我们将研究如何在流处理的上下文中提供类似的保证。 @@ -67,9 +67,9 @@ * UDP组播广泛应用于金融行业,例如股票市场,其中低时延非常重要【8】。虽然UDP本身是不可靠的,但应用层的协议可以恢复丢失的数据包(生产者必须记住它发送的数据包,以便能按需重新发送数据包)。 * 无代理的消息库,如ZeroMQ 【9】和nanomsg采取类似的方法,通过TCP或IP多播实现发布/订阅消息传递。 * StatsD 【10】和Brubeck 【7】使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 (在StatsD协议中,只有接收到所有消息,才认为计数器指标是正确的;使用UDP将使得指标处在一种最佳近似状态【11】。另请参阅“[TCP与UDP](ch8.md#TCP与UDP)” -* 如果消费者在网络上公开了服务,生产者可以直接发送HTTP或RPC请求(参阅“[通过服务进行数据流:REST和RPC](ch4.md#通过服务进行数据流:REST和RPC)”)将消息推送给使用者。这就是webhooks背后的想法【12】,一种服务的回调URL被注册到另一个服务中,并且每当事件发生时都会向该URL发出请求。 +* 如果消费者在网络上公开了服务,生产者可以直接发送HTTP或RPC请求(参阅“[服务中的数据流:REST与RPC](ch4.md#服务中的数据流:REST与RPC)”)将消息推送给使用者。这就是webhooks背后的想法【12】,一种服务的回调URL被注册到另一个服务中,并且每当事件发生时都会向该URL发出请求。 -尽管这些直接消息传递系统在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。它们的容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。 +​ 尽管这些直接消息传递系统在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。它们的容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。 ​ 如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。 @@ -81,22 +81,22 @@ ​ 排队的结果是,消费者通常是**异步(asynchronous)**的:当生产者发送消息时,通常只会等待代理确认消息已经被缓存,而不等待消息被消费者处理。向消费者递送消息将发生在未来某个未定的时间点 —— 通常在几分之一秒之内,但有时当消息堆积时会显著延迟。 -#### 消息代理与数据库对比 +#### 消息代理与数据库的对比 ​ 有些消息代理甚至可以使用XA或JTA参与两阶段提交协议(参阅“[实践中的分布式事务](ch9.md#实践中的分布式事务)”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在实践上很重要的差异: * 数据库通常保留数据直至显式删除,而大多数消息代理在消息成功递送给消费者时会自动删除消息。这样的消息代理不适合长期的数据存储。 * 由于它们很快就能删除消息,大多数消息代理都认为它们的工作集相当小—— 即队列很短。如果代理需要缓冲很多消息,比如因为消费者速度较慢(如果内存装不下消息,可能会溢出到磁盘),每个消息需要更长的处理时间,整体吞吐量可能会恶化【6】。 -* 数据库通常支持二级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。机制并不一样,对于客户端选择想要了解的数据的一部分,这是两种基本的方式。 +* 数据库通常支持二级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。虽然机制并不一样,但对于客户端选择想要了解的数据的一部分,都是基本的方式。 * 查询数据库时,结果通常基于某个时间点的数据快照;如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。 -这是关于消息代理的传统观点,它被封装在诸如JMS 【14】和AMQP 【15】的标准中,并且被诸如RabbitMQ,ActiveMQ,HornetQ,Qpid,TIBCO企业消息服务,IBM MQ,Azure Service Bus和Google Cloud Pub/Sub实现 【16】。 +​ 这是关于消息代理的传统观点,它被封装在诸如JMS 【14】和AMQP 【15】的标准中,并且被诸如RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO企业消息服务、IBM MQ、Azure Service Bus和Google Cloud Pub/Sub所实现 【16】。 #### 多个消费者 -当多个消费者从同一主题中读取消息时,有使用两种主要的消息传递模式,如[图11-1](img/fig11-1.png)所示: +​ 当多个消费者从同一主题中读取消息时,有两种主要的消息传递模式,如[图11-1](img/fig11-1.png)所示: -***负载均衡(load balance)*** +***负载均衡(load balancing)*** ​ 每条消息都被传递给消费者**之一**,所以处理该主题下消息的工作能被多个消费者共享。代理可以为消费者任意分配消息。当处理消息的代价高昂,希望能并行处理消息时,此模式非常有用(在AMQP中,可以通过让多个客户端从同一个队列中消费来实现负载均衡,而在JMS中则称之为**共享订阅(shared subscription)**)。 @@ -108,15 +108,15 @@ **图11-1 (a)负载平衡:在消费者间共享消费主题;(b)扇出:将每条消息传递给多个消费者。** -​ 两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。 +​ 两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅同一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。 -#### 确认与重新交付 +#### 确认与重新传递 -​ 消费随时可能会崩溃,所以有一种可能的情况是:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理使用**确认(acknowledgments)**:客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。 +​ 消费者随时可能会崩溃,所以有一种可能的情况是:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理使用**确认(acknowledgments)**:客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。 ​ 如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。 (请注意可能发生这样的情况,消息**实际上是**处理完毕的,但**确认**在网络中丢失了。需要一种原子提交协议才能处理这种情况,正如在“[实践中的分布式事务](ch9.md#实践中的分布式事务)”中所讨论的那样) -​ 当与负载均衡相结合时,这种重传行为对消息的顺序有种有趣的影响。在[图11-2](img/fig11-2.png)中,消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃,与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1,结果消费者1按照m4,m3,m5的顺序处理消息。因此m3和m4的交付顺序与以生产者1的发送顺序不同。 +​ 当与负载均衡相结合时,这种重传行为对消息的顺序有种有趣的影响。在[图11-2](img/fig11-2.png)中,消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃,与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1,结果消费者1按照m4,m3,m5的顺序处理消息。因此m3和m4的交付顺序与生产者1的发送顺序不同。 ![](img/fig11-2.png) @@ -132,7 +132,7 @@ ​ 这种思维方式上的差异对创建衍生数据的方式有巨大影响。如[第10章](ch10.md)所述,批处理过程的一个关键特性是,你可以反复运行它们,试验处理步骤,不用担心损坏输入(因为输入是只读的)。而 AMQP/JMS风格的消息传递并非如此:收到消息是具有破坏性的,因为确认可能导致消息从代理中被删除,因此你不能期望再次运行同一个消费者能得到相同的结果。 -​ 如果你将新的消费者添加到消息系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。 +​ 如果你将新的消费者添加到消息传递系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。 ​ 为什么我们不能把它俩杂交一下,既有数据库的持久存储方式,又有消息传递的低延迟通知?这就是**基于日志的消息代理(log-based message brokers)** 背后的想法。 @@ -142,7 +142,7 @@ ​ 同样的结构可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。 Unix工具`tail -f` 能监视文件被追加写入的数据,基本上就是这样工作的。 -​ 为了伸缩超出单个磁盘所能提供的更高吞吐量,可以对日志进行**分区**(在[第6章](ch6.md)的意义上)。不同的分区可以托管在不同的机器上,且每个分区都拆分出一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。这种方法如[图11-3](img/fig11-3.png)所示。 +​ 为了伸缩超出单个磁盘所能提供的更高吞吐量,可以对日志进行**分区**(按[第6章](ch6.md)的定义)。不同的分区可以托管在不同的机器上,使得每个分区都有一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。这种方法如[图11-3](img/fig11-3.png)所示。 ​ 在每个分区内,代理为每个消息分配一个单调递增的序列号或**偏移量(offset)**(在[图11-3](img/fig11-3.png)中,框中的数字是消息偏移量)。这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。 @@ -156,20 +156,20 @@ ​ 基于日志的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响 —— 读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端。 -​ 每个客户端消费指派分区中的**所有**消息。然后使用分配的分区中的所有消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点: +​ 然后每个客户端将消费被指派分区中的**所有**消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点: * 共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点[^i]。 * 如果某条消息处理缓慢,则它会阻塞该分区中后续消息的处理(一种行首阻塞的形式;请参阅“[描述性能](ch1.md#描述性能)”)。 因此在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,JMS/AMQP风格的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况下,基于日志的方法表现得非常好。 -[^i]: 设计一种负载均衡方案是可行的,在这种方案中,两个消费者通过读取全部消息来共享处理分区的工作,但是其中一个只考虑具有偶数偏移量的消息,而另一个消费者只处理奇数编号的偏移量。或者你可以将消息摊到一个线程池中来处理,但这种方法会使消费者偏移量管理变得复杂。一般来说,单线程处理单分区是合适的,可以通过增加更多分区来提高并行度。 +[^i]: 要设计一种负载均衡方案也是有可能的,在这种方案中,两个消费者通过读取全部消息来共享分区处理的工作,但是其中一个只考虑具有偶数偏移量的消息,而另一个消费者只处理奇数编号的偏移量。或者你可以将消息摊到一个线程池中来处理,但这种方法会使消费者偏移量管理变得复杂。一般来说,单线程处理单分区是合适的,可以通过增加更多分区来提高并行度。 #### 消费者偏移量 -​ 顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。在这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。 +​ 顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。 -​ 实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似,我们在“[设置新从库](ch5.md#设置新从库)”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理的表现得像一个主库,而消费者就像一个从库。 +​ 实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似,我们在“[设置新从库](ch5.md#设置新从库)”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理表现得像一个主库,而消费者就像一个从库。 ​ 如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次。我们将在本章后面讨论这个问题的处理方法。 @@ -177,7 +177,7 @@ ​ 如果只追加写入日志,则磁盘空间终究会耗尽。为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。 (我们将在后面讨论一种更为复杂的磁盘空间释放方式) -​ 这就意味着如果一个慢消费者跟不上消息产生的速率而落后的太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为**循环缓冲区(circular buffer)**或**环形缓冲区(ring buffer)**。不过由于缓冲区在磁盘上,因此可能相当的大。 +​ 这就意味着如果一个慢消费者跟不上消息产生的速率而落后得太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为**循环缓冲区(circular buffer)**或**环形缓冲区(ring buffer)**。不过由于缓冲区在磁盘上,因此缓冲区可能相当的大。 ​ 让我们做个简单计算。在撰写本文时,典型的大型硬盘容量为6TB,顺序写入吞吐量为150MB/s。如果以最快的速度写消息,则需要大约11个小时才能填满磁盘。因而磁盘可以缓冲11个小时的消息,之后它将开始覆盖旧的消息。即使使用多个磁盘和机器,这个比率也是一样的。实践中的部署很少能用满磁盘的写入带宽,所以通常可以保存一个几天甚至几周的日志缓冲区。 @@ -185,15 +185,15 @@ #### 当消费者跟不上生产者时 -​ 在“[消息传递系统](#消息传递系统)”中,如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大,但大小固定的缓冲区(受可用磁盘空间的限制)。 +​ 在“[消息传递系统](#消息传递系统)”中,如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大但大小固定的缓冲区(受可用磁盘空间的限制)。 -​ 如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让人类运维来修复慢消费者,并在消息开始丢失之前让其赶上。 +​ 如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让运维人员来修复慢消费者,并在消息开始丢失之前让其赶上。 ​ 即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。 -​ 这种行为也与传统的信息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列—— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。 +​ 这种行为也与传统的消息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列—— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。 -#### 重播旧信息 +#### 重播旧消息 ​ 我们之前提到,使用AMQP和JMS风格的消息代理,处理和确认消息是一个破坏性的操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。 @@ -203,23 +203,23 @@ -## 流与数据库 +## 数据库与流 ​ 我们已经在消息代理和数据库之间进行了一些比较。尽管传统上它们被视为单独的工具类别,但是我们看到基于日志的消息代理已经成功地从数据库中获取灵感并将其应用于消息传递。我们也可以反过来:从消息传递和流中获取灵感,并将它们应用于数据库。 -​ 我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或读取传感器,但也可能是**写入数据库**。某些东西被写入数据库的事实是可以被捕获,存储和处理的事件。这一观察结果表明,数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系。 +​ 我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或读取传感器,但也可能是**写入数据库**。某些东西被写入数据库的事实是可以被捕获、存储和处理的事件。这一观察结果表明,数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系。 -​ 事实上,复制日志(参阅“[复制日志的实现](ch5.md#复制日志的实现)”)是数据库写入事件的流,由主库在处理事务时生成。从库将写入流应用到它们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。 +​ 事实上,复制日志(参阅“[复制日志的实现](ch5.md#复制日志的实现)”)是一个由数据库写入事件组成的流,由主库在处理事务时生成。从库将写入流应用到它们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。 -​ 我们还在“[全序广播](ch9.md#全序广播)”中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将达到相同的最终状态 (假设处理一个事件是一个确定性的操作)。这是事件流的又一种场景! +​ 我们还在“[全序广播](ch9.md#全序广播)”中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将达到相同的最终状态 (假设事件处理是一个确定性的操作)。这是事件流的又一种场景! ​ 在本节中,我们将首先看看异构数据系统中出现的一个问题,然后探讨如何通过将事件流的想法带入数据库来解决这个问题。 ### 保持系统同步 -​ 正如我们在本书中所看到的,没有一个系统能够满足所有的数据存储,查询和处理需求。在实践中,大多数重要应用都需要组合使用几种不同的技术来满足所有的需求:例如,使用OLTP数据库来为用户请求提供服务,使用缓存来加速常见请求,使用全文索引搜索处理搜索查询,使用数据仓库用于分析。每一个组件都有自己的数据副本,以自己的表示存储,并根据自己的目的进行优化。 +​ 正如我们在本书中所看到的,没有一个系统能够满足所有的数据存储、查询和处理需求。在实践中,大多数重要应用都需要组合使用几种不同的技术来满足所有的需求:例如,使用OLTP数据库来为用户请求提供服务,使用缓存来加速常见请求,使用全文索引来处理搜索查询,使用数据仓库用于分析。每一种技术都有自己的数据副本,并根据自己的目的进行存储方式的优化。 -​ 由于相同或相关的数据出现在了不同的地方,因此相互间需要保持同步:如果某个项目在数据库中被更新,它也应当在缓存,搜索索引和数据仓库中被更新。对于数据仓库,这种同步通常由ETL进程执行(参见“[数据仓库](ch3.md#数据仓库)”),通常是先取得数据库的完整副本,然后执行转换,并批量加载到数据仓库中 —— 换句话说,批处理。我们在“[批量工作流的输出](ch10.md#批量工作流的输出)”中同样看到了如何使用批处理创建搜索索引,推荐系统和其他衍生数据系统。 +​ 由于相同或相关的数据出现在了不同的地方,因此相互间需要保持同步:如果某个项目在数据库中被更新,它也应当在缓存、搜索索引和数据仓库中被更新。对于数据仓库,这种同步通常由ETL进程执行(参见“[数据仓库](ch3.md#数据仓库)”),通常是先取得数据库的完整副本,然后执行转换,并批量加载到数据仓库中 —— 换句话说,批处理。我们在“[批处理工作流的输出](ch10.md#批处理工作流的输出)”中同样看到了如何使用批处理创建搜索索引、推荐系统和其他衍生数据系统。 ​ 如果周期性的完整数据库转储过于缓慢,有时会使用的替代方法是**双写(dual write)**,其中应用代码在数据变更时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存项失效(甚至同时执行这些写入)。 @@ -231,9 +231,9 @@ ​ 除非有一些额外的并发检测机制,例如我们在“[检测并发写入](ch5.md#检测并发写入)”中讨论的版本向量,否则你甚至不会意识到发生了并发写入 —— 一个值将简单地以无提示方式覆盖另一个值。 -​ 双重写入的另一个问题是,其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相不一致的结果。确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的(参阅“[原子提交和两阶段提交(2PC)](ch7.md#原子提交和两阶段提交(2PC))”)。 +​ 双重写入的另一个问题是,其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相不一致的结果。确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的(参阅“[原子提交与两阶段提交(2PC)](ch7.md#原子提交与两阶段提交(2PC))”)。 -​ 如果你只有一个单领导者复制的数据库,那么这个领导者决定了写入顺序,而状态机复制方法可以在数据库副本上工作。然而,在[图11-4](img/fig11-4.png)中,没有单个主库:数据库可能有一个领导者,搜索索引也可能有一个领导者,但是两者都不追随对方,所以可能会发生冲突(参见“[多领导者复制](ch5.md#多领导者复制)“)。 +​ 如果你只有一个单领导者复制的数据库,那么这个领导者决定了写入顺序,而状态机复制方法可以在数据库副本上工作。然而,在[图11-4](img/fig11-4.png)中,没有单个主库:数据库可能有一个领导者,搜索索引也可能有一个领导者,但是两者都不追随对方,所以可能会发生冲突(参见“[多主复制](ch5.md#多主复制)“)。 ​ 如果实际上只有一个领导者 —— 例如,数据库 —— 而且我们能让搜索索引成为数据库的追随者,情况要好得多。但这在实践中可能吗? @@ -241,7 +241,7 @@ ​ 大多数数据库的复制日志的问题在于,它们一直被当做数据库的内部实现细节,而不是公开的API。客户端应该通过其数据模型和查询语言来查询数据库,而不是解析复制日志并尝试从中提取数据。 -​ 数十年来,许多数据库根本没有记录在档的,获取变更日志的方式。由于这个原因,捕获数据库中所有的变更,然后将其复制到其他存储技术(搜索索引,缓存,数据仓库)中是相当困难的。 +​ 数十年来,许多数据库根本没有记录在档的获取变更日志的方式。由于这个原因,捕获数据库中所有的变更,然后将其复制到其他存储技术(搜索索引、缓存或数据仓库)中是相当困难的。 ​ 最近,人们对**变更数据捕获(change data capture, CDC)** 越来越感兴趣,这是一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程。 CDC是非常有意思的,尤其是当变更能在被写入后立刻用于流时。 @@ -253,21 +253,21 @@ #### 变更数据捕获的实现 -​ 我们可以将日志消费者叫做**衍生数据系统**,正如在第三部分的[介绍](part-iii.md)中所讨论的:存储在搜索索引和数据仓库中的数据,只是**记录系统**数据的额外视图。变更数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在衍生数据系统中,以便衍生系统具有数据的准确副本。 +​ 我们可以将日志消费者叫做**衍生数据系统**,正如在[第三部分](part-iii.md)的介绍中所讨论的:存储在搜索索引和数据仓库中的数据,只是**记录系统**数据的额外视图。变更数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在衍生数据系统中,以便衍生系统具有数据的准确副本。 ​ 从本质上说,变更数据捕获使得一个数据库成为领导者(被捕获变化的数据库),并将其他组件变为追随者。基于日志的消息代理非常适合从源数据库传输变更事件,因为它保留了消息的顺序(避免了[图11-2](img/fig11-2.png)的重新排序问题)。 -​ 数据库触发器可用来实现变更数据捕获(参阅“[基于触发器的复制](ch5.md#基于触发器的复制)”),通过注册观察所有变更的触发器,并将相应的变更项写入变更日志表中。但是它们往往是脆弱的,而且有显著的性能开销。解析复制日志可能是一种更稳健的方法,但它也很有挑战,例如应对模式变更。 +​ 数据库触发器可用来实现变更数据捕获(参阅“[基于触发器的复制](ch5.md#基于触发器的复制)”),通过注册观察所有变更的触发器,并将相应的变更项写入变更日志表中。但是它们往往是脆弱的,而且有显著的性能开销。解析复制日志可能是一种更稳健的方法,但它也很有挑战,例如如何应对模式变更。 -​ LinkedIn的Databus 【25】,Facebook的Wormhole 【26】和Yahoo!的Sherpa【27】大规模地应用这个思路。 Bottled Water使用解码WAL的API实现了PostgreSQL的CDC 【28】,Maxwell和Debezium通过解析binlog对MySQL做了类似的事情【29,30,31】,Mongoriver读取MongoDB oplog 【32,33】 ,而GoldenGate为Oracle提供类似的功能【34,35】。 +​ LinkedIn的Databus【25】,Facebook的Wormhole【26】和Yahoo!的Sherpa【27】大规模地应用这个思路。 Bottled Water使用解码WAL的API实现了PostgreSQL的CDC【28】,Maxwell和Debezium通过解析binlog对MySQL做了类似的事情【29,30,31】,Mongoriver读取MongoDB oplog【32,33】,而GoldenGate为Oracle提供类似的功能【34,35】。 ​ 像消息代理一样,变更数据捕获通常是异步的:记录数据库系统不会等待消费者应用变更再进行提交。这种设计具有的运维优势是,添加缓慢的消费者不会过度影响记录系统。不过,所有复制延迟可能有的问题在这里都可能出现(参见“[复制延迟问题](ch5.md#复制延迟问题)”)。 #### 初始快照 -​ 如果你拥有**所有**对数据库进行变更的日志,则可以通过重放该日志,来重建数据库的完整状态。但是在许多情况下,永远保留所有更改会耗费太多磁盘空间,且重放过于费时,因此日志需要被截断。 +​ 如果你拥有**所有**对数据库进行变更的日志,则可以通过重播该日志,来重建数据库的完整状态。但是在许多情况下,永远保留所有更改会耗费太多磁盘空间,且重播过于费时,因此日志需要被截断。 -​ 例如,构建新的全文索引需要整个数据库的完整副本 —— 仅仅应用最近变更的日志是不够的,因为这样会丢失最近未曾更新的项目。因此,如果你没有完整的历史日志,则需要从一个一致的快照开始,如先前上的“[设置新的从库](ch5.md#设置新的从库)”中所述。 +​ 例如,构建新的全文索引需要整个数据库的完整副本 —— 仅仅应用最近变更的日志是不够的,因为这样会丢失最近未曾更新的项目。因此,如果你没有完整的历史日志,则需要从一个一致的快照开始,如先前的“[设置新从库](ch5.md#设置新从库)”中所述。 ​ 数据库的快照必须与变更日志中的已知位置或偏移量相对应,以便在处理完快照后知道从哪里开始应用变更。一些CDC工具集成了这种快照功能,而其他工具则把它留给你手动执行。 @@ -275,38 +275,38 @@ ​ 如果你只能保留有限的历史日志,则每次要添加新的衍生数据系统时,都需要做一次快照。但**日志压缩(log compaction)** 提供了一个很好的备选方案。 -​ 我们之前在日志结构存储引擎的上下文中讨论了“[Hash索引](ch3.md#Hash索引)”中的日志压缩(参见[图3-2](img/fig3-2.png)的示例)。原理很简单:存储引擎定期在日志中查找具有相同键的记录,丢掉所有重复的内容,并只保留每个键的最新更新。这个压缩与合并过程在后台运行。 +​ 我们之前在“[哈希索引](ch3.md#哈希索引)”中关于日志结构存储引擎的上下文中讨论了日志压缩(参见[图3-2](img/fig3-2.png)的示例)。原理很简单:存储引擎定期在日志中查找具有相同键的记录,丢掉所有重复的内容,并只保留每个键的最新更新。这个压缩与合并过程在后台运行。 ​ 在日志结构存储引擎中,具有特殊值NULL(**墓碑(tombstone)**)的更新表示该键被删除,并会在日志压缩过程中被移除。但只要键不被覆盖或删除,它就会永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中曾经发生的写入次数。如果相同的键经常被覆盖写入,则先前的值将最终将被垃圾回收,只有最新的值会保留下来。 ​ 在基于日志的消息代理与变更数据捕获的上下文中也适用相同的想法。如果CDC系统被配置为,每个变更都包含一个主键,且每个键的更新都替换了该键以前的值,那么只需要保留对键的最新写入就足够了。 -​ 现在,无论何时需要重建衍生数据系统(如搜索索引),你可以从压缩日志主题0偏移量处启动新的消费者,然后依次扫描日志中的所有消息。日志能保证包含数据库中每个键的最新值(也可能是一些较旧的值)—— 换句话说,你可以使用它来获取数据库内容的完整副本,而无需从CDC源数据库取一个快照。 +​ 现在,无论何时需要重建衍生数据系统(如搜索索引),你可以从压缩日志主题的零偏移量处启动新的消费者,然后依次扫描日志中的所有消息。日志能保证包含数据库中每个键的最新值(也可能是一些较旧的值)—— 换句话说,你可以使用它来获取数据库内容的完整副本,而无需从CDC源数据库取一个快照。 ​ Apache Kafka支持这种日志压缩功能。正如我们将在本章后面看到的,它允许消息代理被当成持久性存储使用,而不仅仅是用于临时消息。 #### 变更流的API支持 -​ 越来越多的数据库开始将变更流作为第一类的接口,而不像传统上要去做加装改造,费工夫逆向工程一个CDC。例如,RethinkDB允许查询订阅通知,当查询结果变更时获得通知【36】,Firebase 【37】和CouchDB 【38】基于变更流进行同步,该变更流同样可用于应用。而Meteor使用MongoDB oplog订阅数据变更,并改变了用户接口【39】。 +​ 越来越多的数据库开始将变更流作为第一等的接口,而不像传统上要去做加装改造,或者费工夫逆向工程一个CDC。例如,RethinkDB允许查询订阅通知,当查询结果变更时获得通知【36】,Firebase 【37】和CouchDB 【38】基于变更流进行同步,该变更流同样可用于应用。而Meteor使用MongoDB oplog订阅数据变更,并改变了用户接口【39】。 ​ VoltDB允许事务以流的形式连续地从数据库中导出数据【40】。数据库将关系数据模型中的输出流表示为一个表,事务可以向其中插入元组,但不能查询。已提交事务按照提交顺序写入这个特殊表,而流则由该表中的元组日志构成。外部消费者可以异步消费该日志,并使用它来更新衍生数据系统。 -​ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成。一旦变更事件进入Kafka中,它就可以用于更新衍生数据系统,比如搜索索引,也可以用于本章稍后讨论的流处理系统。 +​ Kafka Connect【41】致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成。一旦变更事件进入Kafka中,它就可以用于更新衍生数据系统,比如搜索索引,也可以用于本章稍后讨论的流处理系统。 ### 事件溯源 -​ 我们在这里讨论的想法和**事件溯源( Event Sourcing)** 之间有一些相似之处,这是一个在 **领域驱动设计(domain-driven design, DDD)** 社区中折腾出来的技术。我们将简要讨论事件溯源,因为它包含了一些关于流处理系统的有用想法。 +​ 我们在这里讨论的想法和**事件溯源(Event Sourcing)** 之间有一些相似之处,这是一个在 **领域驱动设计(domain-driven design, DDD)** 社区中折腾出来的技术。我们将简要讨论事件溯源,因为它包含了一些关于流处理系统的有用想法。 -​ 与变更数据捕获类似,事件溯源涉及到**将所有对应用状态的变更** 存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了几个不同的抽象层次上: +​ 与变更数据捕获类似,事件溯源涉及到**将所有对应用状态的变更**存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了一个不同的抽象层次上: -* 在变更数据捕获中,应用以**可变方式(mutable way)** 使用数据库,任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免[图11-4](img/fig11-4.png)中的竞态条件。写入数据库的应用不需要知道CDC的存在。 +* 在变更数据捕获中,应用以**可变方式(mutable way)**使用数据库,可以任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免[图11-4](img/fig11-4.png)中的竞态条件。写入数据库的应用不需要知道CDC的存在。 * 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。 -事件源是一种强大的数据建模技术:从应用的角度来看,将用户的行为记录为不可变的事件更有意义,而不是在可变数据库中记录这些行为的影响。事件代理使得应用随时间演化更为容易,通过事实更容易理解事情发生的原因,使得调试更为容易,并有利于防止应用Bug(请参阅“[不可变事件的优点](#不可变事件的优点)”)。 +​ 事件溯源是一种强大的数据建模技术:从应用的角度来看,将用户的行为记录为不可变的事件更有意义,而不是在可变数据库中记录这些行为的影响。事件溯源使得应用随时间演化更为容易,通过更容易理解事情发生的原因来帮助调试的进行,并有利于防止应用Bug(请参阅“[不可变事件的优点](#不可变事件的优点)”)。 -​ 例如,存储“学生取消选课”事件以中性的方式清楚地表达了单个行为的意图,而副作用“从注册表中删除了一个条目,而一条取消原因被添加到学生反馈表“则嵌入了很多有关稍后数据使用方式的假设。如果引入一个新的应用功能,例如“将位置留给等待列表中的下一个人” —— 事件溯源方法允许将新的副作用轻松地链接至现有事件之后。 +​ 例如,存储“学生取消选课”事件以中性的方式清楚地表达了单个行为的意图,而其副作用“从登记表中删除了一个条目,而一条取消原因的记录被添加到学生反馈表“则嵌入了很多有关稍后对数据的使用方式的假设。如果引入一个新的应用功能,例如“将位置留给等待列表中的下一个人” —— 事件溯源方法允许将新的副作用轻松地从现有事件中脱开。 -​ 事件溯源类似于**编年史(chronicle)** 数据模型【45】,事件日志与星型模式中的事实表之间也存在相似之处(参阅“[星型和雪花型:分析的模式](ch3.md#星型和雪花型:分析的模式)”) 。 +​ 事件溯源类似于**编年史(chronicle)**数据模型【45】,事件日志与星型模式中的事实表之间也存在相似之处(参阅“[星型和雪花型:分析的模式](ch3.md#星型和雪花型:分析的模式)”) 。 ​ 诸如Event Store【46】这样的专业数据库已经被开发出来,供使用事件溯源的应用使用,但总的来说,这种方法独立于任何特定的工具。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用。 @@ -316,34 +316,34 @@ ​ 因此,使用事件溯源的应用需要拉取事件日志(表示**写入**系统的数据),并将其转换为适合向用户显示的应用状态(从系统**读取**数据的方式【47】)。这种转换可以使用任意逻辑,但它应当是确定性的,以便能再次运行,并从事件日志中衍生出相同的应用状态。 -​ 与变更数据捕获一样,重放事件日志允许让你重新构建系统的当前状态。不过,日志压缩需要采用不同的方式处理: +​ 与变更数据捕获一样,重播事件日志允许让你重新构建系统的当前状态。不过,日志压缩需要采用不同的方式处理: * 用于记录更新的CDC事件通常包含记录的**完整新版本**,因此主键的当前值完全由该主键的最近事件确定,而日志压缩可以丢弃相同主键的先前事件。 * 另一方面,事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制。在这种情况下,后面的事件通常不会覆盖先前的事件,所以你需要完整的历史事件来重新构建最终状态。这里进行同样的日志压缩是不可能的。 -使用事件溯源的应用通常有一些机制,用于存储从事件日志中导出的当前状态快照,因此它们不需要重复处理完整的日志。然而这只是一种性能优化,用来加速读取,提高从崩溃中恢复的速度;真正的目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们将在“[不变性的限制](#不变性的限制)”中讨论这个假设。 +​ 使用事件溯源的应用通常有一些机制,用于存储从事件日志中导出的当前状态快照,因此它们不需要重复处理完整的日志。然而这只是一种性能优化,用来加速读取,提高从崩溃中恢复的速度;真正的目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们将在“[不变性的限制](#不变性的限制)”中讨论这个假设。 #### 命令和事件 ​ 事件溯源的哲学是仔细区分**事件(event)**和**命令(command)**【48】。当来自用户的请求刚到达时,它一开始是一个命令:在这个时间点上它仍然可能可能失败,比如,因为违反了一些完整性条件。应用必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受,则它变为一个持久化且不可变的事件。 -​ 例如,如果用户试图注册特定用户名,或预定飞机或剧院的座位,则应用需要检查用户名或座位是否已被占用。 (先前在“[容错概念](ch8.md#容错概念)”中讨论过这个例子)当检查成功时,应用可以生成一个事件,指示特定的用户名是由特定的用户ID注册的,座位已经预留给特定的顾客。 +​ 例如,如果用户试图注册特定用户名,或预定飞机或剧院的座位,则应用需要检查用户名或座位是否已被占用。(先前在“[容错共识](ch8.md#容错共识)”中讨论过这个例子)当检查成功时,应用可以生成一个事件,指示特定的用户名是由特定的用户ID注册的,或者座位已经预留给特定的顾客。 ​ 在事件生成的时刻,它就成为了**事实(fact)**。即使客户稍后决定更改或取消预订,他们之前曾预定了某个特定座位的事实仍然成立,而更改或取消是之后添加的单独的事件。 -​ 事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经成为日志中不可变的一部分,并且可能已经被其他消费者看到了。因此任何对命令的验证,都需要在它成为事件之前同步完成。例如,通过使用一个可自动验证命令的可序列化事务来发布事件。 +​ 事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经成为日志中不可变的一部分,并且可能已经被其他消费者看到了。因此任何对命令的验证,都需要在它成为事件之前同步完成。例如,通过使用一个可以原子性地自动验证命令并发布事件的可串行事务。 ​ 或者,预订座位的用户请求可以拆分为两个事件:第一个是暂时预约,第二个是验证预约后的独立的确认事件(如“[使用全序广播实现线性一致的存储](ch9.md#使用全序广播实现线性一致的存储)”中所述) 。这种分割方式允许验证发生在一个异步的过程中。 -### 状态,流和不变性 +### 状态、流和不变性 ​ 我们在[第10章](ch10.md)中看到,批处理因其输入文件不变性而受益良多,你可以在现有输入文件上运行实验性处理作业,而不用担心损坏它们。这种不变性原则也是使得事件溯源与变更数据捕获如此强大的原因。 -​ 我们通常将数据库视为应用程序当前状态的存储 —— 这种表示针对读取进行了优化,而且通常对于服务查询而言是最为方便的表示。状态的本质是,它会变化,所以数据库才会支持数据的增删改。这又是如何符合不变性的呢? +​ 我们通常将数据库视为应用程序当前状态的存储 —— 这种表示针对读取进行了优化,而且通常对于服务查询而言是最为方便的表示。状态的本质是,它会变化,所以数据库才会支持数据的增删改。这又该如何匹配不变性呢? -​ 只要你的状态发生了变化,那么这个状态就是这段时间中事件修改的结果。例如,当前可用的座位列表是已处理预订产生的结果,当前帐户余额是帐户中的借与贷的结果,而Web服务器的响应时间图,是所有已发生Web请求的独立响应时间的聚合结果。 +​ 只要你的状态发生了变化,那么这个状态就是这段时间中事件修改的结果。例如,当前可用的座位列表是你已处理的预订所产生的结果,当前帐户余额是帐户中的借与贷的结果,而Web服务器的响应时间图,是所有已发生Web请求的独立响应时间的聚合结果。 -​ 无论状态如何变化,总是有一系列事件导致了这些变化。即使事情已经执行与回滚,这些事件出现是始终成立的。关键的想法是:可变的状态与不可变事件的仅追加日志相互之间并不矛盾:它们是一体两面,互为阴阳的。所有变化的日志—— **变化日志(change log)**,表示了随时间演变的状态。 +​ 无论状态如何变化,总是有一系列事件导致了这些变化。即使事情已经执行与回滚,这些事件出现是始终成立的。关键的想法是:可变的状态与不可变事件的仅追加日志相互之间并不矛盾:它们是一体两面,互为阴阳的。所有变化的日志—— **变化日志(changelog)**,表示了随时间演变的状态。 ​ 如果你倾向于数学表示,那么你可能会说,应用状态是事件流对时间求积分得到的结果,而变更流是状态对时间求微分的结果,如[图11-6](img/fig11-6.png)所示【49,50,51】。这个比喻有一些局限性(例如,状态的二阶导似乎没有意义),但这是考虑数据的一个实用出发点。 $$ @@ -362,35 +362,35 @@ $$ #### 不可变事件的优点 -​ 数据库中的不变性是一个古老的概念。例如,会计在几个世纪以来一直在财务记账中应用不变性。一笔交易发生时,它被记录在一个仅追加写入的分类帐中,实质上是描述货币,商品或服务转手的事件日志。账目,比如利润、亏损、资产负债表,是从分类账中的交易求和衍生而来【53】。 +​ 数据库中的不变性是一个古老的概念。例如,会计在几个世纪以来一直在财务记账中应用不变性。一笔交易发生时,它被记录在一个仅追加写入的分类帐中,实质上是描述货币、商品或服务转手的事件日志。账目,比如利润、亏损、资产负债表,是从分类账中的交易求和衍生而来【53】。 -​ 如果发生错误,会计师不会删除或更改分类帐中的错误交易 —— 而是添加另一笔交易以补偿错误,例如退还一比不正确的费用。不正确的交易将永远保留在分类帐中,对于审计而言可能非常重要。如果从不正确的分类账衍生出的错误数字已经公布,那么下一个会计周期的数字就会包括一个更正。这个过程在会计事务中是很常见的【54】。 +​ 如果发生错误,会计师不会删除或更改分类帐中的错误交易 —— 而是添加另一笔交易以补偿错误,例如退还一笔不正确的费用。不正确的交易将永远保留在分类帐中,对于审计而言可能非常重要。如果从不正确的分类账衍生出的错误数字已经公布,那么下一个会计周期的数字就会包括一个更正。这个过程在会计事务中是很常见的【54】。 -​ 尽管这种可审计性在金融系统中尤其重要,但对于不受这种严格监管的许多其他系统,也是很有帮助的。如“[批处理输出的哲学](ch10.md#批处理输出的哲学)”中所讨论的,如果你意外地部署了将错误数据写入数据库的错误代码,当代码会破坏性地覆写数据时,恢复要困难得多。使用不可变事件的仅追加日志,诊断问题与故障恢复就要容易的多。 +​ 尽管这种可审计性只在金融系统中尤其重要,但对于不受这种严格监管的许多其他系统,也是很有帮助的。如“[批处理输出的哲学](ch10.md#批处理输出的哲学)”中所讨论的,如果你意外地部署了将错误数据写入数据库的错误代码,当代码会破坏性地覆写数据时,恢复要困难得多。使用不可变事件的仅追加日志,诊断问题与故障恢复就要容易的多。 -​ 不可变的事件也包含了比当前状态更多的信息。例如在购物网站上,顾客可以将物品添加到他们的购物车,然后再将其移除。虽然从履行订单的角度,第二个事件取消了第一个事件,但对分析目的而言,知道客户考虑过某个特定项而之后又反悔,可能是很有用的。也许他们会选择在未来购买,或者他们已经找到了替代品。这个信息被记录在事件日志中,但对于移出购物车就删除记录的数据库而言,这个信息在移出购物车时可能就丢失【42】。 +​ 不可变的事件也包含了比当前状态更多的信息。例如在购物网站上,顾客可以将物品添加到他们的购物车,然后再将其移除。虽然从履行订单的角度,第二个事件取消了第一个事件,但对分析目的而言,知道客户考虑过某个特定项而之后又反悔,可能是很有用的。也许他们会选择在未来购买,或者他们已经找到了替代品。这个信息被记录在事件日志中,但对于移出购物车就删除记录的数据库而言,这个信息在移出购物车时可能就丢失了【42】。 #### 从同一事件日志中派生多个视图 ​ 此外,通过从不变的事件日志中分离出可变的状态,你可以针对不同的读取方式,从相同的事件日志中衍生出几种不同的表现形式。效果就像一个流的多个消费者一样([图11-5](img/fig11-5.png)):例如,分析型数据库Druid使用这种方式直接从Kafka摄取数据【55】,Pistachio是一个分布式的键值存储,使用Kafka作为提交日志【56】,Kafka Connect能将来自Kafka的数据导出到各种不同的数据库与索引【41】。这对于许多其他存储和索引系统(如搜索服务器)来说是很有意义的,当系统要从分布式日志中获取输入时亦然(参阅“[保持系统同步](#保持系统同步)”)。 -​ 添加从事件日志到数据库的显式转换,能够使应用更容易地随时间演进:如果你想要引入一个新功能,以新的方式表示现有数据,则可以使用事件日志来构建一个单独的,针对新功能的读取优化视图,无需修改现有系统而与之共存。并行运行新旧系统通常比在现有系统中执行复杂的模式迁移更容易。一旦不再需要旧的系统,你可以简单地关闭它并回收其资源【47,57】。 +​ 添加从事件日志到数据库的显式转换,能够使应用更容易地随时间演进:如果你想要引入一个新功能,以新的方式表示现有数据,则可以使用事件日志来构建一个单独的、针对新功能的读取优化视图,无需修改现有系统而与之共存。并行运行新旧系统通常比在现有系统中执行复杂的模式迁移更容易。一旦不再需要旧的系统,你可以简单地关闭它并回收其资源【47,57】。 -​ 如果你不需要担心如何查询与访问数据,那么存储数据通常是非常简单的。模式设计,索引和存储引擎的许多复杂性,都是希望支持某些特定查询和访问模式的结果(参见[第3章](ch3.md))。出于这个原因,通过将数据写入的形式与读取形式相分离,并允许几个不同的读取视图,你能获得很大的灵活性。这个想法有时被称为**命令查询责任分离(command query responsibility segregation, CQRS)**【42,58,59】。 +​ 如果你不需要担心如何查询与访问数据,那么存储数据通常是非常简单的。模式设计、索引和存储引擎的许多复杂性,都是希望支持某些特定查询和访问模式的结果(参见[第3章](ch3.md))。出于这个原因,通过将数据写入的形式与读取形式相分离,并允许几个不同的读取视图,你能获得很大的灵活性。这个想法有时被称为**命令查询责任分离(command query responsibility segregation, CQRS)**【42,58,59】。 ​ 数据库和模式设计的传统方法是基于这样一种谬论,数据必须以与查询相同的形式写入。如果可以将数据从针对写入优化的事件日志转换为针对读取优化的应用状态,那么有关规范化和非规范化的争论就变得无关紧要了(参阅“[多对一和多对多的关系](ch2.md#多对一和多对多的关系)”):在针对读取优化的视图中对数据进行非规范化是完全合理的,因为翻译过程提供了使其与事件日志保持一致的机制。 -​ 在“[描述负载](ch1.md#描述负载)”中,我们讨论了推特主页时间线,它是特定用户关注人群所发推特的缓存(类似邮箱)。这是**针对读取优化的状态**的又一个例子:主页时间线是高度非规范化的,因为你的推文与所有粉丝的时间线都构成了重复。然而,扇出服务保持了这种重复状态与新推特以及新关注关系的同步,从而保证了重复的可管理性。 +​ 在“[描述负载](ch1.md#描述负载)”中,我们讨论了推特主页时间线,它是特定用户关注的人群所发推特的缓存(类似邮箱)。这是**针对读取优化的状态**的又一个例子:主页时间线是高度非规范化的,因为你的推文与你所有粉丝的时间线都构成了重复。然而,扇出服务保持了这种重复状态与新推特以及新关注关系的同步,从而保证了重复的可管理性。 #### 并发控制 -​ 事件溯源和变更数据捕获的最大缺点是,事件日志的消费者通常是异步的,所以可能会出现这样的情况:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中。我们之前在在“[读己之写](ch5.md#读己之写)”中讨论了这个问题以及可能的解决方案。 +​ 事件溯源和变更数据捕获的最大缺点是,事件日志的消费者通常是异步的,所以可能会出现这样的情况:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中。我们之前在“[读己之写](ch5.md#读己之写)”中讨论了这个问题以及可能的解决方案。 -​ 一种解决方案是将事件附加到日志时同步执行读取视图的更新。而将这些写入操作合并为一个原子单元需要**事务**,所以要么将事件日志和读取视图保存在同一个存储系统中,要么就需要跨不同系统进行分布式事务。或者,你也可以使用在“[使用全序广播实现线性一致的存储](ch9.md#使用全序广播实现线性一致的存储)”中讨论的方法。 +​ 一种解决方案是将事件追加到日志时同步执行读取视图的更新。而将这些写入操作合并为一个原子单元需要**事务**,所以要么将事件日志和读取视图保存在同一个存储系统中,要么就需要跨不同系统进行分布式事务。或者,你也可以使用在“[使用全序广播实现线性一致的存储](ch9.md#使用全序广播实现线性一致的存储)”中讨论的方法。 ​ 另一方面,从事件日志导出当前状态也简化了并发控制的某些部分。许多对于多对象事务的需求(参阅“[单对象和多对象操作](ch7.md#单对象和多对象操作)”)源于单个用户操作需要在多个不同的位置更改数据。通过事件溯源,你可以设计一个自包含的事件以表示一个用户操作。然后用户操作就只需要在一个地方进行单次写入操作 —— 即将事件附加到日志中 —— 这个还是很容易使原子化的。 -​ 如果事件日志与应用状态以相同的方式分区(例如,处理分区3中的客户事件只需要更新分区3中的应用状态),那么直接使用单线程日志消费者就不需要写入并发控制了。它从设计上一次只处理一个事件(参阅“[真的的串行执行](ch7.md#真的的串行执行)”)。日志通过在分区中定义事件的序列顺序,消除了并发性的不确定性【24】。如果一个事件触及多个状态分区,那么需要做更多的工作,我们将在[第12章](ch12.md)讨论。 +​ 如果事件日志与应用状态以相同的方式分区(例如,处理分区3中的客户事件只需要更新分区3中的应用状态),那么直接使用单线程日志消费者就不需要写入并发控制了。它从设计上一次只处理一个事件(参阅“[真的串行执行](ch7.md#真的串行执行)”)。日志通过在分区中定义事件的序列顺序,消除了并发性的不确定性【24】。如果一个事件触及多个状态分区,那么需要做更多的工作,我们将在[第12章](ch12.md)讨论。 #### 不变性的限制 @@ -402,7 +402,7 @@ $$ ​ 在这种情况下,仅仅在日志中添加另一个事件来指明先前的数据应该被视为删除是不够的 —— 你实际上是想改写历史,并假装数据从一开始就没有写入。例如,Datomic管这个特性叫**切除(excision)** 【62】,而Fossil版本控制系统有一个类似的概念叫**避免(shunning)** 【63】。 -​ 真正删除数据是非常非常困难的【64】,因为副本可能存在于很多地方:例如,存储引擎,文件系统和SSD通常会向一个新位置写入,而不是原地覆盖旧数据【52】,而备份通常是特意做成不可变的,防止意外删除或损坏。删除更多的是“使取回数据更困难”,而不是“使取回数据不可能”。无论如何,有时你必须得尝试,正如我们在“[立法与自律](ch12.md#立法与自律)”中所看到的。 +​ 真正删除数据是非常非常困难的【64】,因为副本可能存在于很多地方:例如,存储引擎,文件系统和SSD通常会向一个新位置写入,而不是原地覆盖旧数据【52】,而备份通常是特意做成不可变的,防止意外删除或损坏。删除操作更多的是指“使取回数据更困难”,而不是指“使取回数据不可能”。无论如何,有时你必须得尝试,正如我们在“[立法与自律](ch12.md#立法与自律)”中所看到的。 @@ -412,19 +412,19 @@ $$ 剩下的就是讨论一下你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选项: -1. 你可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后能被其他客户端查询。如[图11-5](img/fig11-5.png)所示,这是数据库与系统其他部分发生变更保持同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。如“[批处理工作流的输出](ch10.md#批处理工作流的输出)”中所讨论的,它是写入存储系统的流等价物。 +1. 你可以将事件中的数据写入数据库、缓存、搜索索引或类似的存储系统,然后能被其他客户端查询。如[图11-5](img/fig11-5.png)所示,这是数据库与系统其他部分所发生的变更保持同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。如“[批处理工作流的输出](ch10.md#批处理工作流的输出)”中所讨论的,它是写入存储系统的流等价物。 2. 你能以某种方式将事件推送给用户,例如发送报警邮件或推送通知,或将事件流式传输到可实时显示的仪表板上。在这种情况下,人是流的最终消费者。 3. 你可以处理一个或多个输入流,并产生一个或多个输出流。流可能会经过由几个这样的处理阶段组成的流水线,最后再输出(选项1或2)。 -在本章的剩余部分中,我们将讨论选项3:处理流以产生其他衍生流。处理这样的流的代码片段,被称为**算子(operator)**或**作业(job)**。它与我们在[第10章](ch10.md)中讨论过的Unix进程和MapReduce作业密切相关,数据流的模式是相似的:一个流处理器以只读的方式使用输入流,并将其输出以仅追加的方式写入一个不同的位置。 + 在本章的剩余部分中,我们将讨论选项3:处理流以产生其他衍生流。处理这样的流的代码片段,被称为**算子(operator)**或**作业(job)**。它与我们在[第10章](ch10.md)中讨论过的Unix进程和MapReduce作业密切相关,数据流的模式是相似的:一个流处理器以只读的方式使用输入流,并将其输出以仅追加的方式写入一个不同的位置。 ​ 流处理中的分区和并行化模式也非常类似于[第10章](ch10.md)中介绍的MapReduce和数据流引擎,因此我们不再重复这些主题。基本的Map操作(如转换和过滤记录)也是一样的。 -​ 与批量作业相比的一个关键区别是,流不会结束。这种差异会带来很多隐含的结果。正如本章开始部分所讨论的,排序对无界数据集没有意义,因此无法使用**排序合并联接**(请参阅“[Reduce端连接与分组](ch10.md#减少连接和分组)”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重启失败任务,但是对于已经运行数年的流作业,重启后从头开始跑可能并不是一个可行的选项。 +​ 与批量作业相比的一个关键区别是,流不会结束。这种差异会带来很多隐含的结果。正如本章开始部分所讨论的,排序对无界数据集没有意义,因此无法使用**排序合并连接**(请参阅“[Reduce侧连接与分组](ch10.md#Reduce侧连接与分组)”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重启失败任务,但是对于已经运行数年的流作业,重启后从头开始跑可能并不是一个可行的选项。 ### 流处理的应用 -长期以来,流处理一直用于监控目的,如果某个事件发生,单位希望能得到警报。例如: +长期以来,流处理一直用于监控目的,如果某个事件发生,组织希望能得到警报。例如: * 欺诈检测系统需要确定信用卡的使用模式是否有意外地变化,如果信用卡可能已被盗刷,则锁卡。 * 交易系统需要检查金融市场的价格变化,并根据指定的规则进行交易。 @@ -435,13 +435,13 @@ $$ #### 复合事件处理 -​ **复合事件处理(complex, event processing, CEP)** 是20世纪90年代为分析事件流而开发出的一种方法,尤其适用于需要搜索某些事件模式的应用【65,66】。与正则表达式允许你在字符串中搜索特定字符模式的方式类似,CEP允许你指定规则以在流中搜索某些事件模式。 +​ **复合事件处理(complex event processing, CEP)** 是20世纪90年代为分析事件流而开发出的一种方法,尤其适用于需要搜索某些事件模式的应用【65,66】。与正则表达式允许你在字符串中搜索特定字符模式的方式类似,CEP允许你指定规则以在流中搜索某些事件模式。 -​ CEP系统通常使用高层次的声明式查询语言,比如SQL,或者图形用户界面,来描述应该检测到的事件模式。这些查询被提交给处理引擎,该引擎消费输入流,并在内部维护一个执行所需匹配的状态机。当发现匹配时,引擎发出一个**复合事件(complex event)**(因此得名),并附有检测到的事件模式详情【67】。 +​ CEP系统通常使用高层次的声明式查询语言,比如SQL,或者图形用户界面,来描述应该检测到的事件模式。这些查询被提交给处理引擎,该引擎消费输入流,并在内部维护一个执行所需匹配的状态机。当发现匹配时,引擎发出一个**复合事件(complex event)**(CEP因此得名),并附有检测到的事件模式详情【67】。 ​ 在这些系统中,查询和数据之间的关系与普通数据库相比是颠倒的。通常情况下,数据库会持久存储数据,并将查询视为临时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时丢掉查询。 CEP引擎反转了角色:查询是长期存储的,来自输入流的事件不断流过它们,搜索匹配事件模式的查询【68】。 -​ CEP的实现包括Esper 【69】,IBM InfoSphere Streams 【70】,Apama,TIBCO StreamBase和SQLstream。像Samza这样的分布式流处理组件,支持使用SQL在流上进行声明式查询【71】。 +​ CEP的实现包括Esper【69】,IBM InfoSphere Streams【70】,Apama,TIBCO StreamBase和SQLstream。像Samza这样的分布式流处理组件,支持使用SQL在流上进行声明式查询【71】。 #### 流分析 @@ -451,37 +451,35 @@ $$ * 滚动计算一段时间窗口内某个值的平均值 * 将当前的统计值与先前的时间区间的值对比(例如,检测趋势,当指标与上周同比异常偏高或偏低时报警) -这些统计值通常是在固定时间区间内进行计算的,例如,你可能想知道在过去5分钟内服务每秒查询次数的均值,以及此时间段内响应时间的第99百分位点。在几分钟内取平均,能抹平秒和秒之间的无关波动,且仍然能向你展示流量模式的时间图景。聚合的时间间隔称为**窗口(window)**,我们将在“[理解时间](#理解时间)”中更详细地讨论窗口。 + 这些统计值通常是在固定时间区间内进行计算的,例如,你可能想知道在过去5分钟内服务每秒查询次数的均值,以及此时间段内响应时间的第99百分位点。在几分钟内取平均,能抹平秒和秒之间的无关波动,且仍然能向你展示流量模式的时间图景。聚合的时间间隔称为**窗口(window)**,我们将在“[时间推理](#时间推理)”中更详细地讨论窗口。 -​ 流分析系统有时会使用概率算法,例如Bloom filter(我们在“[性能优化](ch3.md#性能优化)”中遇到过)来管理成员资格,HyperLogLog 【72】用于基数估计以及各种百分比估计算法(请参阅“[实践中的百分位点](ch1.md#实践中的百分位点)“)。概率算法产出近似的结果,但比起精确算法的优点是内存使用要少得多。使用近似算法有时让人们觉得流处理系统总是有损的和不精确的,但这是错误看法:流处理并没有任何内在的近似性,而概率算法只是一种优化【73】。 +​ 流分析系统有时会使用概率算法,例如Bloom filter(我们在“[性能优化](ch3.md#性能优化)”中遇到过)来管理成员资格,HyperLogLog【72】用于基数估计以及各种百分比估计算法(请参阅“[实践中的百分位点](ch1.md#实践中的百分位点)“)。概率算法产出近似的结果,但比起精确算法的优点是内存使用要少得多。使用近似算法有时让人们觉得流处理系统总是有损的和不精确的,但这是错误看法:流处理并没有任何内在的近似性,而概率算法只是一种优化【73】。 ​ 许多开源分布式流处理框架的设计都是针对分析设计的:例如Apache Storm,Spark Streaming,Flink,Concord,Samza和Kafka Streams 【74】。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。 #### 维护物化视图 -​ 我们在“[数据库和数据流](#数据库和数据流)”中看到,数据库的变更流可以用于维护衍生数据系统(如缓存,搜索索引和数据仓库),使其与源数据库保持最新。我们可以将这些示例视作维护**物化视图(materialized view)** 的一种具体场景(参阅“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”):在某个数据集上衍生出一个替代视图以便高效查询,并在底层数据变更时更新视图【50】。 +​ 我们在“[数据库与流](#数据库与流)”中看到,数据库的变更流可以用于维护衍生数据系统(如缓存、搜索索引和数据仓库),并使其与源数据库保持最新。我们可以将这些示例视作维护**物化视图(materialized view)** 的一种具体场景(参阅“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”):在某个数据集上衍生出一个替代视图以便高效查询,并在底层数据变更时更新视图【50】。 -​ 同样,在事件溯源中,应用程序的状态是通过**应用(apply)**事件日志来维护的;这里的应用状态也是一种物化视图。与流分析场景不同的是,仅考虑某个时间窗口内的事件通常是不够的:构建物化视图可能需要任意时间段内的**所有**事件,除了那些可能由日志压缩丢弃的过时事件(请参阅“[日志压缩](#日志压缩)“)。实际上,你需要一个可以一直延伸到时间开端的窗口。 +​ 同样,在事件溯源中,应用程序的状态是通过应用事件日志来维护的;这里的应用程序状态也是一种物化视图。与流分析场景不同的是,仅考虑某个时间窗口内的事件通常是不够的:构建物化视图可能需要任意时间段内的**所有**事件,除了那些可能由日志压缩丢弃的过时事件(请参阅“[日志压缩](#日志压缩)“)。实际上,你需要一个可以一直延伸到时间开端的窗口。 -​ 原则上讲,任何流处理组件都可以用于维护物化视图,尽管“永远运行”与一些面向分析的框架假设的“主要在有限时间段窗口上运行”背道而驰, Samza和Kafka Streams支持这种用法,建立在Kafka对日志压缩comp的支持上【75】。 +​ 原则上讲,任何流处理组件都可以用于维护物化视图,尽管“永远运行”与一些面向分析的框架假设的“主要在有限时间段窗口上运行”背道而驰, Samza和Kafka Streams支持这种用法,建立在Kafka对日志压缩的支持上【75】。 #### 在流上搜索 ​ 除了允许搜索由多个事件构成模式的CEP外,有时也存在基于复杂标准(例如全文搜索查询)来搜索单个事件的需求。 -​ 例如,媒体监测服务可以订阅新闻文章Feed与来自媒体的播客,搜索任何关于公司,产品或感兴趣的话题的新闻。这是通过预先构建一个搜索查询来完成的,然后不断地将新闻项的流与该查询进行匹配。在一些网站上也有类似的功能:例如,当市场上出现符合其搜索条件的新房产时,房地产网站的用户可以要求网站通知他们。 Elasticsearch的这种过滤器功能,是实现这种流搜索的一种选择【76】。 +​ 例如,媒体监测服务可以订阅新闻文章Feed与来自媒体的播客,搜索任何关于公司、产品或感兴趣的话题的新闻。这是通过预先构建一个搜索查询来完成的,然后不断地将新闻项的流与该查询进行匹配。在一些网站上也有类似的功能:例如,当市场上出现符合其搜索条件的新房产时,房地产网站的用户可以要求网站通知他们。Elasticsearch的这种过滤器功能,是实现这种流搜索的一种选择【76】。 -​ 传统的搜索引擎首先索引文件,然后在索引上跑查询。相比之下,搜索一个数据流则反了过来:查询被存储下来,文档从查询中流过,就像在CEP中一样。在简单的情况就是,你可以为每个文档测试每个查询。但是如果你有大量查询,这可能会变慢。为了优化这个过程,可以像对文档一样,为查询建立索引。因而收窄可能匹配的查询集合【77】。 +​ 传统的搜索引擎首先索引文件,然后在索引上跑查询。相比之下,搜索一个数据流则反了过来:查询被存储下来,文档从查询中流过,就像在CEP中一样。最简单的情况就是,你可以为每个文档测试每个查询。但是如果你有大量查询,这可能会变慢。为了优化这个过程,可以像对文档一样,为查询建立索引。因而收窄可能匹配的查询集合【77】。 #### 消息传递和RPC -​ 在“[消息传递数据流](ch4.md#消息传递数据流)”中我们讨论过,消息传递系统可以作为RPC的替代方案,即作为一种服务间通信的机制,比如在Actor模型中所使用的那样。尽管这些系统也是基于消息和事件,但我们通常不会将其视作流处理组件: +​ 在“[消息传递中的数据流](ch4.md#消息传递中的数据流)”中我们讨论过,消息传递系统可以作为RPC的替代方案,即作为一种服务间通信的机制,比如在Actor模型中所使用的那样。尽管这些系统也是基于消息和事件,但我们通常不会将其视作流处理组件: * Actor框架主要是管理模块通信的并发和分布式执行的一种机制,而流处理主要是一种数据管理技术。 - - -* Actor之间的交流往往是短暂的,一对一的;而事件日志则是持久的,多订阅者的。 -* Actor可以以任意方式进行通信(允许包括循环的请求/响应),但流处理通常配置在无环流水线中,其中每个流都是一个特定作业的输出,由良好定义的输入流中派生而来。 +* Actor之间的交流往往是短暂的、一对一的;而事件日志则是持久的、多订阅者的。 +* Actor可以以任意方式进行通信(包括循环的请求/响应模式),但流处理通常配置在无环流水线中,其中每个流都是一个特定作业的输出,由良好定义的输入流中派生而来。 也就是说,RPC类系统与流处理之间有一些交叉领域。例如,Apache Storm有一个称为**分布式RPC**的功能,它允许将用户查询分散到一系列也处理事件流的节点上;然后这些查询与来自输入流的事件交织,而结果可以被汇总并发回给用户【78】(另参阅“[多分区数据处理](ch12.md#多分区数据处理)”)。 @@ -489,21 +487,21 @@ $$ ### 时间推理 -​ 流处理通常需要与时间打交道,尤其是用于分析目的时候,会频繁使用时间窗口,例如“过去五分钟的平均值”。“最后五分钟”的含义看上去似乎是清晰而无歧义的,但不幸的是,这个概念非常棘手。 +​ 流处理通常需要与时间打交道,尤其是用于分析目的时候,会频繁使用时间窗口,例如“过去五分钟的平均值”。“过去五分钟”的含义看上去似乎是清晰而无歧义的,但不幸的是,这个概念非常棘手。 -​ 在批处理中过程中,大量的历史事件迅速收缩。如果需要按时间来分析,批处理器需要检查每个事件中嵌入的时间戳。读取运行批处理机器的系统时钟没有任何意义,因为处理运行的时间与事件实际发生的时间无关。 +​ 在批处理中过程中,大量的历史事件被快速地处理。如果需要按时间来分析,批处理器需要检查每个事件中嵌入的时间戳。读取运行批处理机器的系统时钟没有任何意义,因为处理运行的时间与事件实际发生的时间无关。 -​ 批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是历史中的一年,而不是处理中的几分钟。而且使用事件中的时间戳,使得处理是**确定性**的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“[故障容错](ch10.md#故障容错)”)。 +​ 批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是历史中的一年,而不是处理中的几分钟。而且使用事件中的时间戳,使得处理是**确定性**的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“[容错](ch10.md#容错)”)。 -​ 另一方面,许多流处理框架使用处理机器上的本地系统时钟(**处理时间(processing time)**)来确定**窗口**【79】。这种方法的优点是简单,事件创建与事件处理之间的延迟可以忽略不计。然而,如果存在任何显著的处理延迟 —— 即,事件处理显著地晚于事件实际发生的时间,处理就失效了。 +​ 另一方面,许多流处理框架使用处理机器上的本地系统时钟(**处理时间(processing time)**)来确定**窗口(windowing)**【79】。这种方法的优点是简单,如果事件创建与事件处理之间的延迟可以忽略不计,那也是合理的。然而,如果存在任何显著的处理延迟 —— 即,事件处理显著地晚于事件实际发生的时间,这种处理方式就失效了。 #### 事件时间与处理时间 -​ 很多原因都可能导致处理延迟:排队,网络故障(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),性能问题导致消息代理/消息处理器出现争用,流消费者重启,重新处理过去的事件(参阅“[重放旧消息](#重放旧消息)”),或者在修复代码BUG之后从故障中恢复。 +​ 很多原因都可能导致处理延迟:排队,网络故障(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),性能问题导致消息代理/消息处理器出现争用,流消费者重启,从故障中恢复时重新处理过去的事件(参阅“[重播旧消息](#重播旧消息)”),或者在修复代码BUG之后。 ​ 而且,消息延迟还可能导致无法预测消息顺序。例如,假设用户首先发出一个Web请求(由Web服务器A处理),然后发出第二个请求(由服务器B处理)。 A和B发出描述它们所处理请求的事件,但是B的事件在A的事件发生之前到达消息代理。现在,流处理器将首先看到B事件,然后看到A事件,即使它们实际上是以相反的顺序发生的。 -​ 有一个类比也许能帮助理解,“星球大战”电影:第四集于1977年发行,第五集于1980年,第六集于1983年,紧随其后的是1999年的第一集,2002年的第二集,和2005年的三集,以及2015年的第七集【80】[^ii]。如果你按照按照它们上映的顺序观看电影,你处理电影的顺序与它们叙事的顺序就是不一致的。 (集数编号就像事件时间戳,而你观看电影的日期就是处理时间)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门编写,以适应这种时机与顺序的问题。 +​ 有一个类比也许能帮助理解,“星球大战”电影:第四集于1977年发行,第五集于1980年,第六集于1983年,紧随其后的是1999年的第一集,2002年的第二集,和2005年的第三集,以及2015年的第七集【80】[^ii]。如果你按照按照它们上映的顺序观看电影,你处理电影的顺序与它们叙事的顺序就是不一致的。 (集数编号就像事件时间戳,而你观看电影的日期就是处理时间)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门编写,以适应这种时序与顺序的问题。 [^ii]: 感谢Flink社区的Kostas Kloudas提出这个比喻。 @@ -522,7 +520,7 @@ $$ ​ 在一段时间没有看到任何新的事件之后,你可以超时并宣布一个窗口已经就绪,但仍然可能发生这种情况:某些事件被缓冲在另一台机器上,由于网络中断而延迟。你需要能够处理这种在窗口宣告完成之后到达的 **滞留(straggler)** 事件。大体上,你有两种选择【1】: 1. 忽略这些滞留事件,因为在正常情况下它们可能只是事件中的一小部分。你可以将丢弃事件的数量作为一个监控指标,并在出现大量丢消息的情况时报警。 -2. 发布一个**更正(correction)**,一个包括滞留事件的更新窗口值。更新的窗口与包含散兵队员的价值。你可能还需要收回以前的输出。 +2. 发布一个**更正(correction)**,一个包括滞留事件的更新窗口值。你可能还需要收回以前的输出。 在某些情况下,可以使用特殊的消息来指示“从现在开始,不会有比t更早时间戳的消息了”,消费者可以使用它来触发窗口【81】。但是,如果不同机器上的多个生产者都在生成事件,每个生产者都有自己的最小时间戳阈值,则消费者需要分别跟踪每个生产者。在这种情况下,添加和删除生产者都是比较棘手的。 @@ -540,7 +538,7 @@ $$ 通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后可以将该偏移应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生时与送往服务器之间没有变化)。 -​ 这并不是流处理独有的问题,批处理有着完全一样的时间推理问题。只是在流处理的上下文中,我们更容易意识到时间的流逝。 +​ 这并不是流处理独有的问题,批处理有着完全一样的时 间推理问题。只是在流处理的上下文中,我们更容易意识到时间的流逝。 #### 窗口的类型 @@ -552,7 +550,7 @@ $$ ***跳动窗口(Hopping Window)*** -​ 跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。例如,一个带有1分钟跳跃步长的5分钟窗口将包含`10:03:00`至`10:07:59`之间的事件,而下一个窗口将覆盖`10:04:00`至`10:08:59`之间的事件,等等。通过首先计算1分钟的滚动窗口,然后在几个相邻窗口上进行聚合,可以实现这种跳动窗口。 +​ 跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。例如,一个带有1分钟跳跃步长的5分钟窗口将包含`10:03:00`至`10:07:59`之间的事件,而下一个窗口将覆盖`10:04:00`至`10:08:59`之间的事件,等等。通过首先计算1分钟的滚动窗口(tunmbling window),然后在几个相邻窗口上进行聚合,可以实现这种跳动窗口。 ***滑动窗口(Sliding Window)*** @@ -560,7 +558,7 @@ $$ ***会话窗口(Session window)*** -​ 与其他窗口类型不同,会话窗口没有固定的持续时间,而定义为:将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时(例如,如果30分钟内没有事件)窗口结束。会话切分是网站分析的常见需求(参阅“[GROUP BY](ch10.md#GROUP\ BY)”)。 +​ 与其他窗口类型不同,会话窗口没有固定的持续时间,而定义为:将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时(例如,如果30分钟内没有事件)窗口结束。会话切分是网站分析的常见需求(参阅“[分组](ch10.md#分组)”)。 ### 流式连接 @@ -578,17 +576,17 @@ $$ ​ 为了实现这种类型的连接,流处理器需要维护**状态**:例如,按会话ID索引最近一小时内发生的所有事件。无论何时发生搜索事件或点击事件,都会被添加到合适的索引中,而流处理器也会检查另一个索引是否有具有相同会话ID的事件到达。如果有匹配事件就会发出一个表示搜索结果被点击的事件;如果搜索事件直到过期都没看见有匹配的点击事件,就会发出一个表示搜索结果未被点击的事件。 -#### 流表连接(流扩展) +#### 流表连接(流扩充) -​ 在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”([图10-2](img/fig10-2.png))中,我们看到了连接两个数据集的批处理作业示例:一组用户活动事件和一个用户档案数据库。将用户活动事件视为流,并在流处理器中连续执行相同的连接是很自然的想法:输入是包含用户ID的活动事件流,而输出还是活动事件流,但其中用户ID已经被扩展为用户的档案信息。这个过程有时被称为 使用数据库的信息来**扩充(enriching)** 活动事件。 +​ 在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”([图10-2](img/fig10-2.png))中,我们看到了连接两个数据集的批处理作业示例:一组用户活动事件和一个用户档案数据库。将用户活动事件视为流,并在流处理器中连续执行相同的连接是很自然的想法:输入是包含用户ID的活动事件流,而输出还是活动事件流,但其中用户ID已经被扩展为用户的档案信息。这个过程有时被称为使用数据库的信息来**扩充(enriching)** 活动事件。 -​ 要执行此联接,流处理器需要一次处理一个活动事件,在数据库中查找事件的用户ID,并将档案信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现。但正如在“[示例:分析用户活动事件](ch10.md#示例:分析用户活动事件)”一节中讨论的,此类远程查询可能会很慢,并且有可能导致数据库过载【75】。 +​ 要执行此连接,流处理器需要一次处理一个活动事件,在数据库中查找事件的用户ID,并将档案信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现。但正如在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”一节中讨论的,此类远程查询可能会很慢,并且有可能导致数据库过载【75】。 -​ 另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“[Map端连接](ch10.md#Map端连接)”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则可以是内存中的散列表,比较大的话也可以是本地磁盘上的索引。 +​ 另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“[Map侧连接](ch10.md#Map侧连接)”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则可以是内存中的散列表,比较大的话也可以是本地磁盘上的索引。 -​ 与批处理作业的区别在于,批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持更新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户档案数据库的更新日志,如同活跃事件流一样。当增添或修改档案时,流处理器会更新其本地副本。因此,我们有了两个流之间的连接:活动事件和档案更新。 +​ 与批处理作业的区别在于,批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持更新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户档案数据库的更新日志,如同活动事件流一样。当增添或修改档案时,流处理器会更新其本地副本。因此,我们有了两个流之间的连接:活动事件和档案更新。 -​ 流表连接实际上非常类似于流流连接;最大的区别在于对于表的变更日志流,连接使用了一个可以回溯到“时间起点”的窗口(概念上是无限的窗口),新版本的记录会覆盖更早的版本。对于输入的流,连接可能压根儿就没有维护窗口。 +​ 流表连接实际上非常类似于流流连接;最大的区别在于对于表的变更日志流,连接使用了一个可以回溯到“时间起点”的窗口(概念上是无限的窗口),新版本的记录会覆盖更早的版本。对于输入的流,连接可能压根儿就没有维护任何窗口。 #### 表表连接(维护物化视图) @@ -601,7 +599,7 @@ $$ * 当用户$u_1$开始关注用户$u_2$时,$u_2$最近的推文将被添加到$u_1$的时间线上。 * 当用户$u_1$取消关注用户$u_2$时,$u_2$的推文将从$u_1$的时间线中移除。 -要在流处理器中实现这种缓存维护,你需要推文事件流(发送与删除)和关注关系事件流(关注与取消关注)。流处理需要为维护一个数据库,包含每个用户的粉丝集合。以便知道当一条新推文到达时,需要更新哪些时间线【86】。 +要在流处理器中实现这种缓存维护,你需要推文事件流(发送与删除)和关注关系事件流(关注与取消关注)。流处理需要维护一个数据库,包含每个用户的粉丝集合。以便知道当一条新推文到达时,需要更新哪些时间线【86】。 观察这个流处理过程的另一种视角是:它维护了一个连接了两个表(推文与关注)的物化视图,如下所示: @@ -613,9 +611,9 @@ JOIN follows ON follows.followee_id = tweets.sender_id GROUP BY follows.follower_id ``` -​ 流连接直接对应于这个查询中的表连接。时间线实际上是这个查询结果的缓存,每当基础表发生变化时都会更新[^iii]。 +​ 流连接直接对应于这个查询中的表连接。时间线实际上是这个查询结果的缓存,每当底层的表发生变化时都会更新[^iii]。 -[^iii]: 如果你将流视作表的衍生物,如[图11-6](img/fig11-6.png)所示,而把一个连接看作是两个表的乘法u·v,那么会发生一些有趣的事情:物化连接的变化流遵循乘积法则:(u·v)'= u'v + uv'(u·v)'= u'v + uv'。 换句话说,任何推文的变化量都与当前的关注联系在一起,任何关注的变化量都与当前的推文相连接【49,50】。 +[^iii]: 如果你将流视作表的衍生物,如[图11-6](img/fig11-6.png)所示,而把一个连接看作是两个表的乘法u·v,那么会发生一些有趣的事情:物化连接的变化流遵循乘积法则:(u·v)'= u'v + uv'。 换句话说,任何推文的变化量都与当前的关注联系在一起,任何关注的变化量都与当前的推文相连接【49,50】。 #### 连接的时间依赖性 @@ -625,7 +623,7 @@ GROUP BY follows.follower_id ​ 这就产生了一个问题:如果不同流中的事件发生在近似的时间范围内,则应该按照什么样的顺序进行处理?在流表连接的例子中,如果用户更新了它们的档案,哪些活动事件与旧档案连接(在档案更新前处理),哪些又与新档案连接(在档案更新之后处理)?换句话说:你需要对一些状态做连接,如果状态会随着时间推移而变化,那应当使用什么时间点来连接呢【45】? -​ 这种时序依赖可能出现在很多地方。例如销售东西需要对发票应用适当的税率,这取决于所处的国家/州,产品类型,销售日期(因为税率会随时变化)。当连接销售额与税率表时,你可能期望的是使用销售时的税率参与连接。如果你正在重新处理历史数据,销售时的税率可能和现在的税率有所不同。 +​ 这种时序依赖可能出现在很多地方。例如销售东西需要对发票应用适当的税率,这取决于所处的国家/州,产品类型,销售日期(因为税率时不时会变化)。当连接销售额与税率表时,你可能期望的是使用销售时的税率参与连接。如果你正在重新处理历史数据,销售时的税率可能和现在的税率有所不同。 ​ 如果跨越流的事件顺序是未定的,则连接会变为不确定性的【87】,这意味着你在同样输入上重跑相同的作业未必会得到相同的结果:当你重跑任务时,输入流上的事件可能会以不同的方式交织。 @@ -645,7 +643,7 @@ GROUP BY follows.follower_id ​ 微批次也隐式提供了一个与批次大小相等的滚动窗口(按处理时间而不是事件时间戳分窗)。任何需要更大窗口的作业都需要显式地将状态从一个微批次转移到下一个微批次。 -​ Apache Flink则使用不同的方法,它会定期生成状态的滚动存档点并将其写入持久存储【92,93】。如果流算子崩溃,它可以从最近的存档点重启,并丢弃从最近检查点到崩溃之间的所有输出。存档点会由消息流中的 **壁障(barrier)** 触发,类似于微批次之间的边界,但不会强制一个特定的窗口大小。 +​ Apache Flink则使用不同的方法,它会定期生成状态的滚动存档点并将其写入持久存储【92,93】。如果流算子崩溃,它可以从最近的存档点重启,并丢弃从最近检查点到崩溃之间的所有输出。存档点会由消息流中的**壁障(barrier)**触发,类似于微批次之间的边界,但不会强制一个特定的窗口大小。 ​ 在流处理框架的范围内,微批次与存档点方法提供了与批处理一样的**恰好一次语义**。但是,只要输出离开流处理器(例如,写入数据库,向外部消息代理发送消息,或发送电子邮件),框架就无法抛弃失败批次的输出了。在这种情况下,重启失败任务会导致外部副作用发生两次,只有微批次或存档点不足以阻止这一问题。 @@ -663,23 +661,23 @@ GROUP BY follows.follower_id ​ 幂等操作是多次重复执行与单次执行效果相同的操作。例如,将键值存储中的某个键设置为某个特定值是幂等的(再次写入该值,只是用同样的值替代),而递增一个计数器不是幂等的(再次执行递增意味着该值递增两次)。 -​ 即使一个操作不是天生幂等的,往往可以通过一些额外的元数据做成幂等的。例如,在使用来自Kafka的消息时,每条消息都有一个持久的,单调递增的偏移量。将值写入外部数据库时可以将这个偏移量带上,这样你就可以判断一条更新是不是已经执行过了,因而避免重复执行。 +​ 即使一个操作不是天生幂等的,往往可以通过一些额外的元数据做成幂等的。例如,在使用来自Kafka的消息时,每条消息都有一个持久的、单调递增的偏移量。将值写入外部数据库时可以将这个偏移量带上,这样你就可以判断一条更新是不是已经执行过了,因而避免重复执行。 -​ Storm的Trident基于类似的想法来处理状态【78】。依赖幂等性意味着隐含了一些假设:重启一个失败的任务必须以相同的顺序重放相同的消息(基于日志的消息代理能做这些事),处理必须是确定性的,没有其他节点能同时更新相同的值【98,99】。 +​ Storm的Trident基于类似的想法来处理状态【78】。依赖幂等性意味着隐含了一些假设:重启一个失败的任务必须以相同的顺序重播相同的消息(基于日志的消息代理能做这些事),处理必须是确定性的,没有其他节点能同时更新相同的值【98,99】。 -​ 当从一个处理节点故障切换到另一个节点时,可能需要进行**防护(fencing)**(参阅“[领导和锁](ch8.md#领导和锁)”),以防止被假死节点干扰。尽管有这么多注意事项,幂等操作是一种实现**恰好一次语义**的有效方式,仅需很小的额外开销。 +​ 当从一个处理节点故障切换到另一个节点时,可能需要进行**防护(fencing)**(参阅“[领导者和锁](ch8.md#领导者和锁)”),以防止被假死节点干扰。尽管有这么多注意事项,幂等操作是一种实现**恰好一次语义**的有效方式,仅需很小的额外开销。 #### 失败后重建状态 ​ 任何需要状态的流处理 —— 例如,任何窗口聚合(例如计数器,平均值和直方图)以及任何用于连接的表和索引,都必须确保在失败之后能恢复其状态。 -​ 一种选择是将状态保存在远程数据存储中,并进行复制,然而正如在“[流表连接](#流表连接)”中所述,每个消息都要查询远程数据库可能会很慢。另一种方法是在流处理器本地保存状态,并定期复制。然后当流处理器从故障中恢复时,新任务可以读取状态副本,恢复处理而不丢失数据。 +​ 一种选择是将状态保存在远程数据存储中,并进行复制,然而正如在“[流表连接(流扩充)](#流表连接(流扩充))”中所述,每个消息都要查询远程数据库可能会很慢。另一种方法是在流处理器本地保存状态,并定期复制。然后当流处理器从故障中恢复时,新任务可以读取状态副本,恢复处理而不丢失数据。 ​ 例如,Flink定期捕获算子状态的快照,并将它们写入HDFS等持久存储中【92,93】。 Samza和Kafka Streams通过将状态变更发送到具有日志压缩功能的专用Kafka主题来复制状态变更,这与变更数据捕获类似【84,100】。 VoltDB通过在多个节点上对每个输入消息进行冗余处理来复制状态(参阅“[真的串行执行](ch7.md#真的串行执行)”)。 -​ 在某些情况下,甚至可能都不需要复制状态,因为它可以从输入流重建。例如,如果状态是从相当短的窗口中聚合而成,则简单地重放该窗口中的输入事件可能是足够快的。如果状态是通过变更数据捕获来维护的数据库的本地副本,那么也可以从日志压缩的变更流中重建数据库(参阅“[日志压缩](#日志压缩)”)。 +​ 在某些情况下,甚至可能都不需要复制状态,因为它可以从输入流重建。例如,如果状态是从相当短的窗口中聚合而成,则简单地重播该窗口中的输入事件可能是足够快的。如果状态是通过变更数据捕获来维护的数据库的本地副本,那么也可以从日志压缩的变更流中重建数据库(参阅“[日志压缩](#日志压缩)”)。 -​ 然而,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。没有针对所有情况的普世理想权衡,随着存储和网络技术的发展,本地状态与远程状态的优点也可能会互换。 +​ 然而,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽也可能与磁盘带宽相当。没有针对所有情况的普适理想权衡,随着存储和网络技术的发展,本地状态与远程状态的优点也可能会互换。 @@ -691,19 +689,19 @@ GROUP BY follows.follower_id ***AMQP/JMS风格的消息代理*** -​ 代理将单条消息分配给消费者,消费者在成功处理单条消息后确认消息。消息被确认后从代理中删除。这种方法适合作为一种异步形式的RPC(另请参阅“[消息传递数据流](ch4.md#消息传递数据流)”),例如在任务队列中,消息处理的确切顺序并不重要,而且消息在处理完之后,不需要回头重新读取旧消息。 +​ 代理将单条消息分配给消费者,消费者在成功处理单条消息后确认消息。消息被确认后从代理中删除。这种方法适合作为一种异步形式的RPC(另请参阅“[消息传递中的数据流](ch4.md#消息传递中的数据流)”),例如在任务队列中,消息处理的确切顺序并不重要,而且消息在处理完之后,不需要回头重新读取旧消息。 ***基于日志的消息代理*** ​ 代理将一个分区中的所有消息分配给同一个消费者节点,并始终以相同的顺序传递消息。并行是通过分区实现的,消费者通过存档最近处理消息的偏移量来跟踪工作进度。消息代理将消息保留在磁盘上,因此如有必要的话,可以回跳并重新读取旧消息。 -​ 基于日志的方法与数据库中的复制日志(参见[第5章](ch5.md))和日志结构存储引擎(请参阅[第3章](ch3.md))有相似之处。我们看到,这种方法对于消费输入流,产生衍生状态与衍生输出数据流的系统而言特别适用。 +​ 基于日志的方法与数据库中的复制日志(参见[第5章](ch5.md))和日志结构存储引擎(请参阅[第3章](ch3.md))有相似之处。我们看到,这种方法对于消费输入流,并产生衍生状态或衍生输出数据流的系统而言特别适用。 ​ 就流的来源而言,我们讨论了几种可能性:用户活动事件,定期读数的传感器,和Feed数据(例如,金融中的市场数据)能够自然地表示为流。我们发现将数据库写入视作流也是很有用的:我们可以捕获变更日志 —— 即对数据库所做的所有变更的历史记录 —— 隐式地通过变更数据捕获,或显式地通过事件溯源。日志压缩允许流也能保有数据库内容的完整副本。 -​ 将数据库表示为流为系统集成带来了很多强大机遇。通过消费变更日志并将其应用至衍生系统,你能使诸如搜索索引,缓存,以及分析系统这类衍生数据系统不断保持更新。你甚至能从头开始,通过读取从创世至今的所有变更日志,为现有数据创建全新的视图。 +​ 将数据库表示为流为系统集成带来了很多强大机遇。通过消费变更日志并将其应用至衍生系统,你能使诸如搜索索引、缓存以及分析系统这类衍生数据系统不断保持更新。你甚至能从头开始,通过读取从创世至今的所有变更日志,为现有数据创建全新的视图。 -​ 像流一样维护状态,以及消息重放的基础设施,是在各种流处理框架中实现流连接和容错的基础。我们讨论了流处理的几种目的,包括搜索事件模式(复杂事件处理),计算分窗聚合(流分析),以及保证衍生数据系统处于最新状态(物化视图)。 +​ 像流一样维护状态以及消息重播的基础设施,是在各种流处理框架中实现流连接和容错的基础。我们讨论了流处理的几种目的,包括搜索事件模式(复杂事件处理),计算分窗聚合(流分析),以及保证衍生数据系统处于最新状态(物化视图)。 ​ 然后我们讨论了在流处理中对时间进行推理的困难,包括处理时间与事件时间戳之间的区别,以及当你认为窗口已经完事之后,如何处理到达的掉队事件的问题。 @@ -721,7 +719,7 @@ GROUP BY follows.follower_id ​ 两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流。 -最后,我们讨论了在流处理中实现容错和恰好一次语义的技术。与批处理一样,我们需要放弃任何部分失败任务的输出。然而由于流处理长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微批次,存档点,事务,或幂等写入。 +最后,我们讨论了在流处理中实现容错和恰好一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而由于流处理长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微批次、存档点、事务或幂等写入。 ## 参考文献 diff --git a/ch12.md b/ch12.md index 071fb01..bbb9dba 100644 --- a/ch12.md +++ b/ch12.md @@ -50,7 +50,7 @@ #### 衍生数据与分布式事务 -​ 保持不同数据系统彼此一致的经典方法涉及分布式事务,如“[原子提交和两阶段提交(2PC)](ch9.md#原子提交和两阶段提交(2PC))”中所述。与分布式事务相比,使用衍生数据系统的方法如何? +​ 保持不同数据系统彼此一致的经典方法涉及分布式事务,如“[原子提交与两阶段提交(2PC)](ch9.md#原子提交与两阶段提交(2PC))”中所述。与分布式事务相比,使用衍生数据系统的方法如何? ​ 在抽象层面,它们通过不同的方式达到类似的目标。分布式事务通过**锁**进行互斥来决定写入的顺序(参阅“[两阶段锁定(2PL)](ch7.md#两阶段锁定(2PL))”),而CDC和事件溯源使用日志进行排序。分布式事务使用原子提交来确保变更只生效一次,而基于日志的系统通常基于**确定性重试**和**幂等性**。 @@ -289,11 +289,11 @@ ​ 从数据流的角度思考应用,意味着重新协调应用代码和状态管理之间的关系。将数据库视作被应用操纵的被动变量,取而代之的是更多地考虑状态,状态变更和处理它们的代码之间的相互作用与协同关系。应用代码通过在另一个地方触发状态变更来响应状态变更。 -​ 我们在“[流与数据库](ch11.md#流与数据库)”中看到了这一思路,我们讨论了将数据库的变更日志视为一种我们可以订阅的事件流。诸如Actor的消息传递系统(参阅“[消息传递数据流](ch4.md#消息传递数据流)”)也具有响应事件的概念。早在20世纪80年代,**元组空间(tuple space)** 模型就已经探索了表达分布式计算的方式:观察状态变更并作出反应【38,39】。 +​ 我们在“[数据库与流](ch11.md#数据库与流)”中看到了这一思路,我们讨论了将数据库的变更日志视为一种我们可以订阅的事件流。诸如Actor的消息传递系统(参阅“[消息传递数据流](ch4.md#消息传递数据流)”)也具有响应事件的概念。早在20世纪80年代,**元组空间(tuple space)** 模型就已经探索了表达分布式计算的方式:观察状态变更并作出反应【38,39】。 ​ 如前所述,当触发器由于数据变更而被触发时,或次级索引更新以反映索引表中的变更时,数据库内部也发生着类似的情况。分拆数据库意味着将这个想法应用于在主数据库之外,用于创建衍生数据集:缓存,全文搜索索引,机器学习或分析系统。我们可以为此使用流处理和消息传递系统。 -​ 需要记住的重要一点是,维护衍生数据不同于执行异步任务。传统消息系统通常是为执行异步任务设计的(参阅“[与传统消息传递相比的日志](ch11.md#与传统消息传递相比的日志)”): +​ 需要记住的重要一点是,维护衍生数据不同于执行异步任务。传统的消息传递系统通常是为执行异步任务设计的(参阅“[与传统消息传递相比的日志](ch11.md#与传统消息传递相比的日志)”): * 在维护衍生数据时,状态变更的顺序通常很重要(如果多个视图是从事件日志衍生的,则需要按照相同的顺序处理事件,以便它们之间保持一致)。如“[确认与重传](ch11.md#确认与重传)”中所述,许多消息代理在重传未确认消息时没有此属性,双写也被排除在外(参阅“[保持系统同步](ch11.md#保持系统同步)”)。 @@ -864,7 +864,7 @@ COMMIT; > > ​ 我们应该设法让他们感到骄傲。 -#### 立法和自律 +#### 立法与自律 ​ 数据保护法可能有助于维护个人的权利。例如,1995年的“欧洲数据保护指示”规定,个人数据必须“为特定的,明确的和合法的目的收集,而不是以与这些目的不相符的方式进一步处理”,并且数据必须“就收集的目的而言适当,相关,不过分。“【107】。 diff --git a/ch7.md b/ch7.md index ed933f0..f3e4350 100644 --- a/ch7.md +++ b/ch7.md @@ -208,7 +208,7 @@ SELECT COUNT(*)FROM emails WHERE recipient_id = 2 AND unread_flag = true - 如果事务实际上成功了,但是在服务器试图向客户端确认提交成功时网络发生故障(所以客户端认为提交失败了),那么重试事务会导致事务被执行两次——除非你有一个额外的应用级除重机制。 - 如果错误是由于负载过大造成的,则重试事务将使问题变得更糟,而不是更好。为了避免这种正反馈循环,可以限制重试次数,使用指数退避算法,并单独处理与过载相关的错误(如果允许)。 - 仅在临时性错误(例如,由于死锁,异常情况,临时性网络中断和故障切换)后才值得重试。在发生永久性错误(例如,违反约束)之后重试是毫无意义的。 -- 如果事务在数据库之外也有副作用,即使事务被中止,也可能发生这些副作用。例如,如果你正在发送电子邮件,那你肯定不希望每次重试事务时都重新发送电子邮件。如果你想确保几个不同的系统一起提交或放弃,**二阶段提交(2PC, two-phase commit)** 可以提供帮助(“[原子提交和两阶段提交(2PC)](ch9.md#原子提交与二阶段提交(2PC))”中将讨论这个问题)。 +- 如果事务在数据库之外也有副作用,即使事务被中止,也可能发生这些副作用。例如,如果你正在发送电子邮件,那你肯定不希望每次重试事务时都重新发送电子邮件。如果你想确保几个不同的系统一起提交或放弃,**两阶段提交(2PC, two-phase commit)** 可以提供帮助(“[原子提交与两阶段提交(2PC)](ch9.md#原子提交与两阶段提交(2PC))”中将讨论这个问题)。 - 如果客户端进程在重试中失效,任何试图写入数据库的数据都将丢失。 ## 弱隔离级别 diff --git a/ch9.md b/ch9.md index 4d04ef7..a05cf57 100644 --- a/ch9.md +++ b/ch9.md @@ -574,7 +574,7 @@ -### 原子提交与二阶段提交(2PC) +### 原子提交与两阶段提交(2PC) ​ 在[第7章](ch7.md)中我们了解到,事务原子性的目的是在多次写操作中途出错的情况下,提供一种简单的语义。事务的结果要么是成功提交,在这种情况下,事务的所有写入都是持久化的;要么是中止,在这种情况下,事务的所有写入都被回滚(即撤消或丢弃)。 diff --git a/glossary.md b/glossary.md index e92aede..0e22ed7 100644 --- a/glossary.md +++ b/glossary.md @@ -18,13 +18,13 @@ 1.在并发操作的上下文中:描述一个在单个时间点看起来生效的操作,所以另一个并发进程永远不会遇到处于“半完成”状态的操作。另见隔离。 -2.在事务的上下文中:将一些写入操作分为一组,这组写入要么全部提交成功,要么遇到错误时全部回滚。参见“[原子性(Atomicity)](ch7.md#原子性(Atomicity))”和“[原子提交与二阶段提交(2PC)](ch9.md#原子提交与二阶段提交(2PC))”。 +2.在事务的上下文中:将一些写入操作分为一组,这组写入要么全部提交成功,要么遇到错误时全部回滚。参见“[原子性(Atomicity)](ch7.md#原子性(Atomicity))”和“[原子提交与两阶段提交(2PC)](ch9.md#原子提交与两阶段提交(2PC))”。 ### 背压(backpressure) -接收方接收数据速度较慢时,强制降低发送方的数据发送速度。也称为流量控制。请参阅“[消息系统](ch11.md#消息系统)”。 +接收方接收数据速度较慢时,强制降低发送方的数据发送速度。也称为流量控制。请参阅“[消息传递系统](ch11.md#消息传递系统)”。 @@ -362,7 +362,7 @@ ### 两阶段提交(2PC, two-phase commit) -一种确保多个数据库节点全部提交或全部中止事务的算法。 请参阅[原子提交与二阶段提交(2PC)](ch9.md#原子提交与二阶段提交(2PC))”。 +一种确保多个数据库节点全部提交或全部中止事务的算法。 请参阅[原子提交与两阶段提交(2PC)](ch9.md#原子提交与两阶段提交(2PC))”。