ddia/ch11.md
2018-05-21 04:02:16 +08:00

117 KiB
Raw Blame History

11. 流处理

有效的复杂系统总是从简单的系统演化而来。 反之亦然:从零设计的复杂系统没一个能有效工作的。

——约翰·加尔Systemantics1975


[TOC]

第10章中,我们讨论了批处理技术,它读取一组文件作为输入,并生成一组新的文件作为输出。输出是**衍生数据derived data**的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来建立搜索索引,推荐系统,做分析等等。

然而,在第10章中仍然有一个很大的假设即输入是有界的即已知和有限的大小所以批处理知道它何时完成输入的读取。例如MapReduce核心的排序操作必须读取其全部输入然后才能开始生成输出可能发生这种情况最后一条输入记录具有最小的键因此需要第一个被输出所以提早开始输出是不可行的。

实际上,很多数据是无界限因为它随着时间的推移而逐渐到达你的用户在昨天和今天产生了数据明天他们将继续产生更多的数据。除非你停业否则这个过程永远都不会结束所以数据集从来就不会以任何有意义的方式“完成”【1】。因此批处理程序必须将数据人为地分成固定时间段的数据块例如在每天结束时处理一天的数据或者在每小时结束时处理一小时的数据。

日常批处理中的问题是,输入的变更只会在一天之后的输出中反映出来,这对于许多急躁的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理 —— 比如说,在每秒钟的末尾 —— 或者甚至更连续一些,完全抛开固定的时间切片,当事件发生时就立即进行处理,这就是**流处理stream processing**背后的想法。

一般来说“流”是指随着时间的推移逐渐可用的数据。这个概念出现在很多地方Unix的stdin和stdout编程语言惰性列表【2】文件系统API如Java的FileInputStreamTCP连接通过互联网传送音频和视频等等。 在本章中,我们将把**事件流event stream**视为一种数据管理机制:无界限,增量处理,与上一章中批量数据相对应。我们将首先讨论怎样表示、存储、通过网络传输流。在“数据库和流”中,我们将研究流和数据库之间的关系。最后在“流处理”中,我们将研究连续处理这些流的方法和工具,以及它们用于应用构建的方式。

传递事件流

在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。流处理领域中的等价物看上去是什么样子的?

当输入是一个文件(一个字节序列),第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被叫做事件event,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自时钟的时间戳,以指明事件发生的时间(参见“单调钟与时钟”)。

例如发生的事件可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如对温度传感器或CPU利用率的周期性测量。在“使用Unix工具进行批处理”的示例中Web服务器日志的每一行都是一个事件。

事件可能被编码为文本字符串或JSON或者某种二进制编码第4章所述。这种编码允许你存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许你通过网络将事件发送到另一个节点以进行处理。

在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。什么是类似的流媒体?

当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见“单调钟与时钟”)。

例如发生的事情可能是用户采取的行动例如查看页面或进行购买。它也可能来源于机器例如来自温度传感器的周期性测量或者CPU利用率度量。在“使用Unix工具进行批处理”的示例中Web服务器日志的每一行都是一个事件。

事件可能被编码为文本字符串或JSON或者以某种二进制形式编码第4章所述。这种编码允许你存储一个事件,例如将其追加写入一个文件,将其插入关系型表,或将其写入文档数据库。它还允许你通过网络将事件发送到其他节点以进行处理。

在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流处理术语中,一个事件由生产者producer(也称为发布者publisher发送者sender)生成一次,然后可能由多个消费者consumer订阅者subscribers接收者recipients进行处理【3】。在文件系统中文件名标识一组相关记录在流媒体系统中相关的事件通常被聚合为一个主题topicstream

原则上将,文件或数据库就足以连接生产者和消费者:生产者将其生成的每个事件写入数据存储,且每个消费者定期轮询数据存储,检查自上次运行以来新出现的事件。这实际上正是批处理在每天结束时处理当天数据时所做的事情。

但当我们想要进行低延迟的连续处理时,如果数据存储不是为这种用途专门设计的,那么轮询开销就会很大。轮询的越频繁,能返回新事件的请求比例就越低,而额外开销也就越高。相比之下,最好能在新事件出现时直接通知消费者。

数据库在传统上对这种通知机制支持的并不好,关系型数据库通常有触发器trigger它们可以对变化作出反应插入表中的一行但它们的功能非常有限而且在数据库设计中算是一种事后反思【4,5】。相应的是已经有为传递事件通知这一目开发的专用工具已经被开发出来。

消息系统

向消费者通知新事件的常用方式是使用消息传递系统messaging system:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在“消息传递中的数据流”中介绍了这些系统,但现在我们将详细介绍这些系统。

像生产者和消费者之间的Unix管道或TCP连接这样的直接信道是实现消息传递系统的简单方法。但是大多数消息传递系统都在这一基本模型上进行扩展。特别的是Unix管道和TCP将恰好一个发送者与恰好一个接收者连接而一个消息传递系统允许多个生产者节点将消息发送到同一个主题并允许多个消费者节点接收主题中的消息。

在这个发布/订阅模式中,不同的系统采取各种各样的方法,并没有针对所有目的的通用答案。为了区分这些系统,问一下这两个问题会特别有帮助:

  1. 如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?一般来说,有三种选择:系统可以丢掉消息,将消息放入缓冲队列,或使用背压backpressure(也称为**流量控制flow control**即阻塞生产者以免其发送更多的消息。例如Unix管道和TCP使用背压它们有一个固定大小的小缓冲区如果填满发送者会被阻塞直到接收者从缓冲区中取出数据参见“网络拥塞和排队”)。

    如果消息被缓存在队列中那么理解队列增长会发生什么是很重要的。当队列装不进内存时系统会崩溃吗还是将消息写入磁盘如果是这样磁盘访问又会如何影响消息传递系统的性能【6】

  2. 如果节点崩溃或暂时脱机,会发生什么情况? —— 是否会有消息丢失?与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(参阅“复制和持久性”),这是有代价的。如果你能接受有时消息会丢失,则可能在同一硬件上获得更高的吞吐量和更低的延迟。

是否可以接受消息丢失取决于应用。例如对于周期传输的传感器读数和指标偶尔丢失的数据点可能并不重要因为更新的值会在短时间内发出。但要注意如果大量的消息被丢弃可能无法立刻意识到指标已经不正确了【7】。如果你正在对事件计数那么更重要的是它们能够可靠送达因为每个丢失的消息都意味着使计数器的错误扩大。

我们在第10章中探讨的批处理系统的一个很好的特性是,它们提供了强大的可靠性保证:失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障一样,这有助于简化编程模型。在本章的后面,我们将研究如何在流处理的上下文中提供类似的保证。

直接从生产者传递给消费者

许多消息传递系统使用生产者和消费者之间的直接网络通信,而不通过中间节点:

  • UDP组播广泛应用于金融行业例如股票市场其中低时延非常重要【8】。虽然UDP本身是不可靠的但应用层的协议可以恢复丢失的数据包生产者必须记住它发送的数据包以便能按需重新发送数据包
  • 无代理的消息库如ZeroMQ 【9】和nanomsg采取类似的方法通过TCP或IP多播实现发布/订阅消息传递。
  • StatsD 【10】和Brubeck 【7】使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 在StatsD协议中只有接收到所有消息才认为计数器指标是正确的使用UDP将使得指标处在一种最佳近似状态【11】。另请参阅“TCP与UDP
  • 如果消费者在网络上公开了服务生产者可以直接发送HTTP或RPC请求参阅“通过服务进行数据流REST和RPC将消息推送给使用者。这就是webhooks背后的想法【12】一种服务的回调URL被注册到另一个服务中并且每当事件发生时都会向该URL发出请求。

尽管这些直接消息传递系统在设计它们的环境中运行良好,但是它们通常要求应用代码意识到消息丢失的可能性。它们的容错程度极为有限:即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。

如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。

消息代理

一种广泛使用的替代方法是通过消息代理message broker(也称为消息队列message queue)发送消息,消息代理实质上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产者和消费者作为客户端连接到服务器。生产者将消息写入代理,消费者通过从代理那里读取来接收消息。

通过将数据集中在代理上,这些系统可以更容易地容忍来来去去的客户端(连接,断开连接和崩溃),而持久性问题则转移到代理的身上。一些消息代理只将消息保存在内存中,而另一些消息代理(取决于配置)将其写入磁盘,以便在代理崩溃的情况下不会丢失。针对缓慢的消费者,它们通常会允许无上限的排队(而不是丢弃消息或背压),尽管这种选择也可能取决于配置。

排队的结果是,消费者通常是**异步asynchronous**的:当生产者发送消息时,通常只会等待代理确认消息已经被缓存,而不等待消息被消费者处理。向消费者递送消息将发生在未来某个未定的时间点 —— 通常在几分之一秒之内,但有时当消息堆积时会显著延迟。

消息代理与数据库对比

有些消息代理甚至可以使用XA或JTA参与两阶段提交协议参阅“实践中的分布式事务”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在实践上很重要的差异:

  • 数据库通常保留数据直至显式删除,而大多数消息代理在消息成功递送给消费者时会自动删除消息。这样的消息代理不适合长期的数据存储。
  • 由于它们很快就能删除消息,大多数消息代理都认为它们的工作集相当小—— 即队列很短。如果代理需要缓冲很多消息比如因为消费者速度较慢如果内存装不下消息可能会溢出到磁盘每个消息需要更长的处理时间整体吞吐量可能会恶化【6】。
  • 数据库通常支持二级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。机制并不一样,对于客户端选择想要了解的数据的一部分,这是两种基本的方式。
  • 查询数据库时,结果通常基于某个时间点的数据快照;如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。

这是关于消息代理的传统观点它被封装在诸如JMS 【14】和AMQP 【15】的标准中并且被诸如RabbitMQActiveMQHornetQQpidTIBCO企业消息服务IBM MQAzure Service Bus和Google Cloud Pub/Sub实现 【16】。

多个消费者

当多个消费者从同一主题中读取消息时,有使用两种主要的消息传递模式,如图11-1所示:

负载均衡load balance

每条消息都被传递给消费者之一所以处理该主题下消息的工作能被多个消费者共享。代理可以为消费者任意分配消息。当处理消息的代价高昂希望能并行处理消息时此模式非常有用在AMQP中可以通过让多个客户端从同一个队列中消费来实现负载均衡而在JMS中则称之为共享订阅shared subscription)。

扇出fan-out

每条消息都被传递给所有消费者。扇出允许几个独立的消费者各自“收听”相同的消息广播,而不会相互影响 —— 这个流处理中的概念对应批处理中多个不同批处理作业读取同一份输入文件 JMS中的主题订阅与AMQP中的交叉绑定提供了这一功能

