ch11 done

This commit is contained in:
Vonng 2018-05-22 20:13:10 +08:00
parent 0e2da35bbe
commit 9f481740a9
2 changed files with 145 additions and 139 deletions

View File

@ -106,8 +106,8 @@
| 第八章:分布式系统中的问题 | 初翻 | |
| 第九章:一致性与共识 | 初翻 | |
| 第三部分:衍生数据 | 精翻 | |
| 第十章:批处理 | 精翻 | Vonng |
| 第十一章:流处理 | 翻 | |
| 第十章:批处理 | 初翻 | |
| 第十一章:流处理 | 翻 | |
| 第十二章:数据系统的未来 | 初翻 40% | Vonng |
| 术语表 | - | |
| 后记 | 初翻 | |

280
ch11.md
View File

@ -417,200 +417,202 @@ $$
## 流处理
到目前为止,本章中我们已经讨论了流的来源(用户活动事件,传感器和写入数据库),我们讨论了流如何传输(通过直接消息传送,通过消息代理和事件日志)。
到目前为止,本章中我们已经讨论了流的来源(用户活动事件,传感器和写入数据库),我们讨论了流如何传输(直接通过消息传送,通过消息代理,通过事件日志)。
剩下的就是讨论一下你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选
剩下的就是讨论一下你可以用流做什么 —— 也就是说,你可以处理它。一般来说,有三种选
1. 你可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后由其他客户端查询。如[图11-5](img/fig11-5.png)所示,这是保持数据库与系统其他部分发生更改同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。写入存储系统的流程相当于我们在“批处理工作流程的输出”页面上讨论的内容
2. 你可以以某种方式将事件推送给用户,例如通过发送电子邮件警报或推送通知,或通过将事件流式传输到可实时显示的实时仪表板。在这种情况下,人是流的最终消费者。
3. 你可以处理一个或多个输入流以产生一个或多个输出流。数据流可能会经过由几个这样的处理阶段组成的流水线,然后才会输出选项1或2
1. 你可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后能被其他客户端查询。如[图11-5](img/fig11-5.png)所示,这是数据库与系统其他部分发生变更保持同步的好方法 —— 特别是当流消费者是写入数据库的唯一客户端时。如“[批处理工作流的输出](ch10.md#批处理工作流的输出)”中所讨论的,它是写入存储系统的流等价物
2. 你能以某种方式将事件推送给用户,例如发送报警邮件或推送通知,或将事件流式传输到可实时显示的仪表板上。在这种情况下,人是流的最终消费者。
3. 你可以处理一个或多个输入流,并产生一个或多个输出流。流可能会经过由几个这样的处理阶段组成的流水线,最后再输出选项1或2
在本章的其余部分中我们将讨论选项3处理流以产生其他派生流。处理这样的流的代码片段被称为操作员或作业。它与我们在[第10章](ch10.md)中讨论过的Unix进程和MapReduce作业密切相关数据流的模式是相似的一个流处理器以只读的方式使用输入流并将其输出写入一个不同的位置时尚
在本章的剩余部分中我们将讨论选项3处理流以产生其他衍生流。处理这样的流的代码片段被称为**算子operator**或**作业job**。它与我们在[第10章](ch10.md)中讨论过的Unix进程和MapReduce作业密切相关数据流的模式是相似的一个流处理器以只读的方式使用输入流并将其输出以仅追加的方式写入一个不同的位置。
流处理中的分区和并行化模式也非常类似于[第10章](ch10.md)中介绍的MapReduce和数据流引擎因此我们不在这里重复这些主题。基本的映射操作(如转换和过滤记录)也是一样的。
流处理中的分区和并行化模式也非常类似于[第10章](ch10.md)中介绍的MapReduce和数据流引擎因此我们不再重复这些主题。基本的Map操作(如转换和过滤记录)也是一样的。
批量作业的一个关键区别是流不会结束。这种差别有很多含义:正如本章开始部分所讨论的,排序对无界数据集没有意义,因此不能使用排序合并联接(请参阅“[减少连接和分组](ch10.md#减少连接和分组)”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重失败任务,但是对于已经运行数年的流作业,在开始后重新开始崩溃可能不是一个可行的选择
与批量作业相比的一个关键区别是,流不会结束。这种差异会带来很多隐含的结果。正如本章开始部分所讨论的,排序对无界数据集没有意义,因此无法使用**排序合并联接**(请参阅“[Reduce端连接与分组](ch10.md#减少连接和分组)”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重启失败任务,但是对于已经运行数年的流作业,重启后从头开始跑可能并不是一个可行的选项
### 流处理的应用
长期以来,流处理一直用于监控目的,如果某个事情发生,组织就希望得到警报。例如:
长期以来,流处理一直用于监控目的,如果某个事件发生,单位希望能得到警报。例如:
* 欺诈检测系统需要确定信用卡的使用模式是否意外地发生了变化,并且如果信用卡可能已被盗用,则将其封锁
* 欺诈检测系统需要确定信用卡的使用模式是否有意外地变化,如果信用卡可能已被盗刷,则锁卡
* 交易系统需要检查金融市场的价格变化,并根据指定的规则进行交易。
* 制造系统需要监控工厂中机器的状态,如果出现故障,可以快速识别问题。
* 军事和情报系统需要跟踪潜在的攻击者的行动,并在发生袭击的迹象时发出警报。
* 制造系统需要监控工厂中机器的状态,如果出现故障,可以快速定位问题。
* 军事和情报系统需要跟踪潜在侵略者的活动,并在出现袭击征兆时发出警报。
这些类型的应用程序需要非常复杂的模式匹配和相关性。然而,流处理的其他用途也随着时间的推移而出现。在本节中,我们将简要比较一下这些应用程序
这些类型的应用需要非常精密复杂的模式匹配与相关检测。然而随着时代的进步,流处理的其他用途也开始出现。在本节中,我们将简要比较一下这些应用。
#### 复杂的事件处理
#### 复事件处理
复杂事件处理CEP是20世纪90年代为分析事件流而开发的一种方法,尤其适用于需要搜索某些事件模式的应用程序【65,66】。与正则表达式允许你在字符串中搜索特定字符模式的方式类似CEP允许你指定规则以在流中搜索某些事件模式。
**复合事件处理complex, event processing, CEP**是20世纪90年代为分析事件流而开发出的一种方法尤其适用于需要搜索某些事件模式的应用【65,66】。与正则表达式允许你在字符串中搜索特定字符模式的方式类似CEP允许你指定规则以在流中搜索某些事件模式。
CEP系统通常使用高级声明式查询语言如SQL或图形用户界面来描述应该检测到的事件模式。这些查询被提交给一个处理引擎该引擎使用输入流并在内部维护一个执行所需匹配的状态机。当发现匹配时引擎发出一个复杂的事件因此名字与事件模式的细节【67】。
CEP系统通常使用高层次的声明式查询语言比如SQL或者图形用户界面来描述应该检测到的事件模式。这些查询被提交给处理引擎该引擎消费输入流并在内部维护一个执行所需匹配的状态机。当发现匹配时引擎发出一个**复合事件complex event**(因此得名),并附有检测到的事件模式详情【67】。
在这些系统中,查询和数据之间的关系与普通数据库相比是颠倒的。通常情况下,数据库会持久存储数据,并将查询视为暂时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时忘记查询。 CEP引擎反转了这些角色:查询是长期存储的,来自输入流的事件不断流过它们,搜索匹配事件模式的查询【68】。
在这些系统中,查询和数据之间的关系与普通数据库相比是颠倒的。通常情况下,数据库会持久存储数据,并将查询视为临时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时丢掉查询。 CEP引擎反转了角色查询是长期存储的来自输入流的事件不断流过它们搜索匹配事件模式的查询【68】。
CEP的实现包括Esper 【69】IBM InfoSphere Streams 【70】ApamaTIBCO StreamBase和SQLstream。像Samza这样的分布式流处理器也获得了对流声明式查询的SQL支持【71】。
CEP的实现包括Esper 【69】IBM InfoSphere Streams 【70】ApamaTIBCO StreamBase和SQLstream。像Samza这样的分布式流处理组件支持使用SQL在流上进行声明式查询【71】。
#### 流分析
使用流处理的另一个领域是对流进行分析。 CEP和流分析之间的边界是模糊的,但作为一般规则,分析往往不太关心找到特定的事件序列,并且更倾向于聚合和统计度量大量的事件——例如:
使用流处理的另一个领域是对流进行分析。 CEP与流分析之间的边界是模糊的,但一般来说,分析往往对找出特定事件序列并不关心,而更关注大量事件上的聚合与统计指标 —— 例如:
* 测量某种类型事件的速率(每个时间间隔发生的频率)
* 计算一段时间内某个值的滚动平均值
* 将当前的统计数据与以前的时间间隔进行比较(例如,检测趋势或提醒与上周同期相比过高或过低的指标
* 测量某种类型事件的速率(每个时间间隔发生的频率)
* 滚动计算一段时间窗口内某个值的平均值
* 将当前的统计值与先前的时间区间的值对比(例如,检测趋势,当指标与上周同比异常偏高或偏低时报警
这些统计信息通常是在固定的时间间隔内进行计算的例如你可能想知道在过去5分钟内每秒对服务的平均查询次数以及在此期间的第99百分位响应时间。在几分钟内平均从一秒钟到下一秒钟平滑无关的波动同时还能及时了解交通模式的任何变化。你汇总的时间间隔称为窗口我们将在“[关于时间的推理](#关于时间的推理)”中更详细地讨论窗口。
这些统计值通常是在固定时间区间内进行计算的例如你可能想知道在过去5分钟内服务每秒查询次数的均值以及此时间段内响应时间的第99百分位点。在几分钟内取平均能抹平秒和秒之间的无关波动且仍然能向你展示流量模式的时间图景。聚合的时间间隔称为**窗口window**,我们将在“[理解时间](#理解时间)”中更详细地讨论窗口。
流分析系统有时使用概率算法例如Bloom filter我们在“[性能优化](ch3.md#性能优化)”中遇到过)设置成员资格HyperLogLog 【72】基数估计以及各种百分比估计算法(请参阅“[实践中的百分位点](ch1.md#实践中的百分位点)“第16页。概率算法产生近似的结果但是具有在流处理器中比精确算法需要少得多的存储器的优点。近似算法的使用有时会使人们相信流处理系统总是有损和不精确的但这是错误的流处理没有任何内在的近似而概率算法只是一个优化【73】。
流分析系统有时使用概率算法例如Bloom filter我们在“[性能优化](ch3.md#性能优化)”中遇到过)来管理成员资格HyperLogLog 【72】用于基数估计以及各种百分比估计算法(请参阅“[实践中的百分位点](ch1.md#实践中的百分位点)“)。概率算法产出近似的结果,但比起精确算法的优点是内存使用要少得多。使用近似算法有时让人们觉得流处理系统总是有损的和不精确的,但这是错误看法:流处理并没有任何内在的近似性,而概率算法只是一种优化【73】。
许多开源分布式流处理框架的设计都是以分析为基础例如Apache StormSpark StreamingFlinkConcordSamza和Kafka Streams 【74】。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
许多开源分布式流处理框架的设计都是针对分析设计例如Apache StormSpark StreamingFlinkConcordSamza和Kafka Streams 【74】。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
#### 保持物化视图
#### 维护物化视图
我们在“[数据库和数据流](#数据库和数据流)”中看到,可以使用数据库更改流来保持派生数据系统(如缓存,搜索索引和数据仓库)与源数据库保持最新。我们可以将这些示例视为维护实体化视图的具体情况(请参阅“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”):导出某个数据集的替代视图,以便可以高效地查询它,并在底层数据更改【50】。
我们在“[数据库和数据流](#数据库和数据流)”中看到,数据库的变更流可以用于维护衍生数据系统(如缓存,搜索索引和数据仓库),使其与源数据库保持最新。我们可以将这些示例视作维护**物化视图materialized view**的一种具体场景(参阅“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”):在某个数据集上衍生出一个替代视图以便高效查询,并在底层数据变更时更新视图【50】。
同样,在事件代理中,应用程序状态通过应用事件日志来维护;这里的应用状态也是一种物化视图。与流分析场景不同,在某个时间窗口内仅考虑事件通常是不够的:构建物化视图可能需要任意时间段内的所有事件,除了可能由日志压缩丢弃的任何过时事件(请参阅“[日志压缩](#日志压缩)“)。实际上,你需要一个可以一直延伸到一开始的窗口。
同样,在事件溯源中,应用程序的状态是通过**应用apply**事件日志来维护的;这里的应用状态也是一种物化视图。与流分析场景不同的是,仅考虑某个时间窗口内的事件通常是不够的:构建物化视图可能需要任意时间段内的**所有**事件,除了那些可能由日志压缩丢弃的过时事件(请参阅“[日志压缩](#日志压缩)“)。实际上,你需要一个可以一直延伸到时间开端的窗口。
原则上,任何流处理器都可以用于物化视图维护,尽管永久维护事件的需要与一些主要在有限持续时间的窗口上运行的面向分析的框架的假设背道而驰。 Samza和Kafka Streams支持这种用法建立在Kafka对夯实的支持上【75】。
原则上讲,任何流处理组件都可以用于维护物化视图,尽管“永远运行”与一些面向分析的框架假设的“主要在有限时间段窗口上运行”背道而驰, Samza和Kafka Streams支持这种用法建立在Kafka对日志压缩comp的支持上【75】。
#### 在流上搜索
除了允许搜索由多个事件组成的模式的CEP外还有时需要基于复杂的标准(例如全文搜索查询)来搜索单个事件。
除了允许搜索由多个事件构成模式的CEP外有时也存在基于复杂标准(例如全文搜索查询)来搜索单个事件的需求
例如,媒体监测服务可以订阅新闻文章和媒体广播,并搜索任何关于公司,产品或感兴趣的话题的新闻。这是通过预先制定一个搜索查询来完成的,然后不断地将新闻项目流与这个查询进行匹配。在一些网站上也有类似的功能:例如,房地产网站的用户在市场上出现符合其搜索条件的新房产时,可以要求通知。 Elasticsearch 【76】的渗滤器功能是实现这种流式搜索的一种选择
例如,媒体监测服务可以订阅新闻文章Feed与来自媒体的播客搜索任何关于公司产品或感兴趣的话题的新闻。这是通过预先构建一个搜索查询来完成的然后不断地将新闻项的流与该查询进行匹配。在一些网站上也有类似的功能例如当市场上出现符合其搜索条件的新房产时房地产网站的用户可以要求网站通知他们。 Elasticsearch的这种过滤器功能是实现这种流搜索的一种选择【76】
传统的搜索引擎首先索引文件,然后在索引上运行查询。相比之下搜索一个数据流将会处理它的头部查询被存储文档通过查询运行就像CEP一样。在最简单的情况下你可以针对每个查询来测试每个文档但是如果你有大量查询这可能会变慢。为了优化过程可以对查询和文档进行索引从而缩小可能匹配的查询集合【77】。
传统的搜索引擎首先索引文件,然后在索引上跑查询。相比之下搜索一个数据流则反了过来查询被存储下来文档从查询中流过就像在CEP中一样。在简单的情况就是你可以为每个文档测试每个查询。但是如果你有大量查询这可能会变慢。为了优化这个过程可以像对文档一样为查询建立索引。因而收窄可能匹配的查询集合【77】。
#### 消息传递和RPC
第136页的“[消息传递数据流](ch4.md#消息传递数据流)”中我们讨论了消息传递系统作为RPC的替代方案即作为通信服务的机制例如在参与者模型中所使用的。虽然这些系统也是基于消息和事件但我们通常不会将它们视为流处理器
在“[消息传递数据流](ch4.md#消息传递数据流)”中我们讨论过消息传递系统可以作为RPC的替代方案即作为一种服务间通信的机制比如在Actor模型中所使用的那样。尽管这些系统也是基于消息和事件但我们通常不会将其视作流处理组件
Actor框架主要是管理通信模块的并发和分布式执行的机制,而流处理主要是数据管理技术。
* Actor框架主要是管理模块通信的并发和分布式执行的一种机制,而流处理主要是一种数据管理技术。
* 参与者之间的交流往往是短暂的,而且是一对一的,而事件日志则是持久的,多用户的。
* 参与者可以以任意方式进行通信(包括循环请求/响应模式),但流处理器通常设置在非循环流水线中,其中每个流是一个特定作业的输出,并且从一组明确定义的输入流派生。
也就是说RPC类系统和流处理之间有一些交叉区域。例如Apache Storm有一个称为分布式RPC的功能它允许将用户查询分散到一系列也处理事件流的节点上;这些查询然后与来自输入流的事件交织结果可以被汇总并发回给用户【78】。 (另参阅“[多分区数据处理](ch12.md#多分区数据处理)”)
* Actor之间的交流往往是短暂的一对一的而事件日志则是持久的多订阅者的。
* Actor可以以任意方式进行通信允许包括循环的请求/响应),但流处理通常配置在无环流水线中,其中每个流都是一个特定作业的输出,由良好定义的输入流中派生而来。
也可以使用actor框架来处理流。但是很多这样的框架在崩溃的情况下不能保证消息的传递所以这个过程不是容错的除非你实现了额外的重试逻辑
也就是说RPC类系统与流处理之间有一些交叉领域。例如Apache Storm有一个称为**分布式RPC**的功能它允许将用户查询分散到一系列也处理事件流的节点上然后这些查询与来自输入流的事件交织而结果可以被汇总并发回给用户【78】另参阅“[多分区数据处理](ch12.md#多分区数据处理)”)
### 关于时间的推理
也可以使用Actor框架来处理流。但是很多这样的框架在崩溃时不能保证消息的传递除非你实现了额外的重试逻辑否则这种处理不是容错的。
流处理器通常需要处理时间,特别是在用于分析目的的时候,频繁使用时间窗口,例如“过去五分钟的平均时间”。“最后五分钟”的含义似乎应该是未知的,大而清晰,但不幸的是这个概念是令人惊讶的棘手。
### 时间推理
在批处理过程中,处理任务通过大量的历史事件迅速收缩。如果需要按时间分类,批处理需要查看每个事件中嵌入的时间戳。查看运行批处理的机器的系统时钟没有意义,因为处理运行的时间与事件实际发生的时间无关
流处理通常需要与时间打交道,尤其是用于分析目的时候,会频繁使用时间窗口,例如“过去五分钟的平均值”。“最后五分钟”的含义看上去似乎是清晰而无歧义的,但不幸的是,这个概念非常棘手
批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间表是历史的一年,而不是几分钟的处理。而且,在事件中使用时间戳允许处理确定性的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“[故障容错](ch10.md#故障容错)”)
在批处理中过程中,大量的历史事件迅速收缩。如果需要按时间来分析,批处理器需要检查每个事件中嵌入的时间戳。读取运行批处理机器的系统时钟没有任何意义,因为处理运行的时间与事件实际发生的时间无关
另一方面许多流处理框架使用处理机器上的本地系统时钟处理时间来确定窗口【79】。这种方法具有简单的优点事件创建和事件处理之间的延迟可以忽略不计。然而如果存在任何显着的处理滞后即处理可能比事件实际发生的时间显着晚则会中断处理。
批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是历史中的一年,而不是处理中的几分钟。而且使用事件中的时间戳,使得处理是**确定性**的:在相同的输入上再次运行相同的处理过程会得到相同的结果(参阅“[故障容错](ch10.md#故障容错)”)。
另一方面,许多流处理框架使用处理机器上的本地系统时钟(**处理时间processing time**)来确定**窗口**【79】。这种方法的优点是简单事件创建与事件处理之间的延迟可以忽略不计。然而如果存在任何显著的处理延迟 —— 即,事件处理显著地晚于事件实际发生的时间,处理就失效了。
#### 事件时间与处理时间
有许多原因可能会延迟处理:排队,网络故障(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),导致消息代理或处理器中出现争用的性能问题,重新启动流消费者或重新处理过去的事件(参阅“[重放旧消息](#重放旧消息)”),或者在修复代码中的BUG之后进行恢复。
很多原因都可能导致处理延迟:排队,网络故障(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),性能问题导致消息代理/消息处理器出现争用,流消费者重启,重新处理过去的事件(参阅“[重放旧消息](#重放旧消息)”),或者在修复代码BUG之后从故障中恢复。
而且,消息延迟还可能导致消息的不可预知的排序。例如假设用户首先发出一个Web请求由Web服务器A处理然后发出第二个请求由服务器B处理。 A和B发出描述他们处理的请求的事件但是B的事件在A的事件发生之前到达消息代理。现在流处理器将首先看到B事件然后看到A事件即使它们实际上是以相反的顺序发生的。
而且,消息延迟还可能导致无法预测消息顺序。例如假设用户首先发出一个Web请求由Web服务器A处理然后发出第二个请求由服务器B处理。 A和B发出描述它们所处理请求的事件但是B的事件在A的事件发生之前到达消息代理。现在流处理器将首先看到B事件然后看到A事件即使它们实际上是以相反的顺序发生的。
如果有一个类比的话可以考虑一下“星球大战”的电影第四集于1977年发行1980年第五集1983年第六集之后分别在1999年2002年和2005年发行第一三集以及2015年的第七集【80】[^ii]。如果你按照他们出来的顺序观看电影,你处理电影的顺序与他们叙述的顺序是不一致的。 (情节编号就像事件时间戳一样,观看电影的日期就是处理时间。)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门编写以适应这种情况时间安排和订购问题。
有一个类比也许能帮助理解“星球大战”电影第四集于1977年发行第五集于1980年第六集于1983年紧随其后的是1999年的第一集2002年的第二集和2005年的三集以及2015年的第七集【80】[^ii]。如果你按照按照它们上映的顺序观看电影,你处理电影的顺序与它们叙事的顺序就是不一致的。 (集数编号就像事件时间戳,而你观看电影的日期就是处理时间)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门写就,以适应这种时机与顺序的问题。
[^ii]: 感谢Flink社区的Kostas Kloudas提出这个比喻。
令人困惑的事件时间和处理时间导致错误的数据。例如,假设你有一个流处理器来测量请求率(计算每秒请求数)。如果你重新部署流处理器,则可能会关闭一分钟,并在事件恢复时处理积压的事件。如果你根据处理时间来衡量速率,那么看起来好像在处理积压时突然出现异常的请求高峰,而事实上请求的实际速率是稳定的([图11-7](img/fig11-7.png))。
将事件时间和处理时间搞混会导致错误的数据。例如,假设你有一个流处理器用于测量请求速率(计算每秒请求数)。如果你重新部署流处理器,它可能会停止一分钟,并在恢复之后处理积压的事件。如果你按处理时间来衡量速率,那么在处理积压日志时,请求速率看上去就像有一个异常的突发尖峰,而实际上请求速率是稳定的([图11-7](img/fig11-7.png))。
![](img/fig11-7.png)
**图11-7 由于处理时间的窗口化,由于处理速率的变化而引入人为因素**
**图11-7 按处理时间分窗,会因为处理速率的变动引入人为因素**
#### 知道什么时候准备好了
#### 知道什么时候准备好了
从事件时间的角度来定义窗口时,一个棘手的问题是,当你收到特定窗口的所有事件,或者是否还有事件发生时,你永远无法确定
用事件时间来定义窗口的一个棘手的问题是,你永远也无法确定是不是已经收到了特定窗口的所有事件,还是说还有一些事件正在来的路上
例如,假设你将事件分组为一分钟的窗口,以便你可以统计每分钟的请求数。你已经计算了一些事件这些事件的时间戳是在第37分钟的时间落下的时间已经推移了。现在大部分的事件都在一小时的第38和第39分钟之内。你什么时候宣布你已经完成了第37分钟的窗口,并输出其计数器值?
例如,假设你将事件分组为一分钟的窗口,以便统计每分钟的请求数。你已经计数了一些带有本小时内第37分钟时间戳的事件时间流逝现在进入的主要都是本小时内第38和第39分钟的事件。什么时候才能宣布你已经完成了第37分钟的窗口计数,并输出其计数器值?
在一段时间没有看到任何新的事件之后,你可以超时并宣布一个窗口,但仍然可能发生某些事件被缓存在另一台计算机上,由于网络中断而延迟。你需要能够处理窗口已经声明完成后到达的这样的滞留事件。大体上,你有两个选择【1】
在一段时间没有看到任何新的事件之后,你可以超时并宣布一个窗口已经就绪,但仍然可能发生这种情况:某些事件被缓冲在另一台机器上,由于网络中断而延迟。你需要能够处理这种在窗口宣告完成之后到达的**滞留straggler**事件。大体上,你有两种选择【1】
1. 忽略这些零散的事件,因为它们在正常情况下可能只是一小部分事件。你可以将丢弃事件的数量作为度量标准进行跟踪,并在你开始丢弃大量数据时发出警报
2. 发布一个更正,更新的窗口与包含散兵队员的价值。你可能还需要收回以前的输出。
1. 忽略这些滞留事件,因为在正常情况下它们可能只是事件中的一小部分。你可以将丢弃事件的数量作为一个监控指标,并在出现大量丢消息的情况时报警
2. 发布一个**更正correction**一个包括滞留事件的更新窗口值。更新的窗口与包含散兵队员的价值。你可能还需要收回以前的输出。
在某些情况下可以使用特殊的消息来指示“从现在开始不会有比t更早时间戳的消息”消费者可以使用它来触发窗口【81】。但是如果不同机器上的多个生产者正在生成事件,每个事件都有自己的最小时间戳阈值,则消费者需要分别跟踪每个生产者。在这种情况下添加和删除生产者是比较棘手的。
在某些情况下可以使用特殊的消息来指示“从现在开始不会有比t更早时间戳的消息消费者可以使用它来触发窗口【81】。但是如果不同机器上的多个生产者都在生成事件,每个生产者都有自己的最小时间戳阈值,则消费者需要分别跟踪每个生产者。在这种情况下添加和删除生产者是比较棘手的。
#### 你用的是什么时间
#### 你用的是谁的时钟
当事件可以在系统中的多个点缓冲时,为事件分配时间戳更加困难。例如,考虑将使用率度量的事件报告给服务器的移动应用程序。该应用程序可能会在设备处于脱机状态时使用,在这种情况下,它将在设备上本地缓冲事件,并在下一次可用的互联网连接(可能是几小时甚至几天)时将它们发送到服务器。对于这个流的任何消费者来说,这些事件将显示为极其滞后的落后者
当事件可能在系统内多个地方进行缓冲时,为事件分配时间戳更加困难了。例如,考虑一个移动应用向服务器上报关于用量的事件。该应用可能会在设备处于脱机状态时被使用,在这种情况下,它将在设备本地缓冲事件,并在下一次互联网连接可用时向服务器上报这些事件(可能是几小时甚至几天)。对于这个流的任意消费者而言,它们就如延迟极大的滞留事件一样
在这种情况下,根据移动设备的本地时钟,事件的时间戳实际上应该是发生用户交互的时间。但是,用户控制的设备上的时钟通常是不可信的,因为它可能会被意外或故意设置为错误的时间(请参见“[时钟同步与准确性](ch8.md#时钟同步与准确性)”)。服务器收到事件的时间(根据服务器的时钟)更可能是准确的,因为服务器在你的控制之下,但在描述用户交互方面意义不大。
在这种情况下,事件上的事件戳实际上应当是用户交互发生的时间,取决于移动设备的本地时钟。然而用户控制的设备上的时钟通常是不可信的,因为它可能会被无意或故意设置成错误的时间(参见“[时钟同步与准确性](ch8.md#时钟同步与准确性)”)。服务器收到事件的时间(取决于服务器的时钟)可能是更准确的,因为服务器在你的控制之下,但在描述用户交互方面意义不大。
要调整不正确的设备时钟一种方法是记录三个时间戳【82】
要校正不正确的设备时钟一种方法是记录三个时间戳【82】
* 事件发生的时间,根据设备时钟
* 根据设备时钟将事件发送到服务器的时间
* 服务器根据服务器时钟收到事件的时间
* 事件发生的时间,取决于设备时钟
* 事件发送往服务器的时间,取决于设备时钟
* 事件被服务器接收的时间,取决于服务器时钟
通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后,你可以将该偏移应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生的时间与发送到服务器的时间之间没有变化)。
通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后可以将该偏移应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生时与送往服务器之间没有变化)。
个问题对于流处理来说并不是唯一的,批处理遇到了与时间推理完全相同的问题。在一个流式环境中,我们更加注意到时间的流逝。
并不是流处理独有的问题,批处理有着完全一样的时间推理问题。只是在流处理的上下文中,我们更容易意识到时间的流逝。
#### 窗口的类型
一旦你知道如何确定一个事件的时间戳,下一步就是决定如何定义一段时间的窗口。窗口然后可以用于聚合,例如计数事件,或计算窗口内的值的平均值。有几种窗口是常用的【79,83】
当你知道如何确定一个事件的时间戳后,下一步就是如何定义时间段的窗口。然后窗口就可以用于聚合,例如事件计数,或计算窗口内值的平均值。有几种窗口很常用【79,83】
***滚动窗口Tumbling Window***
一个滚动窗口有一个固定的长度每个事件都属于一个窗口。例如如果你有1分钟的翻滚窗口,则所有时间戳在`10:03:00`和`10:03:59`之间的事件会被分组到一个窗口中,`10:04:00`和`10:04:59`之间的事件下一个窗口等等。你可以通过获取每个事件时间戳并将其四舍五入到最接近的分钟来确定它所属的窗口从而实现1分钟的翻滚窗口。
滚动窗口有着固定的长度每个事件都仅能属于一个窗口。例如假设你有一个1分钟的滚动窗口,则所有时间戳在`10:03:00`和`10:03:59`之间的事件会被分组到一个窗口中,`10:04:00`和`10:04:59`之间的事件被分组到下一个窗口依此类推。通过将每个事件时间戳四舍五入至最近的分钟来确定它所属的窗口可以实现1分钟的滚动窗口。
***跳动窗口Hopping Window***
频窗口也具有固定的长度但允许窗口重叠以提供一些平滑。例如1分钟跳跃大小的5分钟窗口将包含`10:03:00`至`10:07:59`之间的事件,则下一个窗口将覆盖`10:04:00`至`10:08`之间的事件: 59等等。你可以通过首先计算1分钟滚动窗口然后聚合在几个相邻的窗口上来实现此跳频窗口。
动窗口也有着固定的长度但允许窗口重叠以提供一些平滑。例如一个带有1分钟跳跃步长的5分钟窗口将包含`10:03:00`至`10:07:59`之间的事件,而下一个窗口将覆盖`10:04:00`至`10:08`之间的事件: 59等等。通过首先计算1分钟的滚动窗口然后在几个相邻窗口上进行聚合可以实现这种跳动窗口。
***滑动窗口***
***滑动窗口Sliding Window***
滑动窗口包含在彼此的某个间隔内发生的所有事件。例如一个5分钟的滑动窗口将覆盖10点03分39秒和10点08分12秒的事件因为它们相距不到5分钟注意翻滚和跳跃的5分钟窗口不会把这两个事件在同一个窗口中因为他们使用固定的边界。滑动窗口可以通过保持按时间排序的事件缓冲区并在从窗口到期时移除旧事件来实现
滑动窗口包含了彼此间距在特定时长内的所有事件。例如一个5分钟的滑动窗口应当覆盖`10:03:39`和`10:08:12`的事件因为它们相距不超过5分钟注意滚动窗口与步长5分钟的跳动窗口可能不会把这两个事件分组到同一个窗口中因为它们使用固定的边界。通过维护一个按时间排序的事件缓冲区并不断从窗口中移除过期的旧事件可以实现滑动窗口
***会话窗口***
***会话窗口Session window***
与其他窗口类型不同,会话窗口没有固定的持续时间。相反它是通过将同一用户的所有事件分组在一起并在时间上紧密地组合在一起来定义的并且当用户在一段时间内不活动时例如如果30分钟内没有事件窗口结束。会话化是网站分析的常见要求(参阅“[GROUP BY](ch10.md#GROUP BY)”)。
与其他窗口类型不同,会话窗口没有固定的持续时间而定义为将同一用户出现时间相近的所有事件分组在一起而当用户一段时间没有活动时例如如果30分钟内没有事件窗口结束。会话切分是网站分析的常见需求(参阅“[GROUP BY](ch10.md#GROUP BY)”)。
### 流式连接
在[第10章](ch10.md)中,我们讨论了批处理作业如何通过关键连接数据集,以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道概括为对无界数据集进行增量处理,因此对流进行连接的需求也完全相同。
在[第10章](ch10.md)中,我们讨论了批处理作业如何通过键来连接数据集,以及这种连接是如何成为数据管道的重要组成部分的。由于流处理将数据管道泛化为对无限数据集进行增量处理,因此对流进行连接的需求也完全相同
然而,新事件随时可能出现在一个流中,这使得加入流比批处理作业更具挑战性。为了更好地理解情况,我们来区分三种不同类型的连接:流-流连接流表连接和表连接【84】。在下面的章节中我们将通过例子来说明。
流 - 流连接(窗口连接)
然而,新事件随时可能出现在一个流中,这使得流连接要比批处理连接更具挑战性。为了更好地理解情况,让我们先来区分三种不同类型的连接:**流-流**连接,**流-表**连接,与**表-表**连接【84】。我们将在下面的章节中通过例子来说明。
假设你的网站上有搜索功能并且想要检测搜索到的网址的近期趋势。每次有人输入搜索查询时都会记录包含查询和返回结果的事件。每当有人点击其中一个搜索结果时就会记录另一个记录点击的事件。为了计算搜索结果中每个网址的点击率你需要将搜索操作和点击操作的事件组合在一起这些事件通过具有相同的会话ID进行连接。广告系统需要类似的分析【85】。
#### 流流连接(窗口连接)
如果用户放弃他们的搜索,点击可能永远不会到来,即使它到了,搜索和点击之间的时间可能是高度可变的:在很多情况下,它可能是几秒钟,但可能长达几天或几周(如果用户运行搜索,忘记关于该浏览器选项卡,然后返回到选项卡,稍后再单击一个结果)。由于可变的网络延迟,点击事件甚至可能在搜索事件之前到达。你可以选择合适的加入窗口,例如,如果间隔至多一小时发生一次搜索,你可以选择加入搜索
假设你的网站上有搜索功能而你想要找出搜索URL的近期趋势。每当有人键入搜索查询时都会记录下一个包含查询与其返回结果的事件。每当有人点击其中一个搜索结果时就会记录另一个记录点击事件。为了计算搜索结果中每个URL的点击率你需要将搜索动作与点击动作的事件连在一起这些事件通过相同的会话ID进行连接。广告系统中需要类似的分析【85】
请注意在click事件中嵌入搜索的细节并不等同于加入事件这样做只会告诉你有关用户单击搜索结果的情况而不是用户未点击任何搜索结果的搜索结果。为了衡量搜索质量你需要准确的点击率为此你需要搜索事件和点击事件
如果用户丢弃了搜索结果,点击可能永远不会发生,即使它出现了,搜索与点击之间的时间可能是高度可变的:在很多情况下,它可能是几秒钟,但也可能长达几天或几周(如果用户执行搜索,忘掉了这个浏览器页面,过了一段时间后重新回到这个浏览器页面上,并点击了一个结果)。由于可变的网络延迟,点击事件甚至可能先于搜索事件到达。你可以选择合适的连接窗口 —— 例如,如果点击与搜索之间的时间间隔在一小时内,你可能会选择连接两者
为了实现这种类型的连接流处理器需要维护状态例如在最后一小时发生的所有事件都由会话标识索引。无论何时发生搜索事件或点击事件都会将其添加到适当的索引并且流处理器还检查另一个索引以查看是否已经到达同一会话ID的另一个事件。如果有匹配的事件则发出说明哪个搜索结果被点击的事件。如果搜索事件过期而没有看到匹配的点击事件则会发出说明哪些搜索结果未被点击的事件
请注意,在点击事件中嵌入搜索详情与事件连接并不一样:这样做的话,只有当用户点击了一个搜索结果时你才能知道,而那些没有点击的搜索就无能为力了。为了衡量搜索质量,你需要准确的点击率,为此搜索事件和点击事件两者都是必要的
#### 流表连接stream enrichment
为了实现这种类型的连接,流处理器需要维护**状态**例如按会话ID索引最近一小时内发生的所有事件。无论何时发生搜索事件或点击事件都会被添加到合适的索引中而流处理器也会检查另一个索引是否有具有相同会话ID的事件到达。如果有匹配事件就会发出一个表示搜索结果被点击的事件如果搜索事件直到过期都没看见有匹配的点击事件就会发出一个表示搜索结果未被点击的事件。
在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”([图10-2](img/fig10-2.png)我们看到了加入两个数据集的批量作业示例一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流并在流处理器中连续执行相同的连接是很自然的输入是包含用户ID的活动事件流并且输出是活动事件流其中用户ID已经用关于用户的简档信息来扩充。这个过程有时被称为使用来自数据库的信息来丰富活动事件。
#### 流表连接(流扩展)
要执行此联接,流程过程需要一次查看一个活动事件,在数据库中查找事件的用户标识,并将该概要信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现;但是,正如在“[示例:分析用户活动事件](ch10.md#示例:分析用户活动事件)”一节中讨论的此类远程查询可能会很慢并且有可能导致数据库过载【75】
在“[示例:用户活动事件分析](ch10.md#示例:用户活动事件分析)”([图10-2](img/fig10-2.png)我们看到了连接两个数据集的批处理作业示例一组用户活动事件和一个用户档案数据库。将用户活动事件视为流并在流处理器中连续执行相同的连接是很自然的想法输入是包含用户ID的活动事件流而输出还是活动事件流但其中用户ID已经被扩展为用户的档案信息。这个过程有时被称为 使用数据库的信息来**扩充enriching**活动事件
另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“[Map端连接](ch10.md#Map端连接)”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则本地副本可能是内存中的散列表,或者是本地磁盘上的索引
要执行此联接流处理器需要一次处理一个活动事件在数据库中查找事件的用户ID并将档案信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现。但正如在“[示例:分析用户活动事件](ch10.md#示例:分析用户活动事件)”一节中讨论的此类远程查询可能会很慢并且有可能导致数据库过载【75】
与批处理作业的区别在于批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,并且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持最新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户配置文件数据库的更新日志以及活动事件流。在创建或修改配置文件时,流处理器会更新其本地副本。因此,我们获得两个流之间的连接:活动事件和配置文件更新
另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在“[Map端连接](ch10.md#Map端连接)”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则可以是内存中的散列表,比较大的话也可以是本地磁盘上的索引
流表连接实际上非常类似于流 - 流连接;最大的区别在于对于表changelog流连接使用一个可以回溯到“开始时间”概念上是无限的窗口的窗口新版本的记录会覆盖较早的版本。对于流输入连接可能根本没有维护窗口
与批处理作业的区别在于,批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持更新。这个问题可以通过变更数据捕获来解决:流处理器可以订阅用户档案数据库的更新日志,如同活跃事件流一样。当增添或修改档案时,流处理器会更新其本地副本。因此,我们有了两个流之间的连接:活动事件和档案更新
#### 表格表连接(物化视图维护)
流表连接实际上非常类似于流流连接;最大的区别在于对于表的变更日志流,连接使用了一个可以回溯到“时间起点”的窗口(概念上是无限的窗口),新版本的记录会覆盖更早的版本。对于输入的流,连接可能压根儿就没有维护窗口。
考虑我们在“[描述负载](ch1.md#描述负载)”中讨论的推特时间线例子。我们说过,当用户想要查看他们的主页时间线时,对用户所关注的所有人进行迭代是非常昂贵的,推文,并合并它们。
#### 表表连接(维护物化视图)
相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,以便读取时间线是一次查询。实现和维护此缓存需要以下事件处理:
我们在“[描述负载](ch1.md#描述负载)”中讨论的推特时间线例子时说过,当用户想要查看他们的主页时间线时,迭代用户所关注人群的推文并合并它们是一个开销巨大的操作。
* 当用户发送新的推文时,它将被添加到每个跟随你的用户的时间线上。
* 用户删除推文时,将从所有用户的时间表中删除。
* 当用户u1开始跟随用户u2时u2最近的tweets被添加到u1的时间线上。
* 当用户u1取消关注用户u2时u1的推文将从u1的时间线中移除。
相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送推文的时候写入这些信息,因而读取时间线时只需要简单地查询即可。物化与维护这个缓存需要处理以下事件:
要在流处理器中实现这种缓存维护需要用于推文发送和删除和跟随关系跟随和取消跟随的事件流。流过程需要维护一个包含每个用户关注者集合的数据库以便知道当一个新的tweet到达时需要更新哪些时间轴【86】。
* 当用户u发送新的推文时它将被添加到每个关注用户u的时间线上。
* 用户删除推文时,推文将从所有用户的时间表中删除。
* 当用户$u_1$开始关注用户$u_2$时,$u_2$最近的推文将被添加到$u_1$的时间线上。
* 当用户$u_1$取消关注用户$u_2$时,$u_2$的推文将从$u_1$的时间线中移除。
查看这个流过程的另一种方式是维护一个连接两个表tweet和follow的查询的物化视图如下所示
要在流处理器中实现这种缓存维护你需要推文事件流发送与删除和关注关系事件流关注与取消关注。流处理需要为维护一个数据库包含每个用户的粉丝集合。以便知道当一条新推文到达时需要更新哪些时间线【86】。
观察这个流处理过程的另一种视角是:它维护了一个连接了两个表(推文与关注)的物化视图,如下所示:
```sql
SELECT follows.follower_id AS timeline_id,
@ -620,111 +622,115 @@ JOIN follows ON follows.followee_id = tweets.sender_id
GROUP BY follows.follower_id
```
的连接直接对应于该查询中的表的连接。时间轴实际上是这个查询结果的缓存,每当基础表发生变化时都会更新[^iii]。
连接直接对应于这个查询中的表连接。时间线实际上是这个查询结果的缓存,每当基础表发生变化时都会更新[^iii]。
[^iii]: 如果你把一个流视为一个表的衍生物,如[图11-6](img/fig11-6.png)所示并且把一个连接看作是两个表u·v的乘积那么会发生一些有趣的事情物化连接的变化流遵循产品规则(u·v)'= u'v + uv'(u·v)'= u'v + uv'。 换句话说任何tweets的变化都与当前的追随者联系在一起任何追随者的变化都与当前的tweets 【49,50】相结合
[^iii]: 如果你将流视作表的衍生物,如[图11-6](img/fig11-6.png)所示而把一个连接看作是两个表的乘法u·v那么会发生一些有趣的事情物化连接的变化流遵循乘积法则(u·v)'= u'v + uv'(u·v)'= u'v + uv'。 换句话说任何推文的变化量都与当前的关注联系在一起任何关注的变化量都与当前的推文相连接【49,50】
#### 连接的时间依赖性
就产生了一个问题如果不同的事件发生在相似的时间周围他们按照何种顺序进行处理在流表连接示例中如果用户更新其配置文件哪些活动事件与旧配置文件在配置文件更新之前处理结合哪些与新配置文件结合在配置文件更新之后处理换句话说如果状态随着时间的推移而改变并且你加入了某个状态那么你使用什么时间点来加入【45】
里描述的三种连接(流流,流表,表表)有很多共通之处:它们都需要流处理器维护连接一侧的一些状态(搜索与点击事件,用户档案,关注列表),然后当连接另一侧的消息到达时查询该状态。
这种时间依赖性可能发生在许多地方。例如,如果你销售东西,则需要对发票进行适当的税率,这取决于国家或州,产品类型和销售日期(因为税率会随时变化)。将销售额加入税率表时,如果你正在重新处理历史数据,你可能希望加入销售时的税率,这可能与当前的税率不同
用于维护状态的事件顺序是很重要的(先关注然后取消关注,或者其他类似操作)。在分区日志中,单个分区内的事件顺序是保留下来的。但典型情况下是没有跨流或跨分区的顺序保证的
如果跨流的事件排序是未确定的那么这个连接变得不确定【87】这意味着你不能在相同的输入上重新运行相同的工作并且必然会得到相同的结果输入流上的事件可能交织在当你再次运行这个工作时采用不同的方式
这就产生了一个问题如果不同流中的事件发生在近似的时间范围内则应该按照什么样的顺序进行处理在流表连接的例子中如果用户更新了它们的档案哪些活动事件与旧档案连接在档案更新前处理哪些又与新档案连接在档案更新之后处理换句话说你需要对一些状态做连接如果状态会随着时间推移而变化那应当使用什么时间点来连接呢【45】
在数据仓库中这个问题被称为缓慢变化的维度SCD通常通过对特定版本的联合记录使用唯一的标识符来解决例如每当税率改变时新的标识符并且发票包括销售时的税率标识符【88,89】。这种变化使连接成为确定性的但是由于表中所有记录的版本都需要保留导致日志压缩是不可能的。
这种时序依赖可能出现在很多地方。例如销售东西需要对发票应用适当的税率,这取决于所处的国家/州,产品类型,销售日期(因为税率会随时变化)。当连接销售额与税率表时,你可能期望的是使用销售时的税率参与连接。如果你正在重新处理历史数据,销售时的税率可能和现在的税率有所不同。
如果跨越流的事件顺序是未定的则连接会变为不确定性的【87】这意味着你在同样输入上重跑相同的作业未必会得到相同的结果当你重跑任务时输入流上的事件可能会以不同的方式交织。
在数据仓库中,这个问题被称为**缓慢变化的维度slowly changing dimension, SCD**通常通过对特定版本的记录使用唯一的标识符来解决例如每当税率改变时都会获得一个新的标识符而发票在销售时会带有税率的标识符【88,89】。这种变化使连接变为确定性的但也会导致日志压缩无法进行表中所有的记录版本都需要保留。
### 容错
在本章的最后一节中,让我们考虑流处理器如何容忍错误。我们在[第10章](ch10.md)中看到批处理框架可以很容易地容忍错误如果MapReduce作业中的任务失败可以简单地在另一台机器上重新启动并且丢弃失败任务的输出。这种透明的重试是可能的因为输入文件是不可变的每个任务都将其输出写入到HDFS上的单独文件并且输出仅在任务成功完成时可见。
在本章的最后一节中,让我们看一看流处理是如何容错的。我们在[第10章](ch10.md)中看到批处理框架可以很容易地容错如果MapReduce作业中的任务失败可以简单地在另一台机器上再次启动并且丢弃失败任务的输出。这种透明的重试是可能的因为输入文件是不可变的每个任务都将其输出写入到HDFS上的独立文件中,而输出仅当任务成功完成后可见。
特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使事实上某些任务失败了。看起来好像每个输入记录都被处理了一次 —— 没有记录被跳过,而且没有处理两次。尽管重任务意味着实际上可能会多次处理记录,但输出中的可见效果好像只处理过一次。这个原则被称为一次语义学,虽然有效 —— 一次将是一个更具描述性的术语【90】。
特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使实际上某些任务失败了。看起来好像每条输入记录都被处理了恰好一次 —— 没有记录被跳过,而且没有记录被处理两次。尽管重启任务意味着实际上可能会多次处理记录,但输出中的可见效果看上去就像只处理过一次。这个原则被称为**恰好一次语义exactly-once semantics**,尽管**有效一次effectively-once**可能会是一个更写实的术语【90】。
在流处理过程中也出现了同样的容错问题,但是处理起来不那么直观:等到某个任务完成之后才使其输出可见,因为流是无限的,因此你永远无法完成处理
在流处理中也出现了同样的容错问题,但是处理起来没有那么直观:等待某个任务完成之后再使其输出可见并不是一个可行选项,因为你永远无法处理完一个无限的流
#### 小批量和检查
#### 微批量与存档
一个解决方案是将流分解成小块,并像小型批处理一样处理每个块。这种方法被称为**小批量microbatching**它被用于Spark Streaming 【91】。批处理大小通常约为1秒这是性能折中的结果较小的批次会导致更大的调度和协调开销,而较大的批次意味着流处理器结果变得可见之前的较长延迟。
一个解决方案是将流分解成小块,并像微型批处理一样处理每个块。这种方法被称为**微批次microbatching**它被用于Spark Streaming 【91】。批次的大小通常约为1秒这是对性能妥协的结果较小的批次会导致更大的调度与协调开销,而较大的批次意味着流处理器结果可见之前的延迟要更长
缩也隐含地提供了与批量大小相等的翻滚窗口(通过处理时间而不是事件时间戳)。任何需要更大窗口的作业都需要明确地将状态从一个微阵列转移到下一个微阵列
批次也隐式提供了一个与批次大小相等的滚动窗口(按处理时间而不是事件时间戳分窗)。任何需要更大窗口的作业都需要显式地将状态从一个微批次转移到下一个微批次
Apache Flink中使用的一种变体方法是定期生成状态滚动检查点并将其写入持久存储器【92,93】。如果流操作符崩溃它可以从最近的检查点重新启动并放弃在最后一个检查点和崩溃之间生成的任何输出。检查点由消息流中的条形码触发类似于微型图形之间的边界但不强制特定的窗口大小。
Apache Flink则使用不同的方法它会定期生成状态的滚动存档点并将其写入持久存储【92,93】。如果流算子崩溃它可以从最近的存档点重启并丢弃从最近检查点到崩溃之间的所有输出。存档点会由消息流中的**壁障barrier**触发,类似于微批次之间的边界,但不会强制一个特定的窗口大小。
在流处理框架的范围内,微观网格化和检查点方法提供了与批处理一样的一次语义。但是,只要输出离开流处理器(例如,通过写入数据库,向外部消息代理发送消息或发送电子邮件),框架将不再能够放弃失败批处理的输出。在这种情况下,重新启动失败的任务会导致外部副作用发生两次,单独使用微配量或检查点不足以防止此问题。
在流处理框架的范围内,微批次与存档点方法提供了与批处理一样的**恰好一次语义**。但是,只要输出离开流处理器(例如,写入数据库,向外部消息代理发送消息,或发送电子邮件),框架就无法抛弃失败批次的输出了。在这种情况下,重启失败任务会导致外部副作用发生两次,只有微批次或存档点不足以阻止这一问题。
#### 原子提交重访
#### 原子提交再现
为了在出现故障时给出精确的一次处理,我们需要确保处理事件的所有输出和副作用只有当处理成功时才会生效。这些影响包括发送给下游运营商或外部消息传递系统(包括电子邮件或推送通知)的任何消息,任何数据库写入,对运营商状态的任何更改以及对输入消息的任何确认(包括将消费者偏移量向前移动基于日志的消息代理)。
为了在出现故障时表现出恰好处理一次的样子,我们需要确保事件处理的所有输出和副作用**当且仅当**处理成功时才会生效。这些影响包括发送给下游算子或外部消息传递系统(包括电子邮件或推送通知)的任何消息,任何数据库写入,对算子状态的任何变更,以及对输入消息的任何确认(包括在基于日志的消息代理中将消费者偏移量前移)。
这些事情要么都是原子地发生,要么都不发生,但是不应该彼此不同步。如果这种方法听起来很熟悉,那是因为我们在分布式事务和两阶段提交的情况下在第360页的“准确一次的消息处理”中讨论了它
这些事情要么都原子地发生,要么都不发生,但是它们不应当失去同步。如果这种方法听起来很熟悉,那是因为我们在分布式事务和两阶段提交的上下文中讨论过它(参阅“[恰好一次的消息处理](ch9.md#恰好一次的消息处理)”)
在[第9章](ch9.md)中,我们讨论了分布式交易如XA的传统实现中的问题。然而在更受限制的环境中可以有效地实现这样的原子提交设施。 Google云数据流【81,92】和VoltDB 【94】中使用了这种方法并计划在Apache Kafka 【95,96】中添加类似的功能。与XA不同这些实现不会尝试跨异构技术提供事务而是通过在流处理框架中管理状态更改和消息传递来保持内部事务。事务协议的开销可以通过在单个事务中处理几个输入消息来分摊。
在[第9章](ch9.md)中,我们讨论了分布式事务传统实现中的问题如XA。然而在限制更为严苛的环境中也是有可能高效实现这种原子提交机制的。 Google Cloud Dataflow【81,92】和VoltDB 【94】中使用了这种方法Apache Kafka有计划加入类似的功能【95,96】。与XA不同这些实现不会尝试跨异构技术提供事务而是通过在流处理框架中同时管理状态变更与消息传递来内化事务。事务协议的开销可以通过在单个事务中处理多个输入消息来分摊。
#### 幂等性
我们的目标是放弃任何失败的任务的部分输出,以便他们可以安全地重试,而不会两次生效。分布式事务是实现这一目标的一种方式,但另一种方式是依赖幂等性【97】。
我们的目标是丢弃任何失败任务的部分输出,以便能安全地重试,而不会生效两次。分布式事务是实现这个目标的一种方式,而另一种方式是依赖**幂等性idempotence**【97】。
幂等操作是可以多次执行的操作,并且与只执行一次操作具有相同的效果。例如,将键值存储中的某个键设置为某个固定值是幂等的(再次写入该值会覆盖具有相同值的值),而递增计数器不是幂等的(再次执行递增意味着该值递增两次)。
幂等操作是多次重复执行与单次执行效果相同的操作。例如,将键值存储中的某个键设置为某个特定值是幂等的(再次写入该值,只是用同样的值替代),而递增一个计数器不是幂等的(再次执行递增意味着该值递增两次)。
即使一个操作不是天生的幂等它往往可以与一些额外的元数据幂等。例如在使用来自Kafka的消息时每条消息都有一个持续的单调递增的偏移量。将值写入外部数据库时可以将触发上次写入的消息的偏移量与值包含在一起。因此你可以判断是否已应用更新并避免再次执行相同的更新
即使一个操作不是天生幂等的往往可以通过一些额外的元数据做成幂等的。例如在使用来自Kafka的消息时每条消息都有一个持久的单调递增的偏移量。将值写入外部数据库时可以将这个偏移量带上这样你就可以判断一条更新是不是已经执行过了因而避免重复执行
风暴三叉戟的状态处理基于类似的想法【78】。依赖幂等性意味着一些假设重启一个失败的任务必须以相同的顺序重播相同的消息一个基于日志的消息代理这样做处理必须是确定性的其他节点不能同时更新相同的值【98,99】。
Storm的Trident基于类似的想法来处理状态【78】。依赖幂等性意味着隐含了一些假设重启一个失败的任务必须以相同的顺序重放相同的消息基于日志的消息代理能做这些事处理必须是确定性的没有其他节点能同时更新相同的值【98,99】。
当从一个处理节点故障转移到另一个处理节点时,可能需要进行防护(参阅“[领导和锁](ch8.md#领导和锁)”),以防止被认为是死的节点的干扰
当从一个处理节点故障转移到另一个节点时,可能需要进行**防护fencing**(参阅“[领导和锁](ch8.md#领导和锁)”),以防止被假死节点干扰。尽管有这么多注意事项,幂等操作是一种实现**恰好一次语义**的有效方式,仅需很小的额外开销。
#### 失败后重建状态
任何需要状态的流进程(例如,任何窗口聚合(例如计数器,平均值和直方图)以及用于连接的任何表和索引)都必须确保在失败之后可以恢复此状态。
任何需要状态的流处理 —— 例如,任何窗口聚合(例如计数器,平均值和直方图)以及任何用于连接的表和索引,都必须确保在失败之后能恢复其状态。
一种选择是将状态保持在远程数据存储中并复制它,尽管如每个单独消息的远程数据库查询速度可能会很慢,正如在“[流表连接](#流表连接)”中所述。另一种方法是保持流处理器的本地状态,并定期复制。然后,当流处理器从故障中恢复时,新任务可以读取复制状态并恢复处理而不丢失数据。
一种选择是将状态保存在远程数据存储中,并进行复制,然而正如在“[流表连接](#流表连接)”中所述,每个消息都要查询远程数据库可能会很慢。另一种方法是在流处理器本地保存状态,并定期复制。然后当流处理器从故障中恢复时,新任务可以读取状态副本,恢复处理而不丢失数据。
例如Flink定期捕获操作员状态的快照并将它们写入HDFS等持久存储中【92,93】。 Samza和Kafka Streams通过将状态更发送到具有日志压缩功能的专用Kafka主题来复制状态更改这类似于变更数据捕获【84,100】。 VoltDB通过冗余处理多个节点上的每个输入消息来复制状态(参阅“[真的串行执行](ch7.md#真的串行执行)”)。
例如Flink定期捕获算子状态的快照并将它们写入HDFS等持久存储中【92,93】。 Samza和Kafka Streams通过将状态更发送到具有日志压缩功能的专用Kafka主题来复制状态变更这与变更数据捕获类似【84,100】。 VoltDB通过在多个节点上对每个输入消息进行冗余处理来复制状态(参阅“[真的串行执行](ch7.md#真的串行执行)”)。
在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态由一个相当短的窗口中的聚合组成,则它可能足够快,以便重放与该窗口相对应的输入事件。如果状态是通过变更数据捕获维护的数据库的本地副本,那么也可以从日志压缩的更流重建数据库(参阅“[日志压缩](#日志压缩)”一节)。
在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态是从相当短的窗口中聚合而成,则简单地重放该窗口中的输入事件可能是足够快的。如果状态是通过变更数据捕获来维护的数据库的本地副本,那么也可以从日志压缩的更流重建数据库(参阅“[日志压缩](#日志压缩)”)。
但是,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。在所有情况下都没有普遍理想的权衡,随着存储和网络技术的发展,本地和远程状态的优点也可能会发生变化
然而,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。没有针对所有情况的普世理想权衡,随着存储和网络技术的发展,本地状态与远程状态的优点也可能会互换
## 本章小结
在本章中,我们讨论了事件流,它们所服务的目的以及如何处理它们。在某些方面,流处理非常类似于我们在[第10章](ch10.md)讨论的批处理,而是在无限的(永无止境的)流而不是固定大小的输入上持续进行。从这个角度来看,消息代理和事件日志可以作为文件系统的流媒体
在本章中,我们讨论了事件流,它们所服务的目的以及如何处理它们。在某些方面,流处理非常类似于在[第10章](ch10.md) 中讨论的批处理,不过是在无限的(永无止境的)流而不是固定大小的输入上持续进行。从这个角度来看,消息代理和事件日志可以视作文件系统的流式等价物
我们花了一些时间比较两种消息代理:
我们花了一些时间比较两种消息代理:
***AMQP/JMS风格的消息代理***
代理将个人消息分配给消费者消费者在成功处理个人消息时确认消息。消息被确认后从代理中删除。这种方法适合作为RPC的异步形式(另请参阅“[消息传递数据流](ch4.md#消息传递数据流)”),例如在任务队列中,消息处理的确切顺序并不重要,没有在处理之后,需要重新读取旧消息。
代理将单条消息分配给消费者消费者在成功处理单条消息后确认消息。消息被确认后从代理中删除。这种方法适合作为一种异步形式的RPC(另请参阅“[消息传递数据流](ch4.md#消息传递数据流)”),例如在任务队列中,消息处理的确切顺序并不重要,而且消息在处理完之后,不需要回头重新读取旧消息。
***基于日志的消息代理***
代理将分区中的所有消息分配给相同的使用者节点,并始终以相同的顺序传递消息。并行性是通过划分来实现的,消费者通过检查他们所处理的最后一个消息的偏移来跟踪他们的进度。代理将消息保留在磁盘上,因此如有必要,可以回并重新读取旧消息。
代理将一个分区中的所有消息分配给同一个消费者节点,并始终以相同的顺序传递消息。并行是通过分区实现的,消费者通过存档最近处理消息的偏移量来跟踪工作进度。消息代理将消息保留在磁盘上,因此如有必要的话,可以回并重新读取旧消息。
基于日志的方法与数据库中的复制日志(参见[第5章](ch5.md))和日志结构存储引擎(请参阅[第3章](ch3.md)具有相似之处。我们看到,这种方法特别适用于消耗输入流并生成派生状态或派生输出流的流处理系统
基于日志的方法与数据库中的复制日志(参见[第5章](ch5.md))和日志结构存储引擎(请参阅[第3章](ch3.md)有相似之处。我们看到,这种方法对于消费输入流,产生衍生状态与衍生输出数据流的系统而言特别适用
就流的来源而言,我们讨论了几种可能性:用户活动事件,提供定期读数的传感器和数据馈送(例如金融市场数据)自然地表示为流。我们看到,将数据写入数据流也是有用的:我们可以捕获更改日志 —— 即对数据库所做的所有更改的历史记录 —— 隐式地通过变更数据捕获或通过事件明确地捕获代理。日志压缩允许流保留数据库 内容的完整副本。
就流的来源而言,我们讨论了几种可能性:用户活动事件,定期读数的传感器和Feed数据例如金融中的市场数据能够自然地表示为流。我们发现将数据库写入视作流也是很有用的我们可以捕获变更日志 —— 即对数据库所做的所有变更的历史记录 —— 隐式地通过变更数据捕获,或显式地通过事件溯源。日志压缩允许流也能保有数据库内容的完整副本。
将数据库表示为流为系统集成提供了强大的机会。你可以通过使用更改日志并将其应用于派生系统,使派生的数据系统(如搜索索引,缓存和分析系统)保持最新。你甚至可以从头开始,从开始一直到现在消耗更改的日志,从而为现有数据构建新的视图。
将数据库表示为流为系统集成带来了很多强大机遇。通过消费变更日志并将其应用至衍生系统,你能使诸如搜索索引,缓存,以及分析系统这类衍生数据系统不断保持更新。你甚至能从头开始,通过读取从创世至今的所有变更日志,为现有数据创建全新的视图。
将状态保持为流并重放消息的设施也是在各种流处理框架中实现流连接和容错的技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理),计算加窗聚合(流分析)以及保持派生数据系统处于最新状态(物化视图)。
像流一样维护状态,以及消息重放的基础设施,是在各种流处理框架中实现流连接和容错的基础。我们讨论了流处理的几种目的,包括搜索事件模式(复杂事件处理),计算分窗聚合(流分析),以及保证衍生数据系统处于最新状态(物化视图)。
然后我们讨论了在流处理器中推理时间的困难,包括处理时间和事件时间戳之间的区别,以及在你认为窗口完成之后处理到达的离散事件的问题。
然后我们讨论了在流处理中对时间进行推理的困难,包括处理时间与事件时间戳之间的区别,以及当你认为窗口已经完事之后,如何处理到达的掉队事件的问题。
我们区分了可能出现在流程中的三种类型的连接:
我们区分了流处理中可能出现的三种连接类型
***流流连接***
***流流连接***
两个输入流由活动事件组成并且连接操作符搜索在某个时间窗口内发生的相关事件。例如它可以匹配相同用户在30分钟内采取的两个动作。如果你想要在一个流中查找相关事件则两个连接输入实际上可以是相同的流自连接)。
两个输入流都由活动事件组成而连接算子在某个时间窗口内搜索相关的事件。例如它可能会将同一个用户30分钟内进行的两个活动联系在一起。如果你想要找出一个流内的相关事件连接的两侧输入可能实际上都是同一个流**自连接self-join**)。
***流表连接***
一个输入流由活动事件组成,另一个输入流是数据库更改日志。更新日志保持数据库的本地副本最新。对于每个活动事件,连接运算符将查询数据库并输出一个丰富的活动事件。
一个输入流由活动事件组成,另一个输入流是数据库变更日志。变更日志保证了数据库的本地副本是最新的。对于每个活动事件,连接算子将查询数据库,并输出一个扩展的活动事件。
***表连接***
***表连接***
两个输入流都是数据库更新日志。在这种情况下,一方的每一个变化都与另一方的最新状态相结合。结果是对两个表之间的连接的物化视图进行了一系列更改
两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流
最后,我们讨论了在流处理中实现容错和一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而,由于流程长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微博,检查点,事务或幂等写入。
最后,我们讨论了在流处理中实现容错和恰好一次语义的技术。与批处理一样,我们需要放弃任何部分失败任务的输出。然而由于流处理长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微批次,存档点,事务,或幂等写入。
## 参考文献