ch11 rough trans

This commit is contained in:
Vonng 2018-03-26 03:41:45 +08:00
parent b70136534e
commit 9549a15471

192
ch11.md
View File

@ -10,16 +10,16 @@
[TOC]
在第10章中我们讨论了批处理技术它将一组文件读取为输入并生成一组新的输出文件。输出是派生数据的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来创建搜索索引,推荐系统,分析等等。
[第10章](ch10.md)中,我们讨论了批处理技术,它将一组文件读取为输入并生成一组新的输出文件。输出是派生数据的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来创建搜索索引,推荐系统,分析等等。
然而在第10章中仍然有一个大的假设即输入是有界的即已知和有限的大小所以批处理知道它何时完成了它的输入。例如MapReduce中心的排序操作必须读取其全部输入然后才能开始生成输出可能发生最后一个输入记录是具有最低键的输入记录因此必须是第一个输出记录所以提前开始输出不是一种选择。
然而,在[第10章](ch10.md)中仍然有一个大的假设即输入是有界的即已知和有限的大小所以批处理知道它何时完成了它的输入。例如MapReduce中心的排序操作必须读取其全部输入然后才能开始生成输出可能发生最后一个输入记录是具有最低键的输入记录因此必须是第一个输出记录所以提前开始输出不是一种选择。
实际上很多数据是无限的因为它随着时间的推移逐渐到达你的用户昨天和今天产生了数据明天他们将继续产生更多的数据。除非您停业否则这个过程永远都不会结束所以数据集从来就不会以任何有意义的方式“完成”【1】。因此批处理程序必须将数据人为地分成固定时间段的数据块例如在每天结束时处理一天的数据或者在每小时结束时处理一小时的数据。
日常批处理过程中的问题是,输入的更改只会在一天之后的输出中反映出来,这对于许多急躁的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理 - 比如说,在每秒钟的末尾,甚至连续地处理秒数的数据,完全放弃固定的时间片,并简单地处理每一个事件。这是流处理背后的想法。
一般来说“流”是指随着时间的推移逐渐可用的数据。这个概念出现在很多地方Unix的stdin和stdout编程语言lazy lists【2】文件系统API如Java的FileInputStreamTCP连接通过互联网传送音频和视频等等。
在本章中,我们将把事件流视为一个数据管理机制:我们在上一章中看到的批量数据的无界的,递增处理的对应物。我们将首先讨论流如何被表示,存储和通过网络传输。在第451页的“数据库和流”中我们将调查流和数据库之间的关系。最后在第464页的“Processing Streams”中,我们将探索连续处理这些流的方法和工具,以及它们可以用来构建应用程序的方法。
在本章中,我们将把事件流视为一个数据管理机制:我们在上一章中看到的批量数据的无界的,递增处理的对应物。我们将首先讨论流如何被表示,存储和通过网络传输。在“[数据库和流](#数据库和流)”中,我们将研究流和数据库之间的关系。最后在“[流处理](#流处理)”中,我们将探索连续处理这些流的方法和工具,以及它们可以用来构建应用程序的方法。
@ -27,19 +27,19 @@
在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。什么是类似的流媒体?
当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见第288页上的“单调对时钟”)。
当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见“[单调钟与时钟](ch8.md#单调钟与时钟)”)。
例如发生的事情可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如来自温度传感器的周期性测量或者CPU利用率度量。在第391页上的“使用Unix工具进行批处理”的示例中Web服务器日志的每一行都是一个事件。
例如发生的事情可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如来自温度传感器的周期性测量或者CPU利用率度量。在“[使用Unix工具进行批处理](ch10.md#使用Unix工具进行批处理)”的示例中Web服务器日志的每一行都是一个事件。
事件可能被编码为文本字符串或JSON或者以某种二进制形式编码如第4章所述。这种编码允许您存储一个事件例如将其附加到一个文件将其插入关系表或将其写入文档数据库。它还允许您通过网络将事件发送到另一个节点以进行处理。
在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。什么是类似的流媒体?
当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见第288页上的“单调对时钟”)。
当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见“[单调钟与时钟](ch8.md#单调钟与时钟)”)。
例如发生的事情可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如来自温度传感器的周期性测量或者CPU利用率度量。在第391页上的“使用Unix工具进行批处理”的示例中Web服务器日志的每一行都是一个事件。
例如发生的事情可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如来自温度传感器的周期性测量或者CPU利用率度量。在“[使用Unix工具进行批处理](ch10.md#使用Unix工具进行批处理)”的示例中Web服务器日志的每一行都是一个事件。
事件可能被编码为文本字符串或JSON或者以某种二进制形式编码如第4章所述。这种编码允许您存储一个事件例如将其附加到一个文件将其插入关系表或将其写入文档数据库。它还允许您通过网络将事件发送到另一个节点以进行处理。
事件可能被编码为文本字符串或JSON或者以某种二进制形式编码[第4章](ch4.md)所述。这种编码允许您存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许您通过网络将事件发送到另一个节点以进行处理。
在批处理中文件被写入一次然后可能被多个作业读取。类似地在流媒体术语中一个事件由生产者也称为发布者或发送者生成一次然后由多个消费者订阅者或接收者进行处理【3】。在文件系统中文件名标识一组相关记录;在流媒体系统中,相关的事件通常被组合成一个主题或流。
@ -53,17 +53,17 @@
### 消息系统
通知消费者有关新事件的常用方法是使用消息传递系统:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在第136页的“消息传递数据流”中介绍了这些系统,但现在我们将详细介绍这些系统。
通知消费者有关新事件的常用方法是使用消息传递系统:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在“[消息传递中的数据流](ch4.md#消息传递中的数据流)”中介绍了这些系统,但现在我们将详细介绍这些系统。
像生产者和消费者之间的Unix管道或TCP连接这样的直接通信渠道将是实现消息传递系统的简单方法。但是大多数消息传递系统都在这个基本模型上扩展特别是Unix管道和TCP将一个发送者与一个接收者完全连接而一个消息传递系统允许多个生产者节点将消息发送到相同的主题并允许多个消费者节点接收主题中的消息。
在这个发布/订阅模式中,不同的系统采取多种方法,对于所有目的都没有一个正确的答案。为了区分这些系统,询问以下两个问题特别有帮助:
1. 如果生产者发送消息的速度比消费者能够处理的速度快,会发生什么?一般来说,有三种选择:系统可以放置消息,缓冲队列中的消息或应用背压(也称为流量控制;即阻止生产者发送更多的消息。例如Unix管道和TCP使用背压他们有一个小的固定大小的缓冲区如果填满发件人被阻塞直到收件人从缓冲区中取出数据参见“网络拥塞和排队”第282页)。
1. 如果生产者发送消息的速度比消费者能够处理的速度快,会发生什么?一般来说,有三种选择:系统可以放置消息,缓冲队列中的消息或应用背压(也称为流量控制;即阻止生产者发送更多的消息。例如Unix管道和TCP使用背压他们有一个小的固定大小的缓冲区如果填满发件人被阻塞直到收件人从缓冲区中取出数据参见“[网络拥塞和排队](ch8.md#网络拥塞和排队)”)。
如果消息被缓存在队列中那么了解该队列增长会发生什么很重要。如果队列不再适合内存或者将消息写入磁盘系统是否会崩溃如果是这样磁盘访问如何影响邮件系统的性能【6】
2. 如果节点崩溃或暂时脱机,会发生什么情况 - 是否有消息丢失?与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(请参阅第227页的侧栏“复制和耐久性”),这有成本。如果您有时可能会丢失消息,则可能在同一硬件上获得更高的吞吐量和更低的延迟。
2. 如果节点崩溃或暂时脱机,会发生什么情况 - 是否有消息丢失?与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(参阅“[复制和持久性](ch7.md#复制和持久性)”),这有成本。如果您有时可能会丢失消息,则可能在同一硬件上获得更高的吞吐量和更低的延迟。
消息丢失是否可以接受取决于应用程序。例如对于定期传输的传感器读数和指标偶尔丢失的数据点可能并不重要因为更新后的值将在短时间后发送。但是要注意如果大量的消息被丢弃那么衡量标准是不正确的【7】。如果您正在计数事件那么更重要的是它们可靠地传送因为每个丢失的消息都意味着不正确的计数器。
@ -77,9 +77,9 @@
* 无代理的消息库如ZeroMQ 【9】和nanomsg采取类似的方法通过TCP或IP多播实现发布/订阅消息传递。
StatsD 【10】和Brubeck 【7】使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 在StatsD协议中如果接收到所有消息则计数器度量标准是正确的;使用UDP将使得度量标准最好近似为【11】。另请参阅“TCP与UDP”
StatsD 【10】和Brubeck 【7】使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 在StatsD协议中如果接收到所有消息则计数器度量标准是正确的;使用UDP将使得度量标准最好近似为【11】。另请参阅“[TCP与UDP](ch8.md#TCP与UDP)
* 如果消费者在网络上公开服务生产者可以直接发送HTTP或RPC请求参阅第131页的“通过服务进行数据流REST和RPC”以将消息推送给使用者。这就是webhooks背后的想法【12】一种服务的回调URL被注册到另一个服务中并且每当事件发生时都会向该URL发出请求。
* 如果消费者在网络上公开服务生产者可以直接发送HTTP或RPC请求参阅“[通过服务进行数据流REST和RPC](ch4.md#通过服务进行数据流REST和RPC)以将消息推送给使用者。这就是webhooks背后的想法【12】一种服务的回调URL被注册到另一个服务中并且每当事件发生时都会向该URL发出请求。
尽管这些直接消息传递系统在设计它们的情况下运行良好,但是它们通常要求应用程序代码知道消息丢失的可能性。他们可以容忍的错误是相当有限的:即使协议检测并重新传输在网络中丢失的数据包,他们通常假设生产者和消费者不断在线。
@ -95,7 +95,7 @@
#### 消息代理与数据库进行比较
有些消息代理甚至可以使用XA或JTA参与两阶段提交协议请参阅第367页的“实践中的分布式事务”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在重要的实际差异:
有些消息代理甚至可以使用XA或JTA参与两阶段提交协议参阅“[实践中的分布式事务](ch9.md#实践中的分布式事务)”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在重要的实际差异:
* 数据库通常保留数据,直到被明确删除,而大多数消息代理在消息成功传递给消费者时自动删除消息。这样的消息代理不适合长期的数据存储。
* 由于他们很快删除了邮件大多数邮件经纪人都认为他们的工作集合相当小即队列很短。如果代理需要缓冲很多消息因为消费者速度较慢如果消息不再适合内存则可能会将消息泄漏到磁盘每个消息需要更长的时间处理整体吞吐量可能会降低【6】。
@ -119,21 +119,22 @@
![](img/fig11-1.png)
**图11-1。 a负载平衡分担消费者之间消费话题的工作; b扇出将消息传递给多个消费者。**
两种模式可以组合:例如,两个独立的消费者组可以每个订阅一个话题,使得每个组共同收到所有消息,但是在每个组内,只有一个节点接收每个消息。
#### 确认和重新交付
消费者可能会随时崩溃,所以可能发生的情况是经纪人向消费者提供消息,但消费者从不处理消费者,或者在消费者崩溃之前只处理消费者。为了确保消息不会丢失,消息代理使用确认:客户端必须明确告诉代理处理消息的时间,以便代理可以将其从队列中移除。
如果与客户端的连接关闭或超时而没有代理收到确认,则认为消息未处理,因此它将消息再次传递给另一个消费者。 (请注意,可能发生这样的消息实际上是完全处理的,但网络中的确认丢失了,处理这种情况需要一个原子提交协议,正如在第360页的“实践中的分布式事务”中所讨论的那样)
如果与客户端的连接关闭或超时而没有代理收到确认,则认为消息未处理,因此它将消息再次传递给另一个消费者。 (请注意,可能发生这样的消息实际上是完全处理的,但网络中的确认丢失了,处理这种情况需要一个原子提交协议,正如在“[实践中的分布式事务](ch9.md#实践中的分布式事务)”中所讨论的那样)
当与负载平衡相结合时这种重新传递行为对消息的排序有一个有趣的影响。在图11-2中消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1结果消费者1按照m4m3m5的顺序处理消息。因此m3和m4不是以与生产者1发送的顺序相同的顺序交付的。
当与负载平衡相结合时,这种重新传递行为对消息的排序有一个有趣的影响。在[图11-2](img/fig11-2.png)消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1结果消费者1按照m4m3m5的顺序处理消息。因此m3和m4不是以与生产者1发送的顺序相同的顺序交付的。
![](img/fig11-2.png)
**图11-2 消费者2在处理m3时崩溃因此稍后再次向消费者1递送**
即使消息代理试图保留消息的顺序如JMS和AMQP标准所要求的负载平衡与重新传递的组合也不可避免地导致消息被重新排序。为避免此问题您可以为每个使用者使用单独的队列即不使用负载衡功能)。如果消息是完全独立的,消息重新排序并不是一个问题,但是如果消息之间存在因果依赖关系,这一点很重要,我们将在后面的章节中看到。
即使消息代理试图保留消息的顺序如JMS和AMQP标准所要求的负载平衡与重新传递的组合也不可避免地导致消息被重新排序。为避免此问题您可以为每个使用者使用单独的队列即不使用负载衡功能)。如果消息是完全独立的,消息重新排序并不是一个问题,但是如果消息之间存在因果依赖关系,这一点很重要,我们将在后面的章节中看到。
### 分区日志
@ -141,7 +142,7 @@
数据库和文件系统采用相反的方法:写入数据库或文件的所有内容通常都要被永久记录下来,至少在某些人明确选择将其删除之前。
思维方式上的这种差异对如何创建派生数据有很大的影响。批处理过程的一个关键特征是,你可以反复运行它们,试验处理步骤,没有损坏输入的风险(因为输入是只读的)。 AMQP / JMS风格的消息并非如此如果确认导致从代理中删除消息则接收消息具有破坏性因此您不能再次运行同一消费者并期望得到相同的结果。
思维方式上的这种差异对如何创建派生数据有很大的影响。批处理过程的一个关键特征是,你可以反复运行它们,试验处理步骤,没有损坏输入的风险(因为输入是只读的)。 AMQP/JMS风格的消息并非如此如果确认导致从代理中删除消息则接收消息具有破坏性因此您不能再次运行同一消费者并期望得到相同的结果。
如果您将新消费者添加到消息系统,则通常只会开始接收在注册后发送的消息;任何先前的消息已经消失,无法恢复。将它与文件和数据库进行对比,您可以随时添加新客户端,并且可以读取过去任意写入的数据(只要应用程序没有明确覆盖或删除数据)。
@ -149,13 +150,13 @@
#### 使用日志进行消息存储
日志只是磁盘上只能追加记录的序列。我们先前在第3章中的日志结构存储引擎和预写日志的上下文中讨论了日志在第5章中也讨论了复制的上下文中。
日志只是磁盘上只能追加记录的序列。我们先前在第3章中的日志结构存储引擎和预写日志的上下文中讨论了日志[第5章](ch5.md)中也讨论了复制的上下文中。
可以使用相同的结构来实现消息代理:生产者通过将消息附加到日志的末尾来发送消息,并且消费者通过依次读取日志来接收消息。如果消费者到达日志的末尾,则等待通知新消息已被追加。 Unix工具tail -f监视数据被附加的文件基本上是这样工作的。
可以使用相同的结构来实现消息代理:生产者通过将消息附加到日志的末尾来发送消息,并且消费者通过依次读取日志来接收消息。如果消费者到达日志的末尾,则等待通知新消息已被追加。 Unix工具`tail -f`监视数据被附加的文件,基本上是这样工作的。
为了扩展到比单个磁盘提供更高的吞吐量可以对日志进行分区在第6章的意义上。然后可以在不同的机器上托管不同的分区使每个分区成为一个单独的日志可以独立于其他分区读取和写入。然后可以将一个话题定义为一组分段它们都携带相同类型的消息。这种方法如图11-3所示。
为了扩展到比单个磁盘提供更高的吞吐量,可以对日志进行分区(在[第6章](ch6.md)的意义上)。然后可以在不同的机器上托管不同的分区,使每个分区成为一个单独的日志,可以独立于其他分区读取和写入。然后可以将一个话题定义为一组分段,它们都携带相同类型的消息。这种方法如[图11-3](img/fig11-3.png)所示。
在每个分区中代理为每个消息分配一个单调递增的序列号或偏移量在图11-3中框中的数字是消息偏移量。这样的序列号是有意义的因为分区是仅附加的所以分区内的消息是完全有序的。没有跨不同分区的订购保证。
在每个分区中,代理为每个消息分配一个单调递增的序列号或偏移量(在[图11-3](img/fig11-3.png)中,框中的数字是消息偏移量)。这样的序列号是有意义的,因为分区是仅附加的,所以分区内的消息是完全有序的。没有跨不同分区的订购保证。
![](img/fig11-3.png)
@ -170,9 +171,9 @@ Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的Distribut
每个客户端然后使用分配的分区中的所有消息。通常情况下,当一个用户被分配了一个日志分区时,它会以直接的单线程的方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:
* 共享消费主题工作的节点数最多可以是该主题中的日志分区数,因为同一个分区内的消息被传递到同一个节点[^i]。
* 如果单个消息处理缓慢,则会阻止处理该分区中的后续消息(一种行头阻塞形式;请参阅第13页上的“描述性能”)。
* 如果单个消息处理缓慢,则会阻止处理该分区中的后续消息(一种行头阻塞形式;请参阅“[描述性能](ch1.md#描述性能)”)。
因此在处理消息可能代价高昂并且希望逐个消息地平行处理以及消息排序并不那么重要的情况下消息代理的JMS / AMQP风格是可取的。另一方面在消息吞吐量高的情况下每条消息的处理速度都很快消息的排序也是重要的基于日志的方法运行得很好。
因此在处理消息可能代价高昂并且希望逐个消息地平行处理以及消息排序并不那么重要的情况下消息代理的JMS/AMQP风格是可取的。另一方面在消息吞吐量高的情况下每条消息的处理速度都很快消息的排序也是重要的基于日志的方法运行得很好。
[^i]: 有可能创建一个负载均衡方案,在这个方案中,两个消费者通过读取全部消息来共享处理分区的工作,但是其中一个只考虑具有偶数偏移量的消息,而另一个消费者处理奇数编号的偏移量。或者,您可以将消息处理扩展到线程池,但这种方法会使消费者偏移管理变得复杂。一般来说,分区的单线程处理是可取的,并行分区可以通过使用更多的分区来增加。
@ -180,7 +181,7 @@ Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的Distribut
消耗一个分区依次可以很容易地判断哪些消息已经被处理:所有消息的偏移量小于消费者的当前偏移量已经被处理,并且具有更大偏移量的所有消息还没有被看到。因此,经纪人不需要跟踪每条消息的确认,只需要定期记录消费者的偏移。在这种方法中减少的簿记开销以及批处理和流水线的机会有助于提高基于日志的系统的吞吐量。
实际上,这种偏移量与单领先数据库复制中常见的日志序列号非常相似,我们在第151页的“设置新的跟踪者”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导,并在不跳过任何写入的情况下恢复复制。这里使用的原则完全相同:信息经纪人的行为像一个领导者数据库,而消费者就像一个追随者。
实际上,这种偏移量与单领先数据库复制中常见的日志序列号非常相似,我们在“[设置新从库](ch5.md#设置新从库)”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导,并在不跳过任何写入的情况下恢复复制。这里使用的原则完全相同:消息代理的行为像一个领导者数据库,而消费者就像一个追随者。
如果消费者节点失败,则使用者组中的另一个节点将被分配失败的使用者分区,并开始以最后记录的偏移量使用消息。如果消费者已经处理了后续的消息,但还没有记录他们的偏移量,那么在重新启动后这些消息将被第二次处理。本章后面我们将讨论处理这个问题的方法。
@ -190,13 +191,13 @@ Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的Distribut
这就意味着,如果一个消费者的速度慢,消费者的消费速度落后于消费者的偏移量,那么消费者的偏移量就会指向一个已经被删除的消费者。实际上,日志实现了一个有限大小的缓冲区,当旧的消息满时,它也被称为循环缓冲区或环形缓冲区。但是,由于该缓冲区在磁盘上,因此可能相当大。
让我们来做一个后台计算。在撰写本文时典型的大型硬盘驱动器容量为6TB顺序写入吞吐量为150MB / s。如果您以最快的速度写邮件则需要大约11个小时才能填满驱动器。因此磁盘可以缓存11个小时的消息之后它将开始覆盖旧的消息。即使您使用多个硬盘驱动器和机器这个比率也是一样的。在实践中部署很少使用磁盘的完整写入带宽所以日志通常可以保存几天甚至几周的缓冲区。
让我们来做一个后台计算。在撰写本文时典型的大型硬盘驱动器容量为6TB顺序写入吞吐量为150MB/s。如果您以最快的速度写邮件则需要大约11个小时才能填满驱动器。因此磁盘可以缓存11个小时的消息之后它将开始覆盖旧的消息。即使您使用多个硬盘驱动器和机器这个比率也是一样的。在实践中部署很少使用磁盘的完整写入带宽所以日志通常可以保存几天甚至几周的缓冲区。
不管你保留多长时间的消息一个日志的吞吐量或多或少保持不变因为无论如何每个消息都被写入磁盘【18】。这种行为与将邮件默认保存在内存中的消息传递系统形成鲜明对比如果队列变得太大只将其写入磁盘当这些系统开始写入磁盘时这些系统速度很快并且变慢得多所以吞吐量取决于保留的历史数量。
#### 当消费者跟不上生产者时
在“信息系统”第441页的开头我们讨论了如果消费者无法跟上生产者发送信息的速度的三种选择丢弃信息缓冲或施加背压。在这个分类法中基于日志的方法是一种缓冲形式具有较大但固定大小的缓冲区受可用磁盘空间的限制
在“[信息系统]()”第441页的开头我们讨论了如果消费者无法跟上生产者发送信息的速度的三种选择丢弃信息缓冲或施加背压。在这个分类法中基于日志的方法是一种缓冲形式具有较大但固定大小的缓冲区受可用磁盘空间的限制
如果消费者远远落后于它所要求的信息比保留在磁盘上的信息要旧,那么它将不能读取这些信息,所以代理人有效地丢弃了比缓冲区容量更大的旧信息。您可以监控消费者在日志头后面的距离,如果显着落后,则会发出警报。由于缓冲区很大,因此有足够的时间让人类操作员修复缓慢的消费者,并在消息开始丢失之前让其赶上。
@ -220,30 +221,31 @@ Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的Distribut
我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或传感器读取,但也可能是写入数据库。事情被写入数据库的事实是可以被捕获,存储和处理的事件。这一观察结果表明,数据库和数据流之间的连接不仅仅是磁盘上日志的物理存储 - 这是非常重要的。
事实上复制日志请参阅第158页上的“复制日志的实现”是数据库写入事件的流由领导者在处理事务时生成。追随者将写入流应用到他们自己的数据库副本从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。
事实上,复制日志(参阅“[复制日志的实现](ch5.md#复制日志的实现)”)是数据库写入事件的流,由领导者在处理事务时生成。追随者将写入流应用到他们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。
我们还在“[全序广播](ch9.md#全序广播)”中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将所有这些都以相同的最终状态结束。 (处理一个事件被认为是一个确定性的操作。)这只是事件流的另一种情况!
我们还在第348页的“全序广播”中遇到了状态机复制原理其中指出如果每个事件代表对数据库的写入并且每个副本按相同的顺序处理相同的事件则副本将所有这些都以相同的最终状态结束。 (处理一个事件被认为是一个确定性的操作。)这只是事件流的另一种情况!
在本节中,我们将首先看看异构数据系统中出现的一个问题,然后探讨如何通过将事件流的想法带入数据库来解决这个问题。
### 保持系统同步
正如我们在本书中所看到的没有一个系统能够满足所有的数据存储查询和处理需求。在实践中大多数不重要的应用程序需要结合多种不同的技术来满足他们的需求例如使用OLTP数据库来为用户请求提供服务缓存来加速常见请求处理全文索引搜索查询和用于分析的数据仓库。每个数据都有其自己的数据副本存储在自己的表示中并根据自己的目的进行了优化。
由于相同或相关的数据出现在不同的地方因此需要保持相互同步如果某个项目在数据库中进行了更新则还需要在缓存搜索索引和数据仓库中进行更新。对于数据仓库这种同步通常由ETL进程执行参见第91页的“数据仓库”),通常通过获取数据库的完整副本,转换数据库并将其批量加载到数据仓库中 - 换句话说一个批处理。同样我们在第419页的“批量工作流的输出”中看到了如何使用批处理过程创建搜索索引,建议系统和其他派生数据系统。
由于相同或相关的数据出现在不同的地方因此需要保持相互同步如果某个项目在数据库中进行了更新则还需要在缓存搜索索引和数据仓库中进行更新。对于数据仓库这种同步通常由ETL进程执行参见“[数据仓库](ch3.md#数据仓库)”),通常通过获取数据库的完整副本,转换数据库并将其批量加载到数据仓库中 —— 换句话说,一个批处理。同样,我们在“[批量工作流的输出](ch10.md#批量工作流的输出)”中看到了如何使用批处理过程创建搜索索引,建议系统和其他派生数据系统。
如果周期性的完整数据库转储过于缓慢,有时使用的替代方法是双重写入,其中应用程序代码在数据更改时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存条目无效(或者甚至同时执行这些写入)。
但是双重写入有一些严重的问题其中一个是图11-4所示的竞争条件。在这个例子中两个客户端同时想要更新一个项目X客户端1想要将值设置为A客户端2想要将其设置为B.两个客户端首先将新值写入数据库然后将其写入到搜索索引。由于运行时间不正确这些请求是交错的数据库首先看到从客户端1的写入将值设置为A然后从客户端2写入将值设置为B因此数据库中的最终值为B.搜索索引首先看到来自客户端2然后是客户端1的写入所以搜索索引中的最终值是A.这两个系统现在永久地不一致,即使没有发生错误。
但是,双重写入有一些严重的问题,其中一个是[图11-4](img/fig11-4.png)所示的竞争条件。在这个例子中两个客户端同时想要更新一个项目X客户端1想要将值设置为A客户端2想要将其设置为B.两个客户端首先将新值写入数据库然后将其写入到搜索索引。由于运行时间不正确这些请求是交错的数据库首先看到从客户端1的写入将值设置为A然后从客户端2写入将值设置为B因此数据库中的最终值为B.搜索索引首先看到来自客户端2然后是客户端1的写入所以搜索索引中的最终值是A.这两个系统现在永久地不一致,即使没有发生错误。
![](img/fig11-4.png)
**图11-4 在数据库中X首先被设置为A然后被设置为B而在搜索索引处写入以相反的顺序到达**
除非有一些额外的并发检测机制,例如我们在第184页上的“检测并发写入”中讨论的版本向量,否则您甚至不会注意到发生了并发写入 - 一个值将简单地以无提示方式覆盖另一个值。
除非有一些额外的并发检测机制,例如我们在“[检测并发写入](ch5.md#检测并发写入)”中讨论的版本向量,否则您甚至不会注意到发生了并发写入 —— 一个值将简单地以无提示方式覆盖另一个值。
双重写入的另一个问题是其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相矛盾的结果。确保它们都成功或者两者都失败是原子提交问题的一个例子,这个问题的解决是昂贵的(参阅第354页上的“原子提交和两阶段提交2PC
双重写入的另一个问题是其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相矛盾的结果。确保它们都成功或者两者都失败是原子提交问题的一个例子,这个问题的解决是昂贵的(参阅“[原子提交和两阶段提交2PC](ch7.md#原子提交和两阶段提交2PC)”)。
如果只有一个复制的数据库和一个领导者那么这个领导者决定了写入顺序所以状态机复制方法可以在数据库的副本中工作。然而在图11-4中没有一个领导者数据库可能有一个领导者搜索索引可能有一个领导者但是既不在另一个领导者之后也可能发生冲突参见“多领导者复制“
如果只有一个复制的数据库和一个领导者,那么这个领导者决定了写入顺序,所以状态机复制方法可以在数据库的副本中工作。然而,在[图11-4](img/fig11-4.png)中,没有一个领导者:数据库可能有一个领导者,搜索索引可能有一个领导者,但是既不在另一个领导者之后,也可能发生冲突(参见“[多领导者复制](ch5.md#多领导者复制)“)。
如果实际上只有一个领导者(例如数据库),并且如果我们可以使搜索索引成为数据库的追随者,情况会更好。但这在实践中可能吗?
@ -257,21 +259,23 @@ Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的Distribut
最近人们对变更数据捕获CDC越来越感兴趣它是观察写入数据库的所有数据变化并将其提取出来并将其复制到其他系统中的过程。 CDC特别感兴趣的是如果改变可以立即用于流可以立即写入。
例如您可以捕获数据库中的更改并不断将相同的更改应用于搜索索引。如果更改的日志以相同的顺序应用则可以预期搜索索引中的数据与数据库中的数据匹配。搜索索引和任何其他派生的数据系统只是变化流的消费者如图11-5所示。
例如,您可以捕获数据库中的更改并不断将相同的更改应用于搜索索引。如果更改的日志以相同的顺序应用,则可以预期搜索索引中的数据与数据库中的数据匹配。搜索索引和任何其他派生的数据系统只是变化流的消费者,如[图11-5](img/fig11-5.png)所示。
![](img/fig11-5.png)
**图11-5 将数据按顺序写入一个数据库,然后按照相同的顺序将这些更改应用到其他系统**
实施更改数据捕获
#### 实现变更数据捕获
我们可以调用日志消费者导出的数据系统,正如在第三部分的介绍中所讨论的:存储在搜索索引和数据仓库中的数据只是记录系统中数据的另一个视图。更改数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在派生数据系统中,以便派生系统具有数据的准确副本。
从本质上说改变数据捕获使得一个数据库成为领导者从中捕获变化的数据库并将其他人变成追随者。基于日志的消息代理非常适合从源数据库传输更改事件因为它保留了消息的排序避免了图11-2的重新排序问题
数据库触发器可用于通过注册触发器来实现更改数据捕获(参阅第152页的“基于触发器的复制”),这些触发器可观察数据表的所有更改,并将相应的条目添加到更改日志表中。但是,他们往往是脆弱的,并有显着的性能开销。解析复制日志可以是一个更强大的方法,但它也带来了挑战,例如处理模式更改。
数据库触发器可用于通过注册触发器来实现更改数据捕获(参阅“[基于触发器的复制](ch5.md#基于触发器的复制)”),这些触发器可观察数据表的所有更改,并将相应的条目添加到更改日志表中。但是,他们往往是脆弱的,并有显着的性能开销。解析复制日志可以是一个更强大的方法,但它也带来了挑战,例如处理模式更改。
LinkedIn的Databus 【25】Facebook的Wormhole 【26】和Yahoo的Sherpa 【27】大规模地使用这个想法。 Bottled Water使用解码预写日志的API来实现PostgreSQL的CDC 【28】Maxwell和Debezium通过解析binlog为MySQL做类似的事情【29,30,31】Mongoriver读取MongoDB oplog 【32,33】 而GoldenGate为Oracle提供类似的功能【34,35】。
像消息代理一样,更改数据捕获通常是异步的:记录数据库系统不会等待更改应用到消费者,然后再进行更改。这种设计具有的操作优势是添加缓慢的使用者不会影响记录系统太多,但是它具有所有复制滞后问题的缺点(请参见第161页中的“复制滞后问题”)。
像消息代理一样,更改数据捕获通常是异步的:记录数据库系统不会等待更改应用到消费者,然后再进行更改。这种设计具有的操作优势是添加缓慢的使用者不会影响记录系统太多,但是它具有所有复制滞后问题的缺点(参见“[复制延迟问题](ch5.md#复制延迟问题)”)。
#### 初始快照
@ -285,13 +289,13 @@ LinkedIn的Databus 【25】Facebook的Wormhole 【26】和Yahoo的Sherpa
如果只能保留有限的日志历史记录,则每次需要添加新的派生数据系统时都需要执行快照过程。但是,日志压缩提供了一个很好的选择。
在日志结构化的存储引擎的情况下,我们先讨论了第72页的“Hash索引”中的日志压缩参见图3-2的示例。原理很简单存储引擎使用相同的密钥定期查找日志记录丢弃任何重复内容并且只保留每个密钥的最新更新。这个压缩和合并过程在后台运行。
在日志结构化的存储引擎的情况下,我们先讨论了“[Hash索引](ch3.md#Hash索引)”中的日志压缩(参见[图3-2](img/fig3-2.png)的示例)。原理很简单:存储引擎使用相同的密钥定期查找日志记录,丢弃任何重复内容,并且只保留每个密钥的最新更新。这个压缩和合并过程在后台运行。
在日志结构存储引擎中,具有特殊空值(逻辑删除)的更新指示删除了一个密钥,并在日志压缩过程中将其删除。但只要密钥不被覆盖或删除,它就永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中发生的写入次数。如果相同的密钥经常被覆盖,则以前的值将最终被垃圾收集,并且只保留最新的值。
在基于日志的消息代理和更改数据捕获方面相同的想法也适用。如果CDC系统设置为每个更改都有一个主键并且每个键的更新都替换了该键的以前的值那么仅保留最近写入的特定键就足够了。
现在无论何时要重建派生数据系统如搜索索引都可以从日志压缩主题的偏移量0开始新的使用者然后依次扫描日志中的所有消息。日志保证包含数据库中每个键的最新值也可能是一些较旧的值 - 换句话说您可以使用它来获取数据库内容的完整副本而无需获取CDC的另一个快照源数据库。
现在无论何时要重建派生数据系统如搜索索引都可以从日志压缩主题的偏移量0开始新的使用者然后依次扫描日志中的所有消息。日志保证包含数据库中每个键的最新值也可能是一些较旧的值—— 换句话说您可以使用它来获取数据库内容的完整副本而无需获取CDC的另一个快照源数据库。
Apache Kafka支持此日志压缩功能。正如我们将在本章后面看到的它允许消息代理被用于持久存储而不仅仅是用于临时消息。
@ -309,14 +313,14 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
与更改数据捕获类似,事件采购涉及将所有对应用程序状态的更改存储为更改事件的日志。最大的区别是事件源代码在不同的抽象层次上应用了这个想法:
* 在更改数据捕获中,应用程序以可变方式使用数据库,随意更新和删除记录。从数据库中提取较低级别的更改日志(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免图中的竞争条件11-4。写入数据库的应用程序不需要知道CDC正在发生。
* 在更改数据捕获中,应用程序以可变方式使用数据库,随意更新和删除记录。从数据库中提取较低级别的更改日志(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免[11-4](img/fig11-4.png)中的竞争条件。写入数据库的应用程序不需要知道CDC正在发生。
* 在事件源中,应用程序逻辑是基于写入事件日志的不可变事件而显式构建的。在这种情况下,事件存储是附加的,更新或删除是不鼓励或禁止的。事件旨在反映应用程序级别发生的事情,而不是低级状态更改。
事件源是一种强大的数据建模技术:从应用程序的角度来看,将用户的行为记录为不可变的事件更有意义,而不是记录这些行为对可变数据库的影响。事件采购使得随着时间的推移而逐渐发展应用程序变得更加容易,通过更容易理解事情发生的原因以及防范应用程序错误(请参阅“不可变事件的优点”),帮助进行调试。
例如,存储“学生取消课程注册”事件清楚地表达了单一行为的中性意图,而副作用“从注册表中删除了一个条目,并且一个取消原因被添加到学生反馈表“嵌入了很多有关方式的假设数据稍后将被使用。如果引入新的应用程序功能,例如“将地点提供给等待列表中的下一个人” - 事件采购方法允许将新的副作用轻松地链接到现有事件上。
例如,存储“学生取消课程注册”事件清楚地表达了单一行为的中性意图,而副作用“从注册表中删除了一个条目,并且一个取消原因被添加到学生反馈表“嵌入了很多有关方式的假设数据稍后将被使用。如果引入新的应用程序功能,例如“将地点提供给等待列表中的下一个人” —— 事件顺序方法允许将新的副作用轻松地链接到现有事件上。
事件采购类似于编年史数据模型【45】事件日志和事实表之间也有相似之处您可以在星型模式中找到它参阅第93页上的“星星和雪花:分析模式”) 。
事件顺序类似于编年史数据模型【45】事件日志和事实表之间也有相似之处您可以在星型模式中找到它参阅“[型和雪花型:分析的模式](ch3.md#和雪花:分析模式)”) 。
专门的数据库如Event Store 【46】已经被开发来支持使用事件采购的应用程序但总的来说这个方法是独立于任何特定的工具的。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用程序。
@ -324,7 +328,7 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
事件日志本身并不是很有用,因为用户通常期望看到系统的当前状态,而不是修改的历史。例如,在购物网站上,用户期望能够看到他们购物车的当前内容,而不是他们对购物车所做的所有改变的附加列表。
因此,使用事件源的应用程序需要记录事件的日志(表示写入系统的数据),并将其转换为适合向用户显示的应用程序状态(从系统读取数据的方式[47 ])。这种转换可以使用任意的逻辑,但它应该是确定性的,以便您可以再次运行它并从事件日志中派生相同的应用程序状态。
因此,使用事件源的应用程序需要记录事件的日志(表示写入系统的数据),并将其转换为适合向用户显示的应用程序状态(从系统读取数据的方式【47】)。这种转换可以使用任意的逻辑,但它应该是确定性的,以便您可以再次运行它并从事件日志中派生相同的应用程序状态。
与更改数据捕获一样,重放事件日志可以让您重新构建系统的当前状态。但是,日志压缩需要以不同的方式处理:
@ -337,7 +341,7 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
事件采购哲学是仔细区分事件和命令【48】。当来自用户的请求首先到达时它最初是一个命令在这一点上它可能仍然失败例如因为违反了一些完整性条件。应用程序必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受则它变成一个持久且不可变的事件。
例如,如果用户试图注册特定用户名,或在飞机上或剧院中预定座位,则应用程序需要检查用户名或座位是否已被占用。 我们先前在第364页的“容错概念”中讨论过这个例子。当检查成功时应用程序可以生成一个事件来指示特定的用户名是由特定的用户ID注册的座位已经预留给特定的顾客。
例如,如果用户试图注册特定用户名,或在飞机上或剧院中预定座位,则应用程序需要检查用户名或座位是否已被占用。 我们先前在第364页的“[容错概念](ch8.md#容错概念)”中讨论过这个例子。当检查成功时应用程序可以生成一个事件来指示特定的用户名是由特定的用户ID注册的座位已经预留给特定的顾客。
在事件发生的时候,这成为事实。即使客户稍后决定更改或取消预订,事实仍然是事实,他们以前曾为某个特定的座位进行预订,而更改或取消是稍后添加的单独事件。
@ -365,7 +369,7 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
> 事务日志记录对数据库所做的所有更改。高速追加是更改日志的唯一方法。从这个角度来看,数据库的内容会保存日志中最新记录值的缓存。事实是日志。数据库是日志子集的缓存。该缓存子集恰好是来自日志的每个记录和索引值的最新值。
日志压缩(如第456页的“日志压缩”中所述)是一种桥接日志和数据库状态之间区别的方法:它只保留每条记录的最新版本,并丢弃被覆盖的版本。
日志压缩(如“[日志压缩](#日志压缩)”中所述)是一种桥接日志和数据库状态之间区别的方法:它只保留每条记录的最新版本,并丢弃被覆盖的版本。
#### 不可变事件的优点
@ -379,17 +383,17 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
#### 从同一事件日志中获取多个视图
而且通过从不变事件日志中分离可变状态可以从事件的相同日志中派生出几个不同的面向读取的表示。这就像一个流的多个消费者一样工作图11-5例如分析数据库Druid使用这种方法从Kafka直接获取【55】Pista chio是一个分布式的键值存储使用Kafka作为提交日志【56】Kafka Connect接收器可以将来自Kafka的数据导出到各种不同的数据库和索引【41】。对于许多其他存储和索引系统如搜索服务器来说类似地从分布式日志中获取输入也是有意义的请参阅第455页上的“保持系统同步”
而且,通过从不变事件日志中分离可变状态,可以从事件的相同日志中派生出几个不同的面向读取的表示。这就像一个流的多个消费者一样工作([图11-5](img/fig11-5.png)例如分析数据库Druid使用这种方法从Kafka直接获取【55】Pista chio是一个分布式的键值存储使用Kafka作为提交日志【56】Kafka Connect接收器可以将来自Kafka的数据导出到各种不同的数据库和索引【41】。对于许多其他存储和索引系统如搜索服务器来说类似地从分布式日志中获取输入也是有意义的请参阅第455页上的“保持系统同步”
从事件日志到数据库有一个明确的转换步骤,可以更容易地随时间推移您的应用程序:如果您想要引入一个以新的方式呈现现有数据的新功能,您可以使用事件日志来构建一个单独的新功能的读取优化视图,并与现有的一起运行
系统而不必修改它们。并行运行旧系统和新系统通常比在现有系统中执行复杂的模式迁移更容易。一旦旧的系统不再需要你可以简单地关闭它并回收它的资源【47,57】。
如果您不必担心如何查询和访问数据那么存储数据通常是非常简单的。模式设计索引和存储引擎的许多复杂性都是希望支持某些查询和访问模式的结果参见第3章。出于这个原因通过将数据写入的形式与读取形式分开并允许几个不同的读取视图可以获得很大的灵活性。这个想法有时被称为命令查询责任分离CQRS【42,58,59】。
如果您不必担心如何查询和访问数据,那么存储数据通常是非常简单的。模式设计,索引和存储引擎的许多复杂性都是希望支持某些查询和访问模式的结果(参见[第3章](ch3.md)。出于这个原因通过将数据写入的形式与读取形式分开并允许几个不同的读取视图可以获得很大的灵活性。这个想法有时被称为命令查询责任分离CQRS【42,58,59】。
数据库和模式设计的传统方法是基于数据必须以与查询相同的形式写入的谬误。有关正常化和非规范化的争论请参阅第31页上的“多对一和多对多关系”如果可以将数据从写入优化的事件日志转换为读取优化的应用程序状态则变得基本无关紧要在读取优化的视图中对数据进行非规范化是完全合理的因为翻译过程为您提供了一种机制使其与事件日志保持一致。
第11页的“描述负载”中我们讨论了Twitter的家庭时间表,最近一个特定用户正在关注的人(如邮箱)写的最近发布的推文缓存。这是阅读优化状态的另一个例子:家庭时间表高度变形,因为你的推文在所有跟随你的人的时间线上都是重复的。然而,扇出服务保持这种复制状态与新的推文和新的以下关系保持同步,这保持了复制的可管理性。
“[描述负载](ch1.md#描述负载)”中,我们讨论了推特主页时间表,最近一个特定用户正在关注的人(如邮箱)写的最近发布的推文缓存。这是阅读优化状态的另一个例子:家庭时间表高度变形,因为你的推文在所有跟随你的人的时间线上都是重复的。然而,扇出服务保持这种复制状态与新的推文和新的以下关系保持同步,这保持了复制的可管理性。
#### 并发控制
@ -403,15 +407,15 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
#### 不变性的限制
许多不使用事件源模型的系统依赖于不可变性:各种数据库在内部使用不可变的数据结构或多版本数据来支持时间点快照(参见“索引和快照隔离” )。 GitMercurial和Fossil等版本控制系统也依靠不可变的数据来保存文件的版本历史记录。
许多不使用事件源模型的系统依赖于不可变性:各种数据库在内部使用不可变的数据结构或多版本数据来支持时间点快照(参见“[索引和快照隔离](ch7.md#索引和快照隔离)” )。 GitMercurial和Fossil等版本控制系统也依靠不可变的数据来保存文件的版本历史记录。
永远保持所有变化的不变的历史在多大程度上是可行的?答案取决于数据集中的流失量。一些工作负载主要是添加数据,很少更新或删除;他们很容易使不变。其他工作负载在较小的数据集上有较高的更新和删除率;在这些情况下不可改变的历史可能变得过于庞大碎片化可能成为一个问题压缩和垃圾收集的表现对于操作的鲁棒性变得至关重要【60,61】。
除了性能方面的原因外,也可能出于管理方面的原因需要删除数据的情况,尽管这些数据都是不可变的。例如,隐私条例可能要求在关闭帐户后删除用户的个人信息,数据保护立法可能要求删除错误的信息,或者可能需要包含敏感信息的意外泄露。
在这种情况下,仅仅在日志中添加另一个事件来指示先前的数据应该被视为删除是不够的 - 您实际上是想重写历史并假装数据从未写在第一位。例如Datomic调用这个特性excision 【62】而Fossil版本控制系统有一个类似的概念叫做shunning 【63】。
在这种情况下,仅仅在日志中添加另一个事件来指示先前的数据应该被视为删除是不够的 —— 您实际上是想重写历史并假装数据从未写在第一位。例如Datomic调用这个特性excision 【62】而Fossil版本控制系统有一个类似的概念叫做shunning 【63】。
真正的删除数据是非常困难的【64】因为拷贝可以存在于很多地方例如存储引擎文件系统和SSD通常写入一个新的位置而不是覆盖到位【52】而备份通常是故意不可改变的防止意外删除或腐败。删除更多的是“使检索数据更难”而不是“使检索数据不可能”。无论如何有时您必须尝试正如我们在“立法和自律”中所看到的第542页
真正的删除数据是非常困难的【64】因为拷贝可以存在于很多地方例如存储引擎文件系统和SSD通常写入一个新的位置而不是覆盖到位【52】而备份通常是故意不可改变的防止意外删除或腐败。删除更多的是“使检索数据更难”而不是“使检索数据不可能”。无论如何有时您必须尝试正如我们在“[立法和自律](ch12.md#立法和自律)”中所看到的。
@ -419,15 +423,15 @@ Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获
到目前为止,本章中我们已经讨论了流的来源(用户活动事件,传感器和写入数据库),我们讨论了流如何传输(通过直接消息传送,通过消息代理和事件日志)。
剩下的就是讨论一下你可以用流做什么 - 也就是说,你可以处理它。一般来说,有三种选择:
剩下的就是讨论一下你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选择:
1. 您可以将事件中的数据写入数据库缓存搜索索引或类似的存储系统然后由其他客户端查询。如图11-5所示这是保持数据库与系统其他部分发生更改同步的好方法 - 特别是当流消费者是写入数据库的唯一客户端时。写入存储系统的流程相当于我们在“批处理工作流程的输出”页面上讨论的内容。
2. 您可以以某种方式将事件推送给用户,例如通过发送电子邮件警报或推送通知,或通过将事件流式传输到可实时显示的实时仪表板。在这种情况下,人是流的最终消费者。
3. 您可以处理一个或多个输入流以产生一个或多个输出流。数据流可能会经过由几个这样的处理阶段组成的流水线然后才会输出选项1或2
在本章的其余部分中我们将讨论选项3处理流以产生其他派生流。处理这样的流的代码片段被称为操作员或作业。它与我们在第10章中讨论过的Unix进程和MapReduce作业密切相关数据流的模式是相似的一个流处理器以只读的方式使用输入流并将其输出写入一个不同的位置时尚。
在本章的其余部分中我们将讨论选项3处理流以产生其他派生流。处理这样的流的代码片段被称为操作员或作业。它与我们在[第10章](ch10.md)中讨论过的Unix进程和MapReduce作业密切相关数据流的模式是相似的一个流处理器以只读的方式使用输入流并将其输出写入一个不同的位置时尚。
流处理器中的分区和并行化模式也非常类似于第10章中介绍的MapReduce和数据流引擎因此我们不在这里重复这些主题。基本的映射操作如转换和过滤记录也是一样的。
流处理器中的分区和并行化模式也非常类似于[第10章](ch10.md)中介绍的MapReduce和数据流引擎因此我们不在这里重复这些主题。基本的映射操作如转换和过滤记录也是一样的。
批量作业的一个关键区别是流不会结束。这种差别有很多含义:正如本章开始部分所讨论的,排序对无界数据集没有意义,因此不能使用排序合并联接(请参阅“减少联接和分组”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重新启动失败的任务,但是对于已经运行数年的流作业,在开始后重新开始崩溃可能不是一个可行的选择。
@ -460,18 +464,20 @@ CEP的实现包括Esper 【69】IBM InfoSphere Streams 【70】ApamaTIB
* 计算一段时间内某个值的滚动平均值
* 将当前的统计数据与以前的时间间隔进行比较(例如,检测趋势或提醒与上周同期相比过高或过低的指标)
这些统计信息通常是在固定的时间间隔内进行计算的例如您可能想知道在过去5分钟内每秒对服务的平均查询次数以及在此期间的第99百分位响应时间。在几分钟内平均从一秒钟到下一秒钟平滑无关的波动同时还能及时了解交通模式的任何变化。您汇总的时间间隔称为窗口我们将在第468页的“关于时间的推理”中更详细地讨论窗口。
这些统计信息通常是在固定的时间间隔内进行计算的例如您可能想知道在过去5分钟内每秒对服务的平均查询次数以及在此期间的第99百分位响应时间。在几分钟内平均从一秒钟到下一秒钟平滑无关的波动同时还能及时了解交通模式的任何变化。您汇总的时间间隔称为窗口我们将在“[关于时间的推理](#关于时间的推理)”中更详细地讨论窗口。
流分析系统有时使用概率算法例如Bloom filter我们在第79页的“性能优化”中遇到过设置成员资格HyperLogLog 【72】基数估计以及各种百分比估计算法请参阅“Percentiles in Practice “第16页。概率算法产生近似的结果但是具有在流处理器中比精确算法需要少得多的存储器的优点。近似算法的使用有时会使人们相信流处理系统总是有损和不精确的但这是错误的流处理没有任何内在的近似而概率算法只是一个优化【73】。
流分析系统有时使用概率算法例如Bloom filter我们在“[性能优化](ch3.md#性能优化)”中遇到过设置成员资格HyperLogLog 【72】基数估计以及各种百分比估计算法请参阅“[实践中的百分位点](ch1.md#实践中的百分位点)“第16页。概率算法产生近似的结果但是具有在流处理器中比精确算法需要少得多的存储器的优点。近似算法的使用有时会使人们相信流处理系统总是有损和不精确的但这是错误的流处理没有任何内在的近似而概率算法只是一个优化【73】。
许多开源分布式流处理框架的设计都是以分析为基础的例如Apache StormSpark StreamingFlinkConcordSamza和Kafka Streams 【74】。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
#### 保持物化视图
我们在第451页的“数据库和数据流”中看到可以使用数据库更改流来保持派生数据系统如缓存搜索索引和数据仓库与源数据库保持最新。我们可以将这些示例视为维护实体化视图的具体情况请参阅“聚合数据多维数据集和实例化视图”第101页导出某个数据集的替代视图以便可以高效地查询它并在底层数据更改【50】。
我们在“[数据库和数据流](#数据库和数据流)”中看到,可以使用数据库更改流来保持派生数据系统(如缓存,搜索索引和数据仓库)与源数据库保持最新。我们可以将这些示例视为维护实体化视图的具体情况(请参阅“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”导出某个数据集的替代视图以便可以高效地查询它并在底层数据更改【50】。
同样,在事件采购中,应用程序状态通过应用事件日志来维护;这里的应用状态也是一种物化视图。与流分析场景不同,在某个时间窗口内仅考虑事件通常是不够的:构建物化视图可能需要在任意时间段内的所有事件,除了可能由日志压缩丢弃的任何过时事件(请参阅“[日志压缩](#日志压缩)“)。实际上,您需要一个可以一直延伸到一开始的窗口。
同样,在事件采购中,应用程序状态通过应用事件日志来维护;这里的应用状态也是一种物化视图。与流分析场景不同,在某个时间窗口内仅考虑事件通常是不够的:构建物化视图可能需要在任意时间段内的所有事件,除了可能由日志压缩丢弃的任何过时事件(请参阅“日志压缩“)。实际上,您需要一个可以一直延伸到一开始的窗口。
原则上,任何流处理器都可以用于物化视图维护,尽管永久维护事件的需要与一些主要在有限持续时间的窗口上运行的面向分析的框架的假设背道而驰。 Samza和Kafka Streams支持这种用法建立在Kafka对夯实的支持上【75】。
#### 在流上搜索
@ -484,14 +490,14 @@ CEP的实现包括Esper 【69】IBM InfoSphere Streams 【70】ApamaTIB
#### 消息传递和RPC
在第136页的“消息传递数据流”中我们讨论了消息传递系统作为RPC的替代方案即作为通信服务的机制例如在参与者模型中所使用的。虽然这些系统也是基于消息和事件但我们通常不会将它们视为流处理器
在第136页的“[消息传递数据流](ch4.md#消息传递数据流)”中我们讨论了消息传递系统作为RPC的替代方案即作为通信服务的机制例如在参与者模型中所使用的。虽然这些系统也是基于消息和事件但我们通常不会将它们视为流处理器
Actor框架主要是管理通信模块的并发和分布式执行的机制而流处理主要是数据管理技术。
* 参与者之间的交流往往是短暂的,而且是一对一的,而事件日志则是持久的,多用户的。
* 参与者可以以任意方式进行通信(包括循环请求/响应模式),但流处理器通常设置在非循环流水线中,其中每个流是一个特定作业的输出,并且从一组明确定义的输入流派生。
也就是说RPC类系统和流处理之间有一些交叉区域。例如Apache Storm有一个称为分布式RPC的功能它允许将用户查询分散到一系列也处理事件流的节点上;这些查询然后与来自输入流的事件交织结果可以被汇总并发回给用户【78】。 (另请参阅“多分区数据处理”第514页
也就是说RPC类系统和流处理之间有一些交叉区域。例如Apache Storm有一个称为分布式RPC的功能它允许将用户查询分散到一系列也处理事件流的节点上;这些查询然后与来自输入流的事件交织结果可以被汇总并发回给用户【78】。 (另参阅“[多分区数据处理](ch12.md#多分区数据处理)”
也可以使用actor框架来处理流。但是很多这样的框架在崩溃的情况下不能保证消息的传递所以这个过程不是容错的除非你实现了额外的重试逻辑。
@ -501,13 +507,13 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
在批处理过程中,处理任务通过大量的历史事件迅速收缩。如果需要按时间分类,批处理需要查看每个事件中嵌入的时间戳。查看运行批处理的机器的系统时钟没有意义,因为处理运行的时间与事件实际发生的时间无关。
批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间表是历史的一年,而不是几分钟的处理。而且,在事件中使用时间戳允许处理确定性的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“故障容错”在页面429)。
批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间表是历史的一年,而不是几分钟的处理。而且,在事件中使用时间戳允许处理确定性的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“[故障容错](ch10.md#故障容错)”)。
另一方面许多流处理框架使用处理机器上的本地系统时钟处理时间来确定窗口【79】。这种方法具有简单的优点事件创建和事件处理之间的延迟可以忽略不计。然而如果存在任何显着的处理滞后即处理可能比事件实际发生的时间显着晚则会中断处理。
#### 事件时间与处理时间
有许多原因可能会延迟处理排队网络故障请参阅第267页的“不可靠的网络”导致消息代理或处理器中出现争用的性能问题重新启动流消费者或重新处理过去的事件参阅第451页的“重播旧消息”),或者在修复代码中的错误之后进行恢复。
有许多原因可能会延迟处理排队网络故障请参阅第267页的“[不可靠的网络](ch8.md#不可靠的网络)”),导致消息代理或处理器中出现争用的性能问题,重新启动流消费者或重新处理过去的事件(参阅“[重播旧消息](#重播旧消息)”),或者在修复代码中的错误之后进行恢复。
而且消息延迟还可能导致消息的不可预知的排序。例如假设用户首先发出一个Web请求由Web服务器A处理然后发出第二个请求由服务器B处理。 A和B发出描述他们处理的请求的事件但是B的事件在A的事件发生之前到达消息代理。现在流处理器将首先看到B事件然后看到A事件即使它们实际上是以相反的顺序发生的。
@ -515,7 +521,7 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
[^ii]: 感谢Flink社区的Kostas Kloudas提出这个比喻。
令人困惑的事件时间和处理时间导致错误的数据。例如假设您有一个流处理器来测量请求率计算每秒请求数。如果您重新部署流处理器则可能会关闭一分钟并在事件恢复时处理积压的事件。如果您根据处理时间来衡量速率那么看起来好像在处理积压时突然出现异常的请求高峰而事实上请求的实际速率是稳定的图11-7
令人困惑的事件时间和处理时间导致错误的数据。例如,假设您有一个流处理器来测量请求率(计算每秒请求数)。如果您重新部署流处理器,则可能会关闭一分钟,并在事件恢复时处理积压的事件。如果您根据处理时间来衡量速率,那么看起来好像在处理积压时突然出现异常的请求高峰,而事实上请求的实际速率是稳定的([图11-7](img/fig11-7.png))。
![](img/fig11-7.png)
@ -538,7 +544,7 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
当事件可以在系统中的多个点缓冲时,为事件分配时间戳更加困难。例如,考虑将使用率度量的事件报告给服务器的移动应用程序。该应用程序可能会在设备处于脱机状态时使用,在这种情况下,它将在设备上本地缓冲事件,并在下一次可用的互联网连接(可能是几小时甚至几天)时将它们发送到服务器。对于这个流的任何消费者来说,这些事件将显示为极其滞后的落后者。
在这种情况下,根据移动设备的本地时钟,事件的时间戳实际上应该是发生用户交互的时间。但是,用户控制的设备上的时钟通常是不可信的,因为它可能会被意外或故意设置为错误的时间(请参见“时钟同步和精度”第269页)。服务器收到事件的时间(根据服务器的时钟)更可能是准确的,因为服务器在您的控制之下,但在描述用户交互方面意义不大。
在这种情况下,根据移动设备的本地时钟,事件的时间戳实际上应该是发生用户交互的时间。但是,用户控制的设备上的时钟通常是不可信的,因为它可能会被意外或故意设置为错误的时间(请参见“[时钟同步与准确性](ch8.md#时钟同步与准确性)”)。服务器收到事件的时间(根据服务器的时钟)更可能是准确的,因为服务器在您的控制之下,但在描述用户交互方面意义不大。
要调整不正确的设备时钟一种方法是记录三个时间戳【82】
@ -556,11 +562,11 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
***Tumbling窗口***
一个翻滚的窗口有一个固定的长度每个事件都属于一个窗口。例如如果您有1分钟的翻滚窗口则所有时间戳在10:03:00和10:03:59之间的事件会被分组到一个窗口中10:04:00和10:04:59之间的事件下一个窗口等等。您可以通过获取每个事件时间戳并将其四舍五入到最接近的分钟来确定它所属的窗口从而实现1分钟的翻滚窗口。
一个翻滚的窗口有一个固定的长度每个事件都属于一个窗口。例如如果您有1分钟的翻滚窗口则所有时间戳在`10:03:00``10:03:59`之间的事件会被分组到一个窗口中,`10:04:00``10:04:59`之间的事件下一个窗口等等。您可以通过获取每个事件时间戳并将其四舍五入到最接近的分钟来确定它所属的窗口从而实现1分钟的翻滚窗口。
***Hopping窗***
跳频窗口也具有固定的长度但允许窗口重叠以提供一些平滑。例如1分钟跳跃大小的5分钟窗口将包含10:03:00至10:07:59之间的事件则下一个窗口将覆盖10:04:00至10:08之间的事件 59等等。您可以通过首先计算1分钟滚动窗口然后聚合在几个相邻的窗口上来实现此跳频窗口。
跳频窗口也具有固定的长度但允许窗口重叠以提供一些平滑。例如1分钟跳跃大小的5分钟窗口将包含`10:03:00``10:07:59`之间的事件,则下一个窗口将覆盖`10:04:00``10:08`之间的事件: 59等等。您可以通过首先计算1分钟滚动窗口然后聚合在几个相邻的窗口上来实现此跳频窗口。
***滑动窗口***
@ -568,13 +574,13 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
***会话窗口***
与其他窗口类型不同会话窗口没有固定的持续时间。相反它是通过将同一用户的所有事件分组在一起并在时间上紧密地组合在一起来定义的并且当用户在一段时间内不活动时例如如果30分钟内没有事件窗口结束。会话化是网站分析的常见要求请参阅第406页的“GROUP BY”)。
与其他窗口类型不同会话窗口没有固定的持续时间。相反它是通过将同一用户的所有事件分组在一起并在时间上紧密地组合在一起来定义的并且当用户在一段时间内不活动时例如如果30分钟内没有事件窗口结束。会话化是网站分析的常见要求参阅“[GROUP BY](ch10.md#GROUP BY)”)。
### 流式连接
在第10章中我们讨论了批处理作业如何通过关键连接数据集以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道概括为对无界数据集进行增量处理因此对流进行连接的需求也完全相同。
[第10章](ch10.md)中,我们讨论了批处理作业如何通过关键连接数据集,以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道概括为对无界数据集进行增量处理,因此对流进行连接的需求也完全相同。
然而,新事件随时可能出现在一个流中,这使得加入流比批处理作业更具挑战性。为了更好地理解情况,我们来区分三种不同类型的连接:流 - 流连接流表连接和表连接【84】。在下面的章节中我们将通过例子来说明。
然而,新事件随时可能出现在一个流中,这使得加入流比批处理作业更具挑战性。为了更好地理解情况,我们来区分三种不同类型的连接:流-流连接流表连接和表连接【84】。在下面的章节中我们将通过例子来说明。
流 - 流连接(窗口连接)
假设您的网站上有搜索功能并且想要检测搜索到的网址的近期趋势。每次有人输入搜索查询时都会记录包含查询和返回结果的事件。每当有人点击其中一个搜索结果时就会记录另一个记录点击的事件。为了计算搜索结果中每个网址的点击率您需要将搜索操作和点击操作的事件组合在一起这些事件通过具有相同的会话ID进行连接。广告系统需要类似的分析【85】。
@ -587,19 +593,21 @@ Actor框架主要是管理通信模块的并发和分布式执行的机制
#### 流表连接stream enrichment
第404页的“示例用户活动事件分析”图10-2我们看到了加入两个数据集的批量作业示例一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流并在流处理器中连续执行相同的连接是很自然的输入是
在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”([图10-2](img/fig10-2.png))中,我们看到了加入两个数据集的批量作业示例:一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流,并在流处理器中连续执行相同的连接是很自然的:输入是包含用户ID的活动事件流并且输出是活动事件流其中用户ID已经用关于用户的简档信息来扩充。这个过程有时被称为使用来自数据库的信息来丰富活动事件。
包含用户ID的活动事件流并且输出是活动事件流其中用户ID已经用关于用户的简档信息来扩充。这个过程有时被称为使用来自数据库的信息来丰富活动事件。
要执行此联接,流程过程需要一次查看一个活动事件,在数据库中查找事件的用户标识,并将该概要信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现;但是,正如在“[示例:分析用户活动事件](ch10.md#示例:分析用户活动事件)”一节中讨论的此类远程查询可能会很慢并且有可能导致数据库过载【75】。
另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“[Map端连接](ch10.md#Map端连接)”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则本地副本可能是内存中的散列表,或者是本地磁盘上的索引。
要执行此联接,流程过程需要一次查看一个活动事件,在数据库中查找事件的用户标识,并将该概要信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现;但是正如在“示例分析用户活动事件”一节中讨论的此类远程查询可能会很慢并且有可能导致数据库过载【75】。
另一种方法是将数据库副本加载到流处理器中以便在本地进行查询而无需网络往返。这种技术与我们在408页的“Map-Side连接”中讨论的散列连接非常相似如果数据库的本地副本足够小则本地副本可能是内存中的散列表或者是本地磁盘上的索引。
与批处理作业的区别在于批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,并且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持最新。这个问题可以通过更改数据捕获来解决:流处理器可以订阅用户配置文件数据库的更新日志以及活动事件流。在创建或修改配置文件时,流处理器会更新其本地副本。因此,我们获得两个流之间的连接:活动事件和配置文件更新。
流表连接实际上非常类似于流 - 流连接;最大的区别在于对于表changelog流连接使用一个可以回溯到“开始时间”概念上是无限的窗口的窗口新版本的记录会覆盖较早的版本。对于流输入连接可能根本没有维护窗口。
#### 表格表连接(物化视图维护)
考虑我们在第11页的“描述负载”中讨论的Twitter时间线示例。我们说过当用户想要查看他们的家庭时间线时对用户所关注的所有人进行迭代是非常昂贵的推文并合并它们。
相反我们需要一个时间线缓存一种每个用户的“收件箱”在发送tweets的时候写入这些信息以便读取时间线是一次查询。实现和维护此缓存需要以下事件处理
考虑我们在“[描述负载](ch1.md#描述负载)”中讨论的推特时间线例子。我们说过,当用户想要查看他们的主页时间线时,对用户所关注的所有人进行迭代是非常昂贵的,推文,并合并它们。
相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,以便读取时间线是一次查询。实现和维护此缓存需要以下事件处理:
* 当用户发送新的推文时,它将被添加到每个跟随你的用户的时间线上。
* 用户删除推文时,将从所有用户的时间表中删除。
@ -619,7 +627,7 @@ GROUP BY follows.follower_id
流的连接直接对应于该查询中的表的连接。时间轴实际上是这个查询结果的缓存,每当基础表发生变化时都会更新[^iii]。
[^iii]: 如果你把一个流视为一个表的衍生物如图11-6所示并且把一个连接看作是两个表u·v的乘积那么会发生一些有趣的事情物化连接的变化流遵循产品规则 u·v'= u'v + uv'。 换句话说任何tweets的变化都与当前的追随者联系在一起任何追随者的变化都与当前的tweets [4950]相结合。
[^iii]: 如果你把一个流视为一个表的衍生物,如[图11-6](img/fig11-6.png)所示并且把一个连接看作是两个表u·v的乘积那么会发生一些有趣的事情物化连接的变化流遵循产品规则(u·v)'= u'v + uv'(u·v)'= u'v + uv'。 换句话说任何tweets的变化都与当前的追随者联系在一起任何追随者的变化都与当前的tweets 【49,50】相结合。
#### 连接的时间依赖性
@ -633,15 +641,15 @@ GROUP BY follows.follower_id
### 容错
在本章的最后一节中让我们考虑流处理器如何容忍错误。我们在第10章中看到批处理框架可以很容易地容忍错误如果MapReduce作业中的任务失败可以简单地在另一台机器上重新启动并且丢弃失败任务的输出。这种透明的重试是可能的因为输入文件是不可变的每个任务都将其输出写入到HDFS上的单独文件并且输出仅在任务成功完成时可见。
在本章的最后一节中,让我们考虑流处理器如何容忍错误。我们在[第10章](ch10.md)中看到批处理框架可以很容易地容忍错误如果MapReduce作业中的任务失败可以简单地在另一台机器上重新启动并且丢弃失败任务的输出。这种透明的重试是可能的因为输入文件是不可变的每个任务都将其输出写入到HDFS上的单独文件并且输出仅在任务成功完成时可见。
特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使事实上某些任务失败了。看起来好像每个输入记录都被处理了一次 - 没有记录被跳过,而且没有处理两次。尽管重新启动任务意味着实际上可能会多次处理记录,但输出中的可见效果好像只处理过一次。这个原则被称为一次语义学,虽然有效 - 一次将是一个更具描述性的术语【90】。
特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使事实上某些任务失败了。看起来好像每个输入记录都被处理了一次 —— 没有记录被跳过,而且没有处理两次。尽管重新启动任务意味着实际上可能会多次处理记录,但输出中的可见效果好像只处理过一次。这个原则被称为一次语义学,虽然有效 —— 一次将是一个更具描述性的术语【90】。
在流处理过程中也出现了同样的容错问题,但是处理起来不那么直观:等到某个任务完成之后才使其输出可见,因为流是无限的,因此您永远无法完成处理。
#### 小批量和检查点
一个解决方案是将流分解成小块并像小型批处理一样处理每个块。这种方法被称为microbatching它被用于Spark Streaming 【91】。批处理大小通常约为1秒这是性能折中的结果较小的批次会导致更大的调度和协调开销而较大的批次意味着流处理器的结果变得可见之前的较长延迟。
一个解决方案是将流分解成小块,并像小型批处理一样处理每个块。这种方法被称为**小批量(microbatching**它被用于Spark Streaming 【91】。批处理大小通常约为1秒这是性能折中的结果较小的批次会导致更大的调度和协调开销而较大的批次意味着流处理器的结果变得可见之前的较长延迟。
微缩也隐含地提供了与批量大小相等的翻滚窗口(通过处理时间而不是事件时间戳)。任何需要更大窗口的作业都需要明确地将状态从一个微阵列转移到下一个微阵列。
@ -655,7 +663,7 @@ Apache Flink中使用的一种变体方法是定期生成状态滚动检查点
这些事情要么都是原子地发生要么都不发生但是不应该彼此不同步。如果这种方法听起来很熟悉那是因为我们在分布式事务和两阶段提交的情况下在第360页的“准确一次的消息处理”中讨论了它。
在第9章中我们讨论了分布式交易如XA的传统实现中的问题。然而在更受限制的环境中可以有效地实现这样的原子提交设施。 Google云数据流【81,92】和VoltDB 【94】中使用了这种方法并计划在Apache Kafka 【95,96】中添加类似的功能。与XA不同这些实现不会尝试跨异构技术提供事务而是通过在流处理框架中管理状态更改和消息传递来保持内部事务。事务协议的开销可以通过在单个事务中处理几个输入消息来分摊。
[第9章](ch9.md)我们讨论了分布式交易如XA的传统实现中的问题。然而在更受限制的环境中可以有效地实现这样的原子提交设施。 Google云数据流【81,92】和VoltDB 【94】中使用了这种方法并计划在Apache Kafka 【95,96】中添加类似的功能。与XA不同这些实现不会尝试跨异构技术提供事务而是通过在流处理框架中管理状态更改和消息传递来保持内部事务。事务协议的开销可以通过在单个事务中处理几个输入消息来分摊。
#### 幂等
@ -665,19 +673,19 @@ Apache Flink中使用的一种变体方法是定期生成状态滚动检查点
即使一个操作不是天生的幂等,它往往可以与一些额外的元数据幂等。例如,在使用来自卡夫卡的消息时,每条消息都有一个持续的,单调递增的偏移量。将值写入外部数据库时,可以将触发上次写入的消息的偏移量与值包含在一起。因此,您可以判断是否已应用更新,并避免再次执行相同的更新。
风暴三叉戟的状态处理基于类似的想法【78】。依赖幂等性意味着一些假设重启一个失败的任务必须以相同的顺序重播相同的消息一个基于日志的消息代理这样做处理必须是确定性的其他节点不能同时更新相同的值[ 9899]
风暴三叉戟的状态处理基于类似的想法【78】。依赖幂等性意味着一些假设重启一个失败的任务必须以相同的顺序重播相同的消息一个基于日志的消息代理这样做处理必须是确定性的其他节点不能同时更新相同的值【98,99】
当从一个处理节点故障转移到另一个处理节点时,可能需要进行防护(请参阅第291页上的“领导和锁定”),以防止被认为是死的节点的干扰
当从一个处理节点故障转移到另一个处理节点时,可能需要进行防护(参阅“[领导和锁](ch8.md#领导和锁)”),以防止被认为是死的节点的干扰
#### 失败后重建状态
任何需要状态的流进程(例如,任何窗口聚合(例如计数器,平均值和直方图)以及用于连接的任何表和索引)都必须确保在失败之后可以恢复此状态。
一种选择是将状态保持在远程数据存储中并复制它,尽管如每个单独消息的远程数据库查询速度可能会很慢,正如在“流表加入第473页”中所述。另一种方法是保持流处理器的本地状态,并定期复制。然后,当流处理器从故障中恢复时,新任务可以读取复制状态并恢复处理而不丢失数据。
一种选择是将状态保持在远程数据存储中并复制它,尽管如每个单独消息的远程数据库查询速度可能会很慢,正如在“[流表连接](#流表连接)”中所述。另一种方法是保持流处理器的本地状态,并定期复制。然后,当流处理器从故障中恢复时,新任务可以读取复制状态并恢复处理而不丢失数据。
例如Flink定期捕获操作员状态的快照并将它们写入HDFS等持久存储器中【92,93】。 Samza和Kafka Streams通过将状态更改发送到具有日志压缩功能的专用Kafka主题来复制状态更改这类似于更改数据捕获[84100]。 VoltDB通过冗余处理多个节点上的每个输入消息来复制状态请参阅第252页的“实际的串行执行”
例如Flink定期捕获操作员状态的快照并将它们写入HDFS等持久存储器中【92,93】。 Samza和Kafka Streams通过将状态更改发送到具有日志压缩功能的专用Kafka主题来复制状态更改这类似于更改数据捕获【84,100】。 VoltDB通过冗余处理多个节点上的每个输入消息来复制状态请参阅第252页的“实际的串行执行”
在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态由一个相当短的窗口中的聚合组成,则它可能足够快,以便重放与该窗口相对应的输入事件。如果状态是通过更改数据捕获维护的数据库的本地副本,那么也可以从日志压缩的更改流重建数据库(请参阅“日志压缩”一节第456页)。
在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态由一个相当短的窗口中的聚合组成,则它可能足够快,以便重放与该窗口相对应的输入事件。如果状态是通过更改数据捕获维护的数据库的本地副本,那么也可以从日志压缩的更改流重建数据库(请参阅“[日志压缩](#日志压缩)”一节)。
但是,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。在所有情况下都没有普遍理想的权衡,随着存储和网络技术的发展,本地和远程状态的优点也可能会发生变化。
@ -685,24 +693,28 @@ Apache Flink中使用的一种变体方法是定期生成状态滚动检查点
## 本章小结
在本章中我们讨论了事件流他们所服务的目的以及如何处理它们。在某些方面流处理非常类似于我们在第10章讨论的批处理而是在无限的永无止境的流而不是固定大小的输入上持续进行。从这个角度来看消息代理和事件日志可以作为文件系统的流媒体。
在本章中,我们讨论了事件流,他们所服务的目的以及如何处理它们。在某些方面,流处理非常类似于我们在[第10章](ch10.md)讨论的批处理,而是在无限的(永无止境的)流而不是固定大小的输入上持续进行。从这个角度来看,消息代理和事件日志可以作为文件系统的流媒体。
我们花了一些时间比较两种消息代理:
***AMQP/JMS风格的消息代理***
经纪人将个人消息分配给消费者消费者在成功处理个人消息时确认消息。消息被确认后从代理中删除。这种方法适合作为RPC的异步形式另请参阅第136页的“消息传递数据流”例如在任务队列中消息处理的确切顺序并不重要没有在处理之后需要重新读取旧消息。
***基于日志的消息代理***
代理将分区中的所有消息分配给相同的使用者节点,并始终以相同的顺序传递消息。并行性是通过划分来实现的,消费者通过检查他们所处理的最后一个消息的偏移来跟踪他们的进度。代理将消息保留在磁盘上,因此如有必要,可以跳回并重新读取旧消息。
基于日志的方法与数据库中的复制日志参见第5章和日志结构存储引擎请参阅第3章具有相似之处。我们看到这种方法特别适用于消耗输入流并生成派生状态或派生输出流的流处理系统。
就流的来源而言,我们讨论了几种可能性:用户活动事件,提供定期读数的传感器和数据馈送(例如金融市场数据)自然地表示为流。我们看到,将数据写入数据流也是有用的:我们可以捕获更改日志 - 即对数据库所做的所有更改的历史记录 - 隐式地通过更改数据捕获或通过事件明确地捕获采购。日志压缩允许流保留数据库内容的完整副本。
基于日志的方法与数据库中的复制日志(参见[第5章](ch5.md))和日志结构存储引擎(请参阅[第3章](ch3.md))具有相似之处。我们看到,这种方法特别适用于消耗输入流并生成派生状态或派生输出流的流处理系统。
就流的来源而言,我们讨论了几种可能性:用户活动事件,提供定期读数的传感器和数据馈送(例如金融市场数据)自然地表示为流。我们看到,将数据写入数据流也是有用的:我们可以捕获更改日志 —— 即对数据库所做的所有更改的历史记录 —— 隐式地通过更改数据捕获或通过事件明确地捕获采购。日志压缩允许流保留数据库内容的完整副本。
将数据库表示为流为系统集成提供了强大的机会。您可以通过使用更改日志并将其应用于派生系统,使派生的数据系统(如搜索索引,缓存和分析系统)保持最新。您甚至可以从头开始,从开始一直到现在消耗更改的日志,从而为现有数据构建新的视图。
将状态保持为流并重放消息的设施也是在各种流处理框架中实现流连接和容错的技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理),计算加窗聚合(流分析)以及保持派生数据系统处于最新状态(材料化视图)。
将状态保持为流并重放消息的设施也是在各种流处理框架中实现流连接和容错的技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理),计算加窗聚合(流分析)以及保持派生数据系统处于最新状态(化视图)。
然后我们讨论了在流处理器中推理时间的困难,包括处理时间和事件时间戳之间的区别,以及在你认为窗口完成之后处理到达的离散事件的问题。