图11-1 a负载平衡在消费者间共享消费主题b扇出将每条消息传递给多个消费者。

两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅一个主题,每一组组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。

确认与重新交付

消费随时可能会崩溃,所以有一种可能的情况是:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理使用确认acknowledgments:客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。

如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。 (请注意可能发生这样的情况,消息实际上是处理完毕的,但确认在网络中丢失了。需要一种原子提交协议才能处理这种情况,正如在“实践中的分布式事务”中所讨论的那样)

当与负载均衡相结合时,这种重传行为对消息的顺序有种有趣的影响。在图11-2消费者通常按照生产者发送的顺序处理消息。然而消费者2在处理消息m3时崩溃与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1结果消费者1按照m4m3m5的顺序处理消息。因此m3和m4的交付顺序与以生产者1的发送顺序不同。

图11-2 在处理m3时消费者2崩溃因此稍后重传至消费者1

即使消息代理试图保留消息的顺序如JMS和AMQP标准所要求的负载均衡与重传的组合也不可避免地导致消息被重新排序。为避免此问题你可以让每个消费者使用单独的队列即不使用负载均衡功能。如果消息是完全独立的则消息顺序重排并不是一个问题。但正如我们将在本章后续部分所述如果消息之间存在因果依赖关系这就是一个很重要的问题。

分区日志

通过网络发送数据包或向网络服务发送请求通常是短暂的操作,不会留下永久的痕迹。尽管可以永久记录(通过抓包与日志),但我们通常不这么做。即使是将消息持久地写入磁盘的消息代理,在送达给消费者之后也会很快删除消息,因为它们建立在短暂消息传递的思维方式上。

数据库和文件系统采用截然相反的方法论:至少在某人显式删除前,通常写入数据库或文件的所有内容都要被永久记录下来。

这种思维方式上的差异对创建衍生数据的方式有巨大影响。如第10章所述,批处理过程的一个关键特性是,你可以反复运行它们,试验处理步骤,不用担心损坏输入(因为输入是只读的)。而 AMQP/JMS风格的消息传递并非如此收到消息是具有破坏性的因为确认可能导致消息从代理中被删除因此你不能期望再次运行同一个消费者能得到相同的结果。

如果你将新的消费者添加到消息系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。

为什么我们不能把它俩杂交一下,既有数据库的持久存储方式,又有消息传递的低延迟通知?这就是**基于日志的消息代理log-based message brokers**背后的想法。

使用日志进行消息存储

日志只是磁盘上简单的仅追加记录序列。我们先前在第3章中日志结构存储引擎和预写式日志的上下文中讨论了日志,在第5章复制的上下文里也讨论了它。

同样的结构可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。 Unix工具tail -f 能监视文件被追加写入的数据,基本上就是这样工作的。

为了扩展到比单个磁盘所能提供的更高吞吐量,可以对日志进行分区(在第6章的意义上)。不同的分区可以托管在不同的机器上,且每个分区都拆分出一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。这种方法如图11-3所示。

在每个分区内,代理为每个消息分配一个单调递增的序列号或偏移量offset(在图11-3中,框中的数字是消息偏移量)。这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。

图11-3 生产者通过将消息追加写入主题分区文件来发送消息,消费者依次读取这些文件

Apache Kafka 【17,18】Amazon Kinesis Streams 【19】和Twitter的DistributedLog 【20,21】都是基于日志的消息代理。 Google Cloud Pub/Sub在架构上类似但对外暴露的是JMS风格的API而不是日志抽象【16】。尽管这些消息代理将所有消息写入磁盘但通过跨多台机器分区每秒能够实现数百万条消息的吞吐量并通过复制消息来实现容错性【22,23】。

日志与传统消息相比

基于日志的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响 —— 读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端。

每个客户端消费指派分区中的所有消息。然后使用分配的分区中的所有消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:

  • 共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点1
  • 如果某条消息处理缓慢,则它会阻塞该分区中后续消息的处理(一种行首阻塞的形式;请参阅“描述性能”)。

因此在消息处理代价高昂希望逐条并行处理以及消息的顺序并没有那么重要的情况下JMS/AMQP风格的消息代理是可取的。另一方面在消息吞吐量很高处理迅速顺序很重要的情况下基于日志的方法表现得非常好。

消费者偏移量

顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。在这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。

实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似,我们在“设置新从库”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理的表现得像一个主库,而消费者就像一个从库。

如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次。我们将在本章后面讨论这个问题的处理方法。

磁盘空间使用

如果只追加写入日志,则磁盘空间终究会耗尽。为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。 (我们将在后面讨论一种更为复杂的磁盘空间释放方式)

这就意味着如果一个慢消费者跟不上消息产生的速率而落后的太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为循环缓冲区circular buffer环形缓冲区ring buffer。不过由于缓冲区在磁盘上,因此可能相当的大。

让我们做个简单计算。在撰写本文时典型的大型硬盘容量为6TB顺序写入吞吐量为150MB/s。如果以最快的速度写消息则需要大约11个小时才能填满磁盘。因而磁盘可以缓冲11个小时的消息之后它将开始覆盖旧的消息。即使使用多个磁盘和机器这个比率也是一样的。实践中的部署很少能用满磁盘的写入带宽所以通常可以保存一个几天甚至几周的日志缓冲区。

不管保留多长时间的消息日志的吞吐量或多或少保持不变因为无论如何每个消息都会被写入磁盘【18】。这种行为与默认将消息保存在内存中仅当队列太长时才写入磁盘的消息传递系统形成鲜明对比。当队列很短时这些系统非常快而当这些系统开始写入磁盘时就要慢的多所以吞吐量取决于保留的历史数量。

当消费者跟不上生产者时

在“消息传递系统”中,如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大,但大小固定的缓冲区(受可用磁盘空间的限制)。

如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让人类运维来修复慢消费者,并在消息开始丢失之前让其赶上。

即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。

这种行为也与传统的信息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列—— 否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。

重播旧信息

我们之前提到使用AMQP和JMS风格的消息代理处理和确认消息是一个破坏性的操作因为它会导致消息在代理上被删除。另一方面在基于日志的消息代理中使用消息更像是从文件中读取数据这是只读操作不会更改日志。

除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。

这一方面使得基于日志的消息传递更像上一章的批处理其中衍生数据通过可重复的转换过程与输入数据显式分离。它允许进行更多的实验更容易从错误和漏洞中恢复使其成为在组织内集成数据流的良好工具【24】。

流与数据库

我们已经在消息代理和数据库之间进行了一些比较。尽管传统上它们被视为单独的工具类别,但是我们看到基于日志的消息代理已经成功地从数据库中获取灵感并将其应用于消息传递。我们也可以反过来:从消息传递和流中获取灵感,并将它们应用于数据库。

我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或读取传感器,但也可能是写入数据库。某些东西被写入数据库的事实是可以被捕获,存储和处理的事件。这一观察结果表明,数据库和数据流之间的联系不仅仅是磁盘日志的物理存储 —— 而是更深层的联系。

事实上,复制日志(参阅“复制日志的实现”)是数据库写入事件的流,由主库在处理事务时生成。从库将写入流应用到它们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。

我们还在“全序广播”中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将达到相同的最终状态 (假设处理一个事件是一个确定性的操作)。这是事件流的又一种场景!

在本节中,我们将首先看看异构数据系统中出现的一个问题,然后探讨如何通过将事件流的想法带入数据库来解决这个问题。

保持系统同步

正如我们在本书中所看到的没有一个系统能够满足所有的数据存储查询和处理需求。在实践中大多数重要应用都需要组合使用几种不同的技术来满足所有的需求例如使用OLTP数据库来为用户请求提供服务使用缓存来加速常见请求使用全文索引搜索处理搜索查询使用数据仓库用于分析。每一个组件都有自己的数据副本以自己的表示存储并根据自己的目的进行优化。

由于相同或相关的数据出现在了不同的地方因此相互间需要保持同步如果某个项目在数据库中被更新它也应当在缓存搜索索引和数据仓库中被更新。对于数据仓库这种同步通常由ETL进程执行参见“数据仓库”),通常是先取得数据库的完整副本,然后执行转换,并批量加载到数据仓库中 —— 换句话说,批处理。我们在“批量工作流的输出”中同样看到了如何使用批处理创建搜索索引,推荐系统和其他衍生数据系统。

如果周期性的完整数据库转储过于缓慢,有时会使用的替代方法是双写dual write,其中应用代码在数据变更时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存项失效(甚至同时执行这些写入)。

但是,双写有一些严重的问题,其中一个是竞争条件,如图11-4所示。在这个例子中两个客户端同时想要更新一个项目X客户端1想要将值设置为A客户端2想要将其设置为B。两个客户端首先将新值写入数据库然后将其写入到搜索索引。因为运气不好这些请求的时序是交错的数据库首先看到来自客户端1的写入将值设置为A然后来自客户端2的写入将值设置为B因此数据库中的最终值为B。搜索索引首先看到来自客户端2的写入然后是客户端1的写入所以搜索索引中的最终值是A。即使没发生错误这两个系统现在也永久地不一致了。

图11-4 在数据库中X首先被设置为A然后被设置为B而在搜索索引处写入以相反的顺序到达

除非有一些额外的并发检测机制,例如我们在“检测并发写入”中讨论的版本向量,否则你甚至不会意识到发生了并发写入 —— 一个值将简单地以无提示方式覆盖另一个值。

双重写入的另一个问题是,其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相不一致的结果。确保它们要么都成功要么都失败,是原子提交问题的一个例子,解决这个问题的代价是昂贵的(参阅“原子提交和两阶段提交2PC”)。

如果你只有一个单领导者复制的数据库,那么这个领导者决定了写入顺序,而状态机复制方法可以在数据库副本上工作。然而,在图11-4中,没有单个主库:数据库可能有一个领导者,搜索索引也可能有一个领导者,但是两者都不追随对方,所以可能会发生冲突(参见“多领导者复制“)。

如果实际上只有一个领导者 —— 例如,数据库 —— 而且我们能让搜索索引成为数据库的追随者,情况要好得多。但这在实践中可能吗?

变更数据捕获

大多数数据库的复制日志的问题在于它们一直被当做数据库的内部实现细节而不是公开的API。客户端应该通过其数据模型和查询语言来查询数据库而不是解析复制日志并尝试从中提取数据。

数十年来,许多数据库根本没有记录在档的,获取变更日志的方式。由于这个原因,捕获数据库中所有的变更,然后将其复制到其他存储技术(搜索索引,缓存,数据仓库)中是相当困难的。

最近,人们对**变更数据捕获change data capture, CDC**越来越感兴趣,这是一种观察写入数据库的所有数据变更,并将其提取并转换为可以复制到其他系统中的形式的过程。 CDC是非常有意思的尤其是当变更能在被写入后立刻用于流时。

例如,你可以捕获数据库中的变更,并不断将相同的变更应用至搜索索引。如果变更日志以相同的顺序应用,则可以预期搜索索引中的数据与数据库中的数据是匹配的。搜索索引和任何其他衍生数据系统只是变更流的消费者,如图11-5所示。

图11-5 将数据按顺序写入一个数据库,然后按照相同的顺序将这些更改应用到其他系统

变更数据捕获的实现

我们可以将日志消费者叫做衍生数据系统,正如在第三部分的介绍中所讨论的:存储在搜索索引和数据仓库中的数据,只是记录系统数据的额外视图。变更数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在派生数据系统中,以便派生系统具有数据的准确副本。

从本质上说,变更数据捕获使得一个数据库成为领导者(被捕获变化的数据库),并将其他组件变为追随者。基于日志的消息代理非常适合从源数据库传输变更事件,因为它保留了消息的顺序(避免了图11-2的重新排序问题)。

数据库触发器可用来实现变更数据捕获(参阅“基于触发器的复制”),通过注册观察所有变更的触发器,并将相应的变更项写入变更日志表中。但是它们往往是脆弱的,而且有显著的性能开销。解析复制日志可能是一种更稳健的方法,但它也很有挑战,例如应对模式变更。

LinkedIn的Databus 【25】Facebook的Wormhole 【26】和Yahoo!的Sherpa【27】大规模地应用这个思路。 Bottled Water使用解码WAL的API实现了PostgreSQL的CDC 【28】Maxwell和Debezium通过解析binlog对MySQL做了类似的事情【29,30,31】Mongoriver读取MongoDB oplog 【32,33】 而GoldenGate为Oracle提供类似的功能【34,35】。

像消息代理一样,变更数据捕获通常是异步的:记录数据库系统不会等待消费者应用变更再进行提交。这种设计具有的运维优势是,添加缓慢的消费者不会过度影响记录系统。不过,所有复制延迟可能有的问题在这里都可能出现(参见“复制延迟问题”)。

初始快照

如果你拥有所有对数据库进行变更的日志,则可以通过重放该日志,来重建数据库的完整状态。但是在许多情况下,永远保留所有更改会耗费太多磁盘空间,且重放过于费时,因此日志需要被截断。

例如,构建新的全文索引需要整个数据库的完整副本 —— 仅仅应用最近变更的日志是不够的,因为这样会丢失最近未曾更新的项目。因此,如果你没有完整的历史日志,则需要从一个一致的快照开始,如先前上的“设置新的从库”中所述。

数据库的快照必须与变更日志中的已知位置或偏移量相对应以便在处理完快照后知道从哪里开始应用变更。一些CDC工具集成了这种快照功能而其他工具则把它留给你手动执行。

日志压缩

如果你只能保留有限的历史日志,则每次要添加新的衍生数据系统时,都需要做一次快照。但**日志压缩log compaction**提供了一个很好的备选方案。

我们之前在日志结构存储引擎的上下文中讨论了“Hash索引”中的日志压缩(参见图3-2的示例)。原理很简单:存储引擎定期在日志中查找具有相同键的记录,丢掉所有重复的内容,并只保留每个键的最新更新。这个压缩与合并过程在后台运行。

在日志结构存储引擎中具有特殊值NULL墓碑tombstone)的更新表示该键被删除,并会在日志压缩过程中被移除。但只要键不被覆盖或删除,它就会永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中曾经发生的写入次数。如果相同的键经常被覆盖写入,则先前的值将最终将被垃圾回收,只有最新的值会保留下来。

在基于日志的消息代理与变更数据捕获的上下文中也适用相同的想法。如果CDC系统被配置为每个变更都包含一个主键且每个键的更新都替换了该键以前的值那么只需要保留对键的最新写入就足够了。

现在无论何时需要重建衍生数据系统如搜索索引你可以从压缩日志主题0偏移量处启动新的消费者然后依次扫描日志中的所有消息。日志能保证包含数据库中每个键的最新值也可能是一些较旧的值—— 换句话说你可以使用它来获取数据库内容的完整副本而无需从CDC源数据库取一个快照。

Apache Kafka支持这种日志压缩功能。正如我们将在本章后面看到的它允许消息代理被当成持久性存储使用而不仅仅是用于临时消息。

变更流的API支持

越来越多的数据库开始将变更流作为第一类的接口而不像传统上要去做加装改造费工夫逆向工程一个CDC。例如RethinkDB允许查询订阅通知当查询结果变更时获得通知【36】Firebase 【37】和CouchDB 【38】基于变更流进行同步该变更流同样可用于应用。而Meteor使用MongoDB oplog订阅数据变更并改变了用户接口【39】。

VoltDB允许事务以流的形式连续地从数据库中导出数据【40】。数据库将关系数据模型中的输出流表示为一个表事务可以向其中插入元组但不能查询。已提交事务按照提交顺序写入这个特殊表而流则由该表中的元组日志构成。外部消费者可以异步消费该日志并使用它来更新衍生数据系统。

Kafka Connect 【41】致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成。一旦变更事件进入Kafka中它就可以用于更新衍生数据系统比如搜索索引也可以用于本章稍后讨论的流处理系统。

事件溯源

我们在这里讨论的想法和**事件溯源( Event Sourcing之间有一些相似之处,这是一个在领域驱动设计domain-driven design, DDD**社区中折腾出来的技术。我们将简要讨论事件溯源,因为它包含了一些关于流处理系统的有用想法。

与变更数据捕获类似,事件溯源涉及到将所有对应用状态的变更存储为变更事件日志。最大的区别是事件溯源将这一想法应用到了几个不同的抽象层次上:

  • 在变更数据捕获中,应用以**可变方式mutable way**使用数据库,任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免图11-4中的竞态条件。写入数据库的应用不需要知道CDC的存在。
  • 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。

事件源是一种强大的数据建模技术从应用的角度来看将用户的行为记录为不可变的事件更有意义而不是在可变数据库中记录这些行为的影响。事件代理使得应用随时间演化更为容易通过事实更容易理解事情发生的原因使得调试更为容易并有利于防止应用Bug请参阅“不可变事件的优点”)。

例如,存储“学生取消选课”事件以中性的方式清楚地表达了单个行为的意图,而副作用“从注册表中删除了一个条目,而一条取消原因被添加到学生反馈表“则嵌入了很多有关稍后数据使用方式的假设。如果引入一个新的应用功能,例如“将位置留给等待列表中的下一个人” —— 事件溯源方法允许将新的副作用轻松地链接至现有事件之后。

事件溯源类似于**编年史chronicle**数据模型【45】事件日志与星型模式中的事实表之间也存在相似之处参阅“星型和雪花型:分析的模式”) 。

诸如Event Store【46】这样的专业数据库已经被开发出来供使用事件溯源的应用使用但总的来说这种方法独立于任何特定的工具。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用。

从事件日志中派生出当前状态

事件日志本身并不是很有用,因为用户通常期望看到的是系统的当前状态,而不是变更历史。例如,在购物网站上,用户期望能看到他们购物车里的当前内容,而不是他们购物车所有变更的一个仅追加列表。

因此,使用事件溯源的应用需要拉取事件日志(表示写入系统的数据),并将其转换为适合向用户显示的应用状态(从系统读取数据的方式【47】。这种转换可以使用任意逻辑但它应当是确定性的以便能再次运行并从事件日志中衍生出相同的应用状态。

与变更数据捕获一样,重放事件日志允许让你重新构建系统的当前状态。不过,日志压缩需要采用不同的方式处理:

  • 用于记录更新的CDC事件通常包含记录的完整新版本,因此主键的当前值完全由该主键的最近事件确定,而日志压缩可以丢弃相同主键的先前事件。
  • 另一方面,事件溯源在更高层次进行建模:事件通常表示用户操作的意图,而不是因为操作而发生的状态更新机制。在这种情况下,后面的事件通常不会覆盖先前的事件,所以你需要完整的历史事件来重新构建最终状态。这里进行同样的日志压缩是不可能的。

使用事件溯源的应用通常有一些机制,用于存储从事件日志中导出的当前状态快照,因此它们不需要重复处理完整的日志。然而这只是一种性能优化,用来加速读取,提高从崩溃中恢复的速度;真正的目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们将在“不变性的限制”中讨论这个假设。

命令和事件

事件溯源的哲学是仔细区分事件event命令command【48】。当来自用户的请求刚到达时它一开始是一个命令在这个时间点上它仍然可能可能失败比如因为违反了一些完整性条件。应用必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受则它变为一个持久化且不可变的事件。

例如,如果用户试图注册特定用户名,或预定飞机或剧院的座位,则应用需要检查用户名或座位是否已被占用。 (先前在“容错概念”中讨论过这个例子当检查成功时应用可以生成一个事件指示特定的用户名是由特定的用户ID注册的座位已经预留给特定的顾客。

在事件生成的时刻,它就成为了事实fact。即使客户稍后决定更改或取消预订,他们之前曾预定了某个特定座位的事实仍然成立,而更改或取消是之后添加的单独的事件。

事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经成为日志中不可变的一部分,并且可能已经被其他消费者看到了。因此任何对命令的验证,都需要在它成为事件之前同步完成。例如,通过使用一个可自动验证命令的可序列化事务来发布事件。

或者,预订座位的用户请求可以拆分为两个事件:第一个是暂时预约,第二个是验证预约后的独立的确认事件(如“使用全序广播实现线性一致存储”中所述) 。这种分割方式允许验证发生在一个异步的过程中。

状态,流和不变性

我们在第10章中看到,批处理因其输入文件不变性而受益良多,你可以在现有输入文件上运行实验性处理作业,而不用担心损坏它们。这种不变性原则也是使得事件溯源与变更数据捕获如此强大的原因。

我们通常将数据库视为应用程序当前状态的存储 —— 这种表示针对读取进行了优化,而且通常对于服务查询而言是最为方便的表示。状态的本质是,它会变化,所以数据库才会支持数据的增删改。这又是如何符合不变性的呢?

只要你的状态发生了变化那么这个状态就是这段时间中事件修改的结果。例如当前可用的座位列表是已处理预订产生的结果当前帐户余额是帐户中的借与贷的结果而Web服务器的响应时间图是所有已发生Web请求的独立响应时间的聚合结果。

无论状态如何变化,总是有一系列事件导致了这些变化。即使事情已经执行与回滚,这些事件出现是始终成立的。关键的想法是:可变的状态与不可变事件的仅追加日志相互之间并不矛盾:它们是一体两面,互为阴阳的。所有变化的日志—— 变化日志change log,表示了随时间演变的状态。

如果你倾向于数学表示,那么你可能会说,应用状态是事件流对时间求积分得到的结果,而变更流是状态对时间求微分的结果,如图11-6所示【49,50,51】。这个比喻有一些局限性例如状态的二阶导似乎没有意义但这是考虑数据的一个实用出发点。


state(now) = \int_{t=0}^{now}{stream(t) \ dt} \\
stream(t) = \frac{d\ state(t)}{dt}

图11-6 应用当前状态与事件流之间的关系

如果你持久存储了变更日志那么重现状态就非常简单。如果你认为事件日志是你的记录系统而所有的衍生状态都从它派生而来那么系统中的数据流动就容易理解的多。正如帕特·赫兰Pat Helland所说的【52】

事务日志记录了数据库的所有变更。高速追加下入是更改日志的唯一方法。从这个角度来看,数据库的内容其实是日志中记录最新值的缓存。日志才是真相,数据库是日志子集的缓存,这一缓存子集恰好来自日志中每条记录与索引值的最新值。

日志压缩(如“日志压缩”中所述)是连接日志与数据库状态之间的桥梁:它只保留每条记录的最新版本,并丢弃被覆盖的版本。

不可变事件的优点

数据库中的不变性是一个古老的概念。例如会计在几个世纪以来一直在财务记账中应用不变性。一笔交易发生时它被记录在一个仅追加写入的分类帐中实质上是描述货币商品或服务转手的事件日志。账目比如利润、亏损、资产负债表是从分类账中的交易求和衍生而来【53】。

如果发生错误,会计师不会删除或更改分类帐中的错误交易 —— 而是添加另一笔交易以补偿错误例如退还一比不正确的费用。不正确的交易将永远保留在分类帐中对于审计而言可能非常重要。如果从不正确的分类账衍生出的错误数字已经公布那么下一个会计周期的数字就会包括一个更正。这个过程在会计事务中是很常见的【54】。

尽管这种可审计性在金融系统中尤其重要,但对于不受这种严格监管的许多其他系统,也是很有帮助的。如“批处理输出的哲学”中所讨论的,如果你意外地部署了将错误数据写入数据库的错误代码,当代码会破坏性地覆写数据时,恢复要困难得多。使用不可变事件的仅追加日志,诊断问题与故障恢复就要容易的多。

不可变的事件也包含了比当前状态更多的信息。例如在购物网站上顾客可以将物品添加到他们的购物车然后再将其移除。虽然从履行订单的角度第二个事件取消了第一个事件但对分析目的而言知道客户考虑过某个特定项而之后又反悔可能是很有用的。也许他们会选择在未来购买或者他们已经找到了替代品。这个信息被记录在事件日志中但对于移出购物车就删除记录的数据库而言这个信息在移出购物车时可能就丢失【42】。

从同一事件日志中派生多个视图

此外,通过从不变的事件日志中分离出可变的状态,你可以针对不同的读取方式,从相同的事件日志中衍生出几种不同的表现形式。效果就像一个流的多个消费者一样(图11-5例如分析型数据库Druid使用这种方式直接从Kafka摄取数据【55】Pistachio是一个分布式的键值存储使用Kafka作为提交日志【56】Kafka Connect能将来自Kafka的数据导出到各种不同的数据库与索引【41】。这对于许多其他存储和索引系统如搜索服务器来说是很有意义的当系统要从分布式日志中获取输入时亦然参阅“保持系统同步”)。

添加从事件日志到数据库的显式转换能够使应用更容易地随时间演进如果你想要引入一个新功能以新的方式表示现有数据则可以使用事件日志来构建一个单独的针对新功能的读取优化视图无需修改现有系统而与之共存。并行运行新旧系统通常比在现有系统中执行复杂的模式迁移更容易。一旦不再需要旧的系统你可以简单地关闭它并回收其资源【47,57】。

如果你不需要担心如何查询与访问数据,那么存储数据通常是非常简单的。模式设计,索引和存储引擎的许多复杂性,都是希望支持某些特定查询和访问模式的结果(参见第3章)。出于这个原因,通过将数据写入的形式与读取形式相分离,并允许几个不同的读取视图,你能获得很大的灵活性。这个想法有时被称为命令查询责任分离command query responsibility segregation, CQRS【42,58,59】。

数据库和模式设计的传统方法是基于这样一种谬论,数据必须以与查询相同的形式写入。如果可以将数据从针对写入优化的事件日志转换为针对读取优化的应用状态,那么有关规范化和非规范化的争论就变得无关紧要了(参阅“多对一和多对多的关系”):在针对读取优化的视图中对数据进行非规范化是完全合理的,因为翻译过程提供了使其与事件日志保持一致的机制。

在“描述负载”中,我们讨论了推特主页时间线,它是特定用户关注人群所发推特的缓存(类似邮箱)。这是针对读取优化的状态的又一个例子:主页时间线是高度非规范化的,因为你的推文与所有粉丝的时间线都构成了重复。然而,扇出服务保持了这种重复状态与新推特以及新关注关系的同步,从而保证了重复的可管理性。

并发控制

事件溯源和变更数据捕获的最大缺点是,事件日志的消费者通常是异步的,所以可能会出现这样的情况:用户会写入日志,然后从日志衍生视图中读取,结果发现他的写入还没有反映在读取视图中。我们之前在在“读己之写”中讨论了这个问题以及可能的解决方案。

一种解决方案是将事件附加到日志时同步执行读取视图的更新。而将这些写入操作合并为一个原子单元需要事务,所以要么将事件日志和读取视图保存在同一个存储系统中,要么就需要跨不同系统进行分布式事务。或者,你也可以使用在“使用全序广播实现线性化存储”中讨论的方法。

另一方面,从事件日志导出当前状态也简化了并发控制的某些部分。许多对于多对象事务的需求(参阅“单对象和多对象操作”)源于单个用户操作需要在多个不同的位置更改数据。通过事件溯源,你可以设计一个自包含的事件以表示一个用户操作。然后用户操作就只需要在一个地方进行单次写入操作 —— 即将事件附加到日志中 —— 这个还是很容易使原子化的。

如果事件日志与应用状态以相同的方式分区例如处理分区3中的客户事件只需要更新分区3中的应用状态那么直接使用单线程日志消费者就不需要写入并发控制了。它从设计上一次只处理一个事件参阅“真的的串行执行。日志通过在分区中定义事件的序列顺序消除了并发性的不确定性【24】。如果一个事件触及多个状态分区那么需要做更多的工作我们将在第12章讨论。

不变性的限制

许多不使用事件溯源模型的系统也还是依赖不可变性:各种数据库在内部使用不可变的数据结构或多版本数据来支持时间点快照(参见“索引和快照隔离” )。 GitMercurial和Fossil等版本控制系统也依靠不可变的数据来保存文件的版本历史记录。

永远保持所有变更的不变历史,在多大程度上是可行的?答案取决于数据集的流失率。一些工作负载主要是添加数据,很少更新或删除;它们很容易保持不变。其他工作负载在相对较小的数据集上有较高的更新/删除率在这些情况下不可变的历史可能增至难以接受的巨大碎片化可能成为一个问题压缩与垃圾收集的表现对于运维的稳健性变得至关重要【60,61】。

除了性能方面的原因外,也可能有出于管理方面的原因需要删除数据的情况,尽管这些数据都是不可变的。例如,隐私条例可能要求在用户关闭帐户后删除他们的个人信息,数据保护立法可能要求删除错误的信息,或者可能需要阻止敏感信息的意外泄露。

在这种情况下,仅仅在日志中添加另一个事件来指明先前的数据应该被视为删除是不够的 —— 你实际上是想改写历史并假装数据从一开始就没有写入。例如Datomic管这个特性叫切除excision 【62】而Fossil版本控制系统有一个类似的概念叫避免shunning 【63】。

真正删除数据是非常非常困难的【64】因为副本可能存在于很多地方例如存储引擎文件系统和SSD通常会向一个新位置写入而不是原地覆盖旧数据【52】而备份通常是特意做成不可变的防止意外删除或损坏。删除更多的是“使取回数据更困难”而不是“使取回数据不可能”。无论如何有时你必须得尝试正如我们在“立法与自律”中所看到的。

流处理

到目前为止,本章中我们已经讨论了流的来源(用户活动事件,传感器和写入数据库),我们讨论了流如何传输(通过直接消息传送,通过消息代理和事件日志)。

剩下的就是讨论一下你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选择:

  1. 你可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后由其他客户端查询。如图11-5所示,这是保持数据库与系统其他部分发生更改同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。写入存储系统的流程相当于我们在“批处理工作流程的输出”页面上讨论的内容。
  2. 你可以以某种方式将事件推送给用户,例如通过发送电子邮件警报或推送通知,或通过将事件流式传输到可实时显示的实时仪表板。在这种情况下,人是流的最终消费者。
  3. 你可以处理一个或多个输入流以产生一个或多个输出流。数据流可能会经过由几个这样的处理阶段组成的流水线然后才会输出选项1或2

在本章的其余部分中我们将讨论选项3处理流以产生其他派生流。处理这样的流的代码片段被称为操作员或作业。它与我们在第10章中讨论过的Unix进程和MapReduce作业密切相关数据流的模式是相似的一个流处理器以只读的方式使用输入流并将其输出写入一个不同的位置时尚。

流处理器中的分区和并行化模式也非常类似于第10章中介绍的MapReduce和数据流引擎因此我们不在这里重复这些主题。基本的映射操作如转换和过滤记录也是一样的。

批量作业的一个关键区别是流不会结束。这种差别有很多含义:正如本章开始部分所讨论的,排序对无界数据集没有意义,因此不能使用排序合并联接(请参阅“减少连接和分组”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重新启动失败的任务,但是对于已经运行数年的流作业,在开始后重新开始崩溃可能不是一个可行的选择。

流处理的应用

长期以来,流处理一直用于监控目的,如果某个事情发生,组织就希望得到警报。例如:

  • 欺诈检测系统需要确定信用卡的使用模式是否意外地发生了变化,并且如果信用卡可能已被盗用,则将其封锁。
  • 交易系统需要检查金融市场的价格变化,并根据指定的规则进行交易。
  • 制造系统需要监控工厂中机器的状态,如果出现故障,可以快速识别问题。
  • 军事和情报系统需要跟踪潜在的攻击者的行动,并在发生袭击的迹象时发出警报。

这些类型的应用程序需要非常复杂的模式匹配和相关性。然而,流处理的其他用途也随着时间的推移而出现。在本节中,我们将简要比较一下这些应用程序。

复杂的事件处理

复杂事件处理CEP是20世纪90年代为分析事件流而开发的一种方法尤其适用于需要搜索某些事件模式的应用程序【65,66】。与正则表达式允许你在字符串中搜索特定字符模式的方式类似CEP允许你指定规则以在流中搜索某些事件模式。

CEP系统通常使用高级声明式查询语言如SQL或图形用户界面来描述应该检测到的事件模式。这些查询被提交给一个处理引擎该引擎使用输入流并在内部维护一个执行所需匹配的状态机。当发现匹配时引擎发出一个复杂的事件因此名字与事件模式的细节【67】。

在这些系统中,查询和数据之间的关系与普通数据库相比是颠倒的。通常情况下,数据库会持久存储数据,并将查询视为暂时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时忘记查询。 CEP引擎反转了这些角色查询是长期存储的来自输入流的事件不断流过它们以搜索匹配事件模式的查询【68】。

CEP的实现包括Esper 【69】IBM InfoSphere Streams 【70】ApamaTIBCO StreamBase和SQLstream。像Samza这样的分布式流处理器也获得了对流声明式查询的SQL支持【71】。

流分析

使用流处理的另一个领域是对流进行分析。 CEP和流分析之间的边界是模糊的但作为一般规则分析往往不太关心找到特定的事件序列并且更倾向于聚合和统计度量大量的事件——例如

  • 测量某种类型事件的速率(每个时间间隔发生的频率)
  • 计算一段时间内某个值的滚动平均值
  • 将当前的统计数据与以前的时间间隔进行比较(例如,检测趋势或提醒与上周同期相比过高或过低的指标)

这些统计信息通常是在固定的时间间隔内进行计算的例如你可能想知道在过去5分钟内每秒对服务的平均查询次数以及在此期间的第99百分位响应时间。在几分钟内平均从一秒钟到下一秒钟平滑无关的波动同时还能及时了解交通模式的任何变化。你汇总的时间间隔称为窗口我们将在“关于时间的推理”中更详细地讨论窗口。

流分析系统有时使用概率算法例如Bloom filter我们在“性能优化”中遇到过设置成员资格HyperLogLog 【72】基数估计以及各种百分比估计算法请参阅“实践中的百分位点“第16页。概率算法产生近似的结果但是具有在流处理器中比精确算法需要少得多的存储器的优点。近似算法的使用有时会使人们相信流处理系统总是有损和不精确的但这是错误的流处理没有任何内在的近似而概率算法只是一个优化【73】。

许多开源分布式流处理框架的设计都是以分析为基础的例如Apache StormSpark StreamingFlinkConcordSamza和Kafka Streams 【74】。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。

保持物化视图

我们在“数据库和数据流”中看到,可以使用数据库更改流来保持派生数据系统(如缓存,搜索索引和数据仓库)与源数据库保持最新。我们可以将这些示例视为维护实体化视图的具体情况(请参阅“聚合:数据立方体和物化视图导出某个数据集的替代视图以便可以高效地查询它并在底层数据更改【50】。

同样,在事件代理中,应用程序状态通过应用事件日志来维护;这里的应用状态也是一种物化视图。与流分析场景不同,在某个时间窗口内仅考虑事件通常是不够的:构建物化视图可能需要在任意时间段内的所有事件,除了可能由日志压缩丢弃的任何过时事件(请参阅“日志压缩“)。实际上,你需要一个可以一直延伸到一开始的窗口。

原则上,任何流处理器都可以用于物化视图维护,尽管永久维护事件的需要与一些主要在有限持续时间的窗口上运行的面向分析的框架的假设背道而驰。 Samza和Kafka Streams支持这种用法建立在Kafka对夯实的支持上【75】。

在流上搜索

除了允许搜索由多个事件组成的模式的CEP外还有时需要基于复杂的标准例如全文搜索查询来搜索单个事件。

例如,媒体监测服务可以订阅新闻文章和媒体广播,并搜索任何关于公司,产品或感兴趣的话题的新闻。这是通过预先制定一个搜索查询来完成的,然后不断地将新闻项目流与这个查询进行匹配。在一些网站上也有类似的功能:例如,房地产网站的用户在市场上出现符合其搜索条件的新房产时,可以要求通知。 Elasticsearch 【76】的渗滤器功能是实现这种流式搜索的一种选择。

传统的搜索引擎首先索引文件然后在索引上运行查询。相比之下搜索一个数据流将会处理它的头部查询被存储文档通过查询运行就像CEP一样。在最简单的情况下你可以针对每个查询来测试每个文档但是如果你有大量查询这可能会变慢。为了优化过程可以对查询和文档进行索引从而缩小可能匹配的查询集合【77】。

消息传递和RPC

在第136页的“消息传递数据流”中我们讨论了消息传递系统作为RPC的替代方案即作为通信服务的机制例如在参与者模型中所使用的。虽然这些系统也是基于消息和事件但我们通常不会将它们视为流处理器

Actor框架主要是管理通信模块的并发和分布式执行的机制而流处理主要是数据管理技术。

  • 参与者之间的交流往往是短暂的,而且是一对一的,而事件日志则是持久的,多用户的。
  • 参与者可以以任意方式进行通信(包括循环请求/响应模式),但流处理器通常设置在非循环流水线中,其中每个流是一个特定作业的输出,并且从一组明确定义的输入流派生。

也就是说RPC类系统和流处理之间有一些交叉区域。例如Apache Storm有一个称为分布式RPC的功能它允许将用户查询分散到一系列也处理事件流的节点上;这些查询然后与来自输入流的事件交织结果可以被汇总并发回给用户【78】。 (另参阅“多分区数据处理”)

也可以使用actor框架来处理流。但是很多这样的框架在崩溃的情况下不能保证消息的传递所以这个过程不是容错的除非你实现了额外的重试逻辑。

关于时间的推理

流处理器通常需要处理时间,特别是在用于分析目的的时候,频繁使用时间窗口,例如“过去五分钟的平均时间”。“最后五分钟”的含义似乎应该是未知的,大而清晰,但不幸的是这个概念是令人惊讶的棘手。

在批处理过程中,处理任务通过大量的历史事件迅速收缩。如果需要按时间分类,批处理需要查看每个事件中嵌入的时间戳。查看运行批处理的机器的系统时钟没有意义,因为处理运行的时间与事件实际发生的时间无关。

批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间表是历史的一年,而不是几分钟的处理。而且,在事件中使用时间戳允许处理确定性的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“故障容错”)。

另一方面许多流处理框架使用处理机器上的本地系统时钟处理时间来确定窗口【79】。这种方法具有简单的优点事件创建和事件处理之间的延迟可以忽略不计。然而如果存在任何显着的处理滞后即处理可能比事件实际发生的时间显着晚则会中断处理。

事件时间与处理时间

有许多原因可能会延迟处理:排队,网络故障(参阅“不可靠的网络”),导致消息代理或处理器中出现争用的性能问题,重新启动流消费者或重新处理过去的事件(参阅“重放旧消息或者在修复代码中的BUG之后进行恢复。

而且消息延迟还可能导致消息的不可预知的排序。例如假设用户首先发出一个Web请求由Web服务器A处理然后发出第二个请求由服务器B处理。 A和B发出描述他们处理的请求的事件但是B的事件在A的事件发生之前到达消息代理。现在流处理器将首先看到B事件然后看到A事件即使它们实际上是以相反的顺序发生的。

如果有一个类比的话可以考虑一下“星球大战”的电影第四集于1977年发行1980年第五集1983年第六集之后分别在1999年2002年和2005年发行第一三集以及2015年的第七集【80】2。如果你按照他们出来的顺序观看电影,你处理电影的顺序与他们叙述的顺序是不一致的。 (情节编号就像事件时间戳一样,观看电影的日期就是处理时间。)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门编写以适应这种情况时间安排和订购问题。

令人困惑的事件时间和处理时间导致错误的数据。例如,假设你有一个流处理器来测量请求率(计算每秒请求数)。如果你重新部署流处理器,则可能会关闭一分钟,并在事件恢复时处理积压的事件。如果你根据处理时间来衡量速率,那么看起来好像在处理积压时突然出现异常的请求高峰,而事实上请求的实际速率是稳定的(图11-7)。

图11-7 由于处理时间的窗口化,由于处理速率的变化而引入人为因素

知道什么时候你准备好了

从事件时间的角度来定义窗口时,一个棘手的问题是,当你收到特定窗口的所有事件,或者是否还有事件发生时,你永远无法确定。

例如假设你将事件分组为一分钟的窗口以便你可以统计每分钟的请求数。你已经计算了一些事件这些事件的时间戳是在第37分钟的时间落下的时间已经推移了。现在大部分的事件都在一小时的第38和第39分钟之内。你什么时候宣布你已经完成了第37分钟的窗口并输出其计数器值

在一段时间没有看到任何新的事件之后你可以超时并宣布一个窗口但仍然可能发生某些事件被缓存在另一台计算机上由于网络中断而延迟。你需要能够处理窗口已经声明完成后到达的这样的滞留事件。大体上你有两个选择【1】

  1. 忽略这些零散的事件,因为它们在正常情况下可能只是一小部分事件。你可以将丢弃事件的数量作为度量标准进行跟踪,并在你开始丢弃大量数据时发出警报。
  2. 发布一个更正,更新的窗口与包含散兵队员的价值。你可能还需要收回以前的输出。

在某些情况下可以使用特殊的消息来指示“从现在开始不会有比t更早的时间戳的消息”消费者可以使用它来触发窗口【81】。但是如果不同机器上的多个生产者正在生成事件每个事件都有自己的最小时间戳阈值则消费者需要分别跟踪每个生产者。在这种情况下添加和删除生产者是比较棘手的。

你用的是什么时间?

当事件可以在系统中的多个点缓冲时,为事件分配时间戳更加困难。例如,考虑将使用率度量的事件报告给服务器的移动应用程序。该应用程序可能会在设备处于脱机状态时使用,在这种情况下,它将在设备上本地缓冲事件,并在下一次可用的互联网连接(可能是几小时甚至几天)时将它们发送到服务器。对于这个流的任何消费者来说,这些事件将显示为极其滞后的落后者。

在这种情况下,根据移动设备的本地时钟,事件的时间戳实际上应该是发生用户交互的时间。但是,用户控制的设备上的时钟通常是不可信的,因为它可能会被意外或故意设置为错误的时间(请参见“时钟同步与准确性”)。服务器收到事件的时间(根据服务器的时钟)更可能是准确的,因为服务器在你的控制之下,但在描述用户交互方面意义不大。

要调整不正确的设备时钟一种方法是记录三个时间戳【82】

  • 事件发生的时间,根据设备时钟
  • 根据设备时钟将事件发送到服务器的时间
  • 服务器根据服务器时钟收到事件的时间

通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移量(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后,你可以将该偏移量应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生的时间与发送到服务器的时间之间没有变化)。

这个问题对于流处理来说并不是唯一的,批处理遇到了与时间推理完全相同的问题。在一个流式环境中,我们更加注意到时间的流逝。

窗口的类型

一旦你知道如何确定一个事件的时间戳下一步就是决定如何定义一段时间的窗口。窗口然后可以用于聚合例如计数事件或计算窗口内的值的平均值。有几种窗口是常用的【79,83】

滚动窗口Tumbling Window

一个滚动窗口有一个固定的长度每个事件都属于一个窗口。例如如果你有1分钟的翻滚窗口则所有时间戳在10:03:0010:03:59之间的事件会被分组到一个窗口中,10:04:0010:04:59之间的事件下一个窗口等等。你可以通过获取每个事件时间戳并将其四舍五入到最接近的分钟来确定它所属的窗口从而实现1分钟的翻滚窗口。

跳动窗口Hopping Window

跳频窗口也具有固定的长度但允许窗口重叠以提供一些平滑。例如1分钟跳跃大小的5分钟窗口将包含10:03:0010:07:59之间的事件,则下一个窗口将覆盖10:04:0010:08之间的事件: 59等等。你可以通过首先计算1分钟滚动窗口然后聚合在几个相邻的窗口上来实现此跳频窗口。

滑动窗口

滑动窗口包含在彼此的某个间隔内发生的所有事件。例如一个5分钟的滑动窗口将覆盖10点03分39秒和10点08分12秒的事件因为它们相距不到5分钟注意翻滚和跳跃的5分钟窗口不会把这两个事件在同一个窗口中因为他们使用固定的边界。滑动窗口可以通过保持按时间排序的事件缓冲区并在从窗口到期时移除旧事件来实现。

会话窗口

与其他窗口类型不同会话窗口没有固定的持续时间。相反它是通过将同一用户的所有事件分组在一起并在时间上紧密地组合在一起来定义的并且当用户在一段时间内不活动时例如如果30分钟内没有事件窗口结束。会话化是网站分析的常见要求参阅“[GROUP BY](ch10.md#GROUP BY)”)。

流式连接

第10章中,我们讨论了批处理作业如何通过关键连接数据集,以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道概括为对无界数据集进行增量处理,因此对流进行连接的需求也完全相同。

然而,新事件随时可能出现在一个流中,这使得加入流比批处理作业更具挑战性。为了更好地理解情况,我们来区分三种不同类型的连接:流-流连接流表连接和表连接【84】。在下面的章节中我们将通过例子来说明。 流 - 流连接(窗口连接)

假设你的网站上有搜索功能并且想要检测搜索到的网址的近期趋势。每次有人输入搜索查询时都会记录包含查询和返回结果的事件。每当有人点击其中一个搜索结果时就会记录另一个记录点击的事件。为了计算搜索结果中每个网址的点击率你需要将搜索操作和点击操作的事件组合在一起这些事件通过具有相同的会话ID进行连接。广告系统需要类似的分析【85】。

如果用户放弃他们的搜索,点击可能永远不会到来,即使它到了,搜索和点击之间的时间可能是高度可变的:在很多情况下,它可能是几秒钟,但可能长达几天或几周(如果用户运行搜索,忘记关于该浏览器选项卡,然后返回到选项卡,稍后再单击一个结果)。由于可变的网络延迟,点击事件甚至可能在搜索事件之前到达。你可以选择合适的加入窗口,例如,如果间隔至多一小时发生一次搜索,你可以选择加入搜索。

请注意在click事件中嵌入搜索的细节并不等同于加入事件这样做只会告诉你有关用户单击搜索结果的情况而不是用户未点击任何搜索结果的搜索结果。为了衡量搜索质量你需要准确的点击率为此你需要搜索事件和点击事件。

为了实现这种类型的连接流处理器需要维护状态例如在最后一小时发生的所有事件都由会话标识索引。无论何时发生搜索事件或点击事件都会将其添加到适当的索引并且流处理器还检查另一个索引以查看是否已经到达同一会话ID的另一个事件。如果有匹配的事件则发出说明哪个搜索结果被点击的事件。如果搜索事件过期而没有看到匹配的点击事件则会发出说明哪些搜索结果未被点击的事件。

流表连接stream enrichment

在“示例:用户活动事件分析”(图10-2我们看到了加入两个数据集的批量作业示例一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流并在流处理器中连续执行相同的连接是很自然的输入是包含用户ID的活动事件流并且输出是活动事件流其中用户ID已经用关于用户的简档信息来扩充。这个过程有时被称为使用来自数据库的信息来丰富活动事件。

要执行此联接,流程过程需要一次查看一个活动事件,在数据库中查找事件的用户标识,并将该概要信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现;但是,正如在“示例:分析用户活动事件”一节中讨论的此类远程查询可能会很慢并且有可能导致数据库过载【75】。

另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“Map端连接”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则本地副本可能是内存中的散列表,或者是本地磁盘上的索引。

与批处理作业的区别在于批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,并且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持最新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户配置文件数据库的更新日志以及活动事件流。在创建或修改配置文件时,流处理器会更新其本地副本。因此,我们获得两个流之间的连接:活动事件和配置文件更新。

流表连接实际上非常类似于流 - 流连接;最大的区别在于对于表changelog流连接使用一个可以回溯到“开始时间”概念上是无限的窗口的窗口新版本的记录会覆盖较早的版本。对于流输入连接可能根本没有维护窗口。

表格表连接(物化视图维护)

考虑我们在“描述负载”中讨论的推特时间线例子。我们说过,当用户想要查看他们的主页时间线时,对用户所关注的所有人进行迭代是非常昂贵的,推文,并合并它们。

相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,以便读取时间线是一次查询。实现和维护此缓存需要以下事件处理:

  • 当用户发送新的推文时,它将被添加到每个跟随你的用户的时间线上。
  • 用户删除推文时,将从所有用户的时间表中删除。
  • 当用户u1开始跟随用户u2时u2最近的tweets被添加到u1的时间线上。
  • 当用户u1取消关注用户u2时u1的推文将从u1的时间线中移除。

要在流处理器中实现这种缓存维护需要用于推文发送和删除和跟随关系跟随和取消跟随的事件流。流过程需要维护一个包含每个用户关注者集合的数据库以便知道当一个新的tweet到达时需要更新哪些时间轴【86】。

查看这个流过程的另一种方式是维护一个连接两个表tweet和follow的查询的物化视图如下所示

SELECT follows.follower_id AS timeline_id, 
    array_agg(tweets.* ORDER BY tweets.timestamp DESC)
FROM tweets
JOIN follows ON follows.followee_id = tweets.sender_id 
GROUP BY follows.follower_id

流的连接直接对应于该查询中的表的连接。时间轴实际上是这个查询结果的缓存,每当基础表发生变化时都会更新3

连接的时间依赖性

这就产生了一个问题如果不同的事件发生在相似的时间周围他们按照何种顺序进行处理在流表连接示例中如果用户更新其配置文件哪些活动事件与旧配置文件在配置文件更新之前处理结合哪些与新配置文件结合在配置文件更新之后处理换句话说如果状态随着时间的推移而改变并且你加入了某个状态那么你使用什么时间点来加入【45】

这种时间依赖性可能发生在许多地方。例如,如果你销售东西,则需要对发票进行适当的税率,这取决于国家或州,产品类型和销售日期(因为税率会随时变化)。将销售额加入税率表时,如果你正在重新处理历史数据,你可能希望加入销售时的税率,这可能与当前的税率不同。

如果跨流的事件排序是未确定的那么这个连接变得不确定【87】这意味着你不能在相同的输入上重新运行相同的工作并且必然会得到相同的结果输入流上的事件可能交织在当你再次运行这个工作时采用不同的方式

在数据仓库中这个问题被称为缓慢变化的维度SCD通常通过对特定版本的联合记录使用唯一的标识符来解决例如每当税率改变时新的标识符并且发票包括销售时的税率标识符【88,89】。这种变化使连接成为确定性的但是由于表中所有记录的版本都需要保留导致日志压缩是不可能的。

容错

在本章的最后一节中,让我们考虑流处理器如何容忍错误。我们在第10章中看到批处理框架可以很容易地容忍错误如果MapReduce作业中的任务失败可以简单地在另一台机器上重新启动并且丢弃失败任务的输出。这种透明的重试是可能的因为输入文件是不可变的每个任务都将其输出写入到HDFS上的单独文件并且输出仅在任务成功完成时可见。

特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使事实上某些任务失败了。看起来好像每个输入记录都被处理了一次 —— 没有记录被跳过,而且没有处理两次。尽管重新启动任务意味着实际上可能会多次处理记录,但输出中的可见效果好像只处理过一次。这个原则被称为一次语义学,虽然有效 —— 一次将是一个更具描述性的术语【90】。

在流处理过程中也出现了同样的容错问题,但是处理起来不那么直观:等到某个任务完成之后才使其输出可见,因为流是无限的,因此你永远无法完成处理。

小批量和检查点

一个解决方案是将流分解成小块,并像小型批处理一样处理每个块。这种方法被称为小批量microbatching它被用于Spark Streaming 【91】。批处理大小通常约为1秒这是性能折中的结果较小的批次会导致更大的调度和协调开销而较大的批次意味着流处理器的结果变得可见之前的较长延迟。

微缩也隐含地提供了与批量大小相等的翻滚窗口(通过处理时间而不是事件时间戳)。任何需要更大窗口的作业都需要明确地将状态从一个微阵列转移到下一个微阵列。

Apache Flink中使用的一种变体方法是定期生成状态滚动检查点并将其写入持久存储器【92,93】。如果流操作符崩溃它可以从最近的检查点重新启动并放弃在最后一个检查点和崩溃之间生成的任何输出。检查点由消息流中的条形码触发类似于微型图形之间的边界但不强制特定的窗口大小。

在流处理框架的范围内,微观网格化和检查点方法提供了与批处理一样的一次语义。但是,只要输出离开流处理器(例如,通过写入数据库,向外部消息代理发送消息或发送电子邮件),框架将不再能够放弃失败批处理的输出。在这种情况下,重新启动失败的任务会导致外部副作用发生两次,单独使用微配量或检查点不足以防止此问题。

原子提交重访

为了在出现故障时给出精确的一次处理,我们需要确保处理事件的所有输出和副作用只有当处理成功时才会生效。这些影响包括发送给下游运营商或外部消息传递系统(包括电子邮件或推送通知)的任何消息,任何数据库写入,对运营商状态的任何更改以及对输入消息的任何确认(包括将消费者偏移量向前移动基于日志的消息代理)。

这些事情要么都是原子地发生要么都不发生但是不应该彼此不同步。如果这种方法听起来很熟悉那是因为我们在分布式事务和两阶段提交的情况下在第360页的“准确一次的消息处理”中讨论了它。

第9章我们讨论了分布式交易如XA的传统实现中的问题。然而在更受限制的环境中可以有效地实现这样的原子提交设施。 Google云数据流【81,92】和VoltDB 【94】中使用了这种方法并计划在Apache Kafka 【95,96】中添加类似的功能。与XA不同这些实现不会尝试跨异构技术提供事务而是通过在流处理框架中管理状态更改和消息传递来保持内部事务。事务协议的开销可以通过在单个事务中处理几个输入消息来分摊。

幂等性

我们的目标是放弃任何失败的任务的部分输出以便他们可以安全地重试而不会两次生效。分布式事务是实现这一目标的一种方式但另一种方式是依赖幂等性【97】。

幂等操作是可以多次执行的操作,并且与只执行一次操作具有相同的效果。例如,将键值存储中的某个键设置为某个固定值是幂等的(再次写入该值会覆盖具有相同值的值),而递增计数器不是幂等的(再次执行递增意味着该值递增两次)。

即使一个操作不是天生的幂等它往往可以与一些额外的元数据幂等。例如在使用来自Kafka的消息时每条消息都有一个持续的单调递增的偏移量。将值写入外部数据库时可以将触发上次写入的消息的偏移量与值包含在一起。因此你可以判断是否已应用更新并避免再次执行相同的更新。

风暴三叉戟的状态处理基于类似的想法【78】。依赖幂等性意味着一些假设重启一个失败的任务必须以相同的顺序重播相同的消息一个基于日志的消息代理这样做处理必须是确定性的其他节点不能同时更新相同的值【98,99】。

当从一个处理节点故障转移到另一个处理节点时,可能需要进行防护(参阅“领导和锁”),以防止被认为是死的节点的干扰

失败后重建状态

任何需要状态的流进程(例如,任何窗口聚合(例如计数器,平均值和直方图)以及用于连接的任何表和索引)都必须确保在失败之后可以恢复此状态。

一种选择是将状态保持在远程数据存储中并复制它,尽管如每个单独消息的远程数据库查询速度可能会很慢,正如在“流表连接”中所述。另一种方法是保持流处理器的本地状态,并定期复制。然后,当流处理器从故障中恢复时,新任务可以读取复制状态并恢复处理而不丢失数据。

例如Flink定期捕获操作员状态的快照并将它们写入HDFS等持久存储器中【92,93】。 Samza和Kafka Streams通过将状态更改发送到具有日志压缩功能的专用Kafka主题来复制状态更改这类似于变更数据捕获【84,100】。 VoltDB通过冗余处理多个节点上的每个输入消息来复制状态参阅“真的串行执行”)。

在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态由一个相当短的窗口中的聚合组成,则它可能足够快,以便重放与该窗口相对应的输入事件。如果状态是通过变更数据捕获维护的数据库的本地副本,那么也可以从日志压缩的更改流重建数据库(请参阅“日志压缩”一节)。

但是,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。在所有情况下都没有普遍理想的权衡,随着存储和网络技术的发展,本地和远程状态的优点也可能会发生变化。

本章小结

在本章中,我们讨论了事件流,它们所服务的目的以及如何处理它们。在某些方面,流处理非常类似于我们在第10章讨论的批处理,而是在无限的(永无止境的)流而不是固定大小的输入上持续进行。从这个角度来看,消息代理和事件日志可以作为文件系统的流媒体。

我们花了一些时间比较两种消息代理:

AMQP/JMS风格的消息代理

代理将个人消息分配给消费者消费者在成功处理个人消息时确认消息。消息被确认后从代理中删除。这种方法适合作为RPC的异步形式另请参阅“消息传递数据流”),例如在任务队列中,消息处理的确切顺序并不重要,没有在处理之后,需要重新读取旧消息。

基于日志的消息代理

代理将分区中的所有消息分配给相同的使用者节点,并始终以相同的顺序传递消息。并行性是通过划分来实现的,消费者通过检查他们所处理的最后一个消息的偏移来跟踪他们的进度。代理将消息保留在磁盘上,因此如有必要,可以跳回并重新读取旧消息。

基于日志的方法与数据库中的复制日志(参见第5章)和日志结构存储引擎(请参阅第3章)具有相似之处。我们看到,这种方法特别适用于消耗输入流并生成派生状态或派生输出流的流处理系统。

就流的来源而言,我们讨论了几种可能性:用户活动事件,提供定期读数的传感器和数据馈送(例如金融市场数据)自然地表示为流。我们看到,将数据写入数据流也是有用的:我们可以捕获更改日志 —— 即对数据库所做的所有更改的历史记录 —— 隐式地通过变更数据捕获或通过事件明确地捕获代理。日志压缩允许流保留数据库 内容的完整副本。

将数据库表示为流为系统集成提供了强大的机会。你可以通过使用更改日志并将其应用于派生系统,使派生的数据系统(如搜索索引,缓存和分析系统)保持最新。你甚至可以从头开始,从开始一直到现在消耗更改的日志,从而为现有数据构建新的视图。

将状态保持为流并重放消息的设施也是在各种流处理框架中实现流连接和容错的技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理),计算加窗聚合(流分析)以及保持派生数据系统处于最新状态(物化视图)。

然后我们讨论了在流处理器中推理时间的困难,包括处理时间和事件时间戳之间的区别,以及在你认为窗口完成之后处理到达的离散事件的问题。

我们区分了可能出现在流程中的三种类型的连接:

流式流连接

两个输入流由活动事件组成并且连接操作符搜索在某个时间窗口内发生的相关事件。例如它可以匹配相同用户在30分钟内采取的两个动作。如果你想要在一个流中查找相关事件则两个连接输入实际上可以是相同的流自连接

流表连接

一个输入流由活动事件组成,另一个输入流是数据库更改日志。更新日志保持数据库的本地副本最新。对于每个活动事件,连接运算符将查询数据库并输出一个丰富的活动事件。

表格连接

两个输入流都是数据库更新日志。在这种情况下,一方的每一个变化都与另一方的最新状态相结合。结果是对两个表之间的连接的物化视图进行了一系列更改。

最后,我们讨论了在流处理器中实现容错和一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而,由于流程长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微博,检查点,事务或幂等写入。

参考文献

  1. Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing,” Proceedings of the VLDB Endowment, volume 8, number 12, pages 17921803, August 2015. doi:10.14778/2824032.2824076

  2. Harold Abelson, Gerald Jay Sussman, and Julie Sussman: Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN: 978-0-262-51087-5, available online at mitpress.mit.edu

  3. Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “The Many Faces of Publish/Subscribe,” ACM Computing Surveys, volume 35, number 2, pages 114131, June 2003. doi:10.1145/857076.857078

  4. Joseph M. Hellerstein and Michael Stonebraker: Readings in Database Systems, 4th edition. MIT Press, 2005. ISBN: 978-0-262-69314-1, available online at redbook.cs.berkeley.edu

  5. Don Carney, Uğur Çetintemel, Mitch Cherniack, et al.: “Monitoring Streams A New Class of Data Management Applications,” at 28th International Conference on Very Large Data Bases (VLDB), August 2002.

  6. Matthew Sackman: “Pushing Back,” lshift.net, May 5, 2016. Vicent Martí: “Brubeck, a statsd-Compatible Metrics Aggregator,” githubengineering.com, June 15, 2015. Seth Lowenberger: “MoldUDP64 Protocol Specification V 1.00,” nasdaqtrader.com, July 2009.

  7. Pieter Hintjens: ZeroMQ The Guide. O'Reilly Media, 2013. ISBN: 978-1-449-33404-8

  8. Ian Malpass: “Measure Anything, Measure Everything,” codeascraft.com, February 15, 2011.

  9. Dieter Plaetinck: “25 Graphite, Grafana and statsd Gotchas,” blog.raintank.io, March 3, 2016.

  10. Jeff Lindsay: “Web Hooks to Revolutionize the Web,” progrium.com, May 3, 2007.

  11. Jim N. Gray: “Queues Are Databases,” Microsoft Research Technical Report MSR-TR-95-56, December 1995.

  12. Mark Hapner, Rich Burridge, Rahul Sharma, et al.: “JSR-343 Java Message Service (JMS) 2.0 Specification,” jms-spec.java.net, March 2013.

  13. Sanjay Aiyagari, Matthew Arrott, Mark Atwell, et al.: “AMQP: Advanced Message Queuing Protocol Specification,” Version 0-9-1, November 2008.

  14. Google Cloud Pub/Sub: A Google-Scale Messaging Service,” cloud.google.com, 2016.

  15. Apache Kafka 0.9 Documentation,” kafka.apache.org, November 2015.

  16. Jay Kreps, Neha Narkhede, and Jun Rao: “Kafka: A Distributed Messaging System for Log Processing,” at 6th International Workshop on Networking Meets Databases (NetDB), June 2011.

  17. Amazon Kinesis Streams Developer Guide,” docs.aws.amazon.com, April 2016.

  18. Leigh Stewart and Sijie Guo: “Building DistributedLog: Twitters High-Performance Replicated Log Service,” blog.twitter.com, September 16, 2015.

  19. DistributedLog Documentation,” Twitter, Inc., distributedlog.io, May 2016. Jay Kreps:

    Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines),” engineering.linkedin.com, April 27, 2014.

  20. Kartik Paramasivam: “How Were Improving and Advancing Kafka at LinkedIn,” engineering.linkedin.com, September 2, 2015.

  21. Jay Kreps: “The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction,” engineering.linkedin.com, December 16, 2013.

  22. Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: “All Aboard the Databus!,” at 3rd ACM Symposium on Cloud Computing (SoCC), October 2012.

  23. Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “Wormhole: Reliable Pub-Sub to Support Geo-Replicated Internet Services,” at 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI), May 2015.

  24. P. P. S. Narayan: “Sherpa Update,” developer.yahoo.com, June 8, .

  25. Martin Kleppmann: “Bottled Water: Real-Time Integration of PostgreSQL and Kafka,” martin.kleppmann.com, April 23, 2015.

  26. Ben Osheroff: “Introducing Maxwell, a mysql-to-kafka Binlog Processor,” developer.zendesk.com, August 20, 2015.

  27. Randall Hauch: “Debezium 0.2.1 Released,” debezium.io, June 10, 2016.

  28. Prem Santosh Udaya Shankar: “Streaming MySQL Tables in Real-Time to Kafka,” engineeringblog.yelp.com, August 1, 2016.

  29. Mongoriver,” Stripe, Inc., github.com, September 2014.

  30. Dan Harvey: “Change Data Capture with Mongo + Kafka,” at Hadoop Users Group UK, August 2015.

  31. Oracle GoldenGate 12c: Real-Time Access to Real-Time Information,” Oracle White Paper, March 2015.

  32. Oracle GoldenGate Fundamentals: How Oracle GoldenGate Works,” Oracle Corporation, youtube.com, November 2012.

  33. Slava Akhmechet: “Advancing the Realtime Web,” rethinkdb.com, January 27, 2015.

  34. Firebase Realtime Database Documentation,” Google, Inc., firebase.google.com, May 2016.

  35. Apache CouchDB 1.6 Documentation,” docs.couchdb.org, 2014.

  36. Matt DeBergalis: “Meteor 0.7.0: Scalable Database Queries Using MongoDB Oplog Instead of Poll-and-Diff,” info.meteor.com, December 17, 2013.

  37. Chapter 15. Importing and Exporting Live Data,” VoltDB 6.4 User Manual, docs.voltdb.com, June 2016.

  38. Neha Narkhede: “Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines,” confluent.io, February 18, 2016.

  39. Greg Young: “CQRS and Event Sourcing,” at Code on the Beach, August 2014.

  40. Martin Fowler: “Event Sourcing,” martinfowler.com, December 12, 2005.

  41. Vaughn Vernon: Implementing Domain-Driven Design. Addison-Wesley Professional, 2013. ISBN: 978-0-321-83457-7

  42. H. V. Jagadish, Inderpal Singh Mumick, and Abraham Silberschatz: “View Maintenance Issues for the Chronicle Data Model,” at 14th ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems (PODS), May 1995. doi:10.1145/212433.220201

  43. Event Store 3.5.0 Documentation,” Event Store LLP, docs.geteventstore.com, February 2016.

  44. Martin Kleppmann: Making Sense of Stream Processing. Report, O'Reilly Media, May 2016.

  45. Sander Mak: “Event-Sourced Architectures with Akka,” at JavaOne, September 2014.

  46. Julian Hyde: personal communication, June 2016.

  47. Ashish Gupta and Inderpal Singh Mumick: Materialized Views: Techniques, Implementations, and Applications. MIT Press, 1999. ISBN: 978-0-262-57122-7

  48. Timothy Griffin and Leonid Libkin: “Incremental Maintenance of Views with Duplicates,” at ACM International Conference on Management of Data (SIGMOD), May 1995. doi:10.1145/223784.223849

  49. Pat Helland: “Immutability Changes Everything,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  50. Martin Kleppmann: “Accounting for Computer Scientists,” martin.kleppmann.com, March 7, 2011.

  51. Pat Helland: “Accountants Don't Use Erasers,” blogs.msdn.com, June 14, 2007.

  52. Fangjin Yang: “Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets,” metamarkets.com, June 3, 2015.

  53. Gavin Li, Jianqiu Lv, and Hang Qi: “Pistachio: Co-Locate the Data and Compute for Fastest Cloud Compute,” yahoohadoop.tumblr.com, April 13, 2015.

  54. Kartik Paramasivam: “Stream Processing Hard Problems Part 1: Killing Lambda,” engineering.linkedin.com, June 27, 2016.

  55. Martin Fowler: “CQRS,” martinfowler.com, July 14, 2011.

  56. Greg Young: “CQRS Documents,” cqrs.files.wordpress.com, November 2010.

  57. Baron Schwartz: “Immutability, MVCC, and Garbage Collection,” xaprb.com, December 28, 2013.

  58. Daniel Eloff, Slava Akhmechet, Jay Kreps, et al.: "Re: Turning the Database Inside-out with Apache Samza," Hacker News discussion, news.ycombinator.com, March 4, 2015.

  59. Datomic Development Resources: Excision,” Cognitect, Inc., docs.datomic.com.

  60. Fossil Documentation: Deleting Content from Fossil,” fossil-scm.org, 2016.

  61. Jay Kreps: “The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard,twitter.com, March 30, 2015.

  62. David C. Luckham: “Whats the Difference Between ESP and CEP?,” complexevents.com, August 1, 2006.

  63. Srinath Perera: “How Is Stream Processing and Complex Event Processing (CEP) Different?,” quora.com, December 3, 2015.

  64. Arvind Arasu, Shivnath Babu, and Jennifer Widom: “The CQL Continuous Query Language: Semantic Foundations and Query Execution,” The VLDB Journal, volume 15, number 2, pages 121142, June 2006. doi:10.1007/s00778-004-0147-z

  65. Julian Hyde: “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch,” ACM Queue, volume 7, number 11, December 2009. doi:10.1145/1661785.1667562

  66. Esper Reference, Version 5.4.0,” EsperTech, Inc., espertech.com, April 2016.

  67. Zubair Nabi, Eric Bouillet, Andrew Bainbridge, and Chris Thomas: “Of Streams and Storms,” IBM technical report, developer.ibm.com, April 2014.

  68. Milinda Pathirage, Julian Hyde, Yi Pan, and Beth Plale: “SamzaSQL: Scalable Fast Data Management with Streaming SQL,” at IEEE International Workshop on High-Performance Big Data Computing (HPBDC), May 2016. doi:10.1109/IPDPSW.2016.141

  69. Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm,” at Conference on Analysis of Algorithms (AofA), June 2007.

  70. Jay Kreps: “Questioning the Lambda Architecture,” oreilly.com, July 2, 2014.

  71. Ian Hellström: “An Overview of Apache Streaming Technologies,” databaseline.wordpress.com, March 12, 2016.

  72. Jay Kreps: “Why Local State Is a Fundamental Primitive in Stream Processing,” oreilly.com, July 31, 2014.

  73. Shay Banon: “Percolator,” elastic.co, February 8, 2011.

  74. Alan Woodward and Martin Kleppmann: “Real-Time Full-Text Search with Luwak and Samza,” martin.kleppmann.com, April 13, 2015.

  75. Apache Storm 1.0.1 Documentation,” storm.apache.org, May 2016.

  76. Tyler Akidau: “The World Beyond Batch: Streaming 102,” oreilly.com, January 20, 2016.

  77. Stephan Ewen: “Streaming Analytics with Apache Flink,” at Kafka Summit, April 2016.

  78. Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, et al.: “MillWheel: Fault-Tolerant Stream Processing at Internet Scale,” at 39th International Conference on Very Large Data Bases (VLDB), August 2013.

  79. Alex Dean: “Improving Snowplow's Understanding of Time,” snowplowanalytics.com, September 15, 2015.

  80. Windowing (Azure Stream Analytics),” Microsoft Azure Reference, msdn.microsoft.com, April 2016.

  81. State Management,” Apache Samza 0.10 Documentation, samza.apache.org, December 2015.

  82. Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, et al.: “Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams,” at ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272

  83. Martin Kleppmann: “Samza Newsfeed Demo,” github.com, September 2014.

  84. Ben Kirwin: “Doing the Impossible: Exactly-Once Messaging Patterns in Kafka,” ben.kirw.in, November 28, 2014.

  85. Pat Helland: “Data on the Outside Versus Data on the Inside,” at 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005.

  86. Ralph Kimball and Margy Ross: The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1

  87. Viktor Klang: “I'm coining the phrase 'effectively-once' for message processing with at-least-once + idempotent operations,” twitter.com, October 20, 2016.

  88. Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters,” at 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.

  89. Kostas Tzoumas, Stephan Ewen, and Robert Metzger: “High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink,” data-artisans.com, August 5, 2015.

  90. Paris Carbone, Gyula Fóra, Stephan Ewen, et al.: “Lightweight Asynchronous Snapshots for Distributed Dataflows,” arXiv:1506.08603 [cs.DC], June 29, 2015.

  91. Ryan Betts and John Hugg: Fast Data: Smart and at Scale. Report, O'Reilly Media, October 2015.

  92. Flavio Junqueira: “Making Sense of Exactly-Once Semantics,” at Strata+Hadoop World London, June 2016.

  93. Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang: “KIP-98 Exactly Once Delivery and Transactional Messaging,” cwiki.apache.org, November 2016.

  94. Pat Helland: “Idempotence Is Not a Medical Condition,” Communications of the ACM, volume 55, number 5, page 56, May 2012. doi:10.1145/2160718.2160734

  95. Jay Kreps: “Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind,” email to samza-dev mailing list, September 9, 2014.

  96. E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson: “A Survey of Rollback-Recovery Protocols in Message-Passing Systems,” ACM Computing Surveys, volume 34, number 3, pages 375408, September 2002. doi:10.1145/568522.568525

  97. Adam Warski: “Kafka Streams How Does It Fit the Stream Processing Landscape?,” softwaremill.com, June 1, 2016.


上一章 目录 下一章
第十章:批处理 设计数据密集型应用 第十二章:数据系统的未来

  1. 设计一种负载均衡方案是可行的,在这种方案中,两个消费者通过读取全部消息来共享处理分区的工作,但是其中一个只考虑具有偶数偏移量的消息,而另一个消费者只处理奇数编号的偏移量。或者你可以将消息摊到一个线程池中来处理,但这种方法会使消费者偏移量管理变得复杂。一般来说,单线程处理单分区是合适的,可以通过增加更多分区来提高并行度。 ↩︎

  2. 感谢Flink社区的Kostas Kloudas提出这个比喻。 ↩︎

  3. 如果你把一个流视为一个表的衍生物,如图11-6所示并且把一个连接看作是两个表u·v的乘积那么会发生一些有趣的事情物化连接的变化流遵循产品规则(u·v)'= u'v + uv'(u·v)'= u'v + uv'。 换句话说任何tweets的变化都与当前的追随者联系在一起任何追随者的变化都与当前的tweets 【49,50】相结合。 ↩︎