mirror of
https://github.com/Vonng/ddia.git
synced 2024-12-06 15:20:12 +08:00
ch10 80%
This commit is contained in:
parent
7f1959e95d
commit
9db0ed9d5f
@ -106,7 +106,7 @@
|
||||
| 第八章:分布式系统中的问题 | 初翻 | |
|
||||
| 第九章:一致性与共识 | 初翻 | |
|
||||
| 第三部分:衍生数据 | 精翻 | |
|
||||
| 第十章:批处理 | 初翻 50% | Vonng |
|
||||
| 第十章:批处理 | 80% | Vonng |
|
||||
| 第十一章:流处理 | 草翻 | |
|
||||
| 第十二章:数据系统的未来 | 初翻 40% | Vonng |
|
||||
| 术语表 | - | |
|
||||
|
189
ch10.md
189
ch10.md
@ -367,7 +367,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
上一节描述的连接算法在Reducer中执行实际的连接逻辑,因此被称为Reduce端连接。Mapper扮演着预处理输入数据的角色:从每个输入记录中提取键值,将键值对分配给Reducer分区,并按键排序。
|
||||
|
||||
Reduce端方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper都可以对其预处理以备连接。然而不利的一面是,所有这些排序,复制至Reducer,以及合并Reducer输入可能开销巨大。当数据通过MapReduce 【37】阶段时,数据可能要落盘好几次,取决于可用的内存缓冲区。
|
||||
Reduce端方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper都可以对其预处理以备连接。然而不利的一面是,排序,复制至Reducer,以及合并Reducer输入,所有这些操作可能开销巨大。当数据通过MapReduce 阶段时,数据可能需要落盘好几次,取决于可用的内存缓冲区【37】。
|
||||
|
||||
另一方面,如果你**能**对输入数据作出某些假设,则通过使用所谓的Map端连接来加快连接速度是可行的。这种方法使用了一个阉掉Reduce与排序的MapReduce作业,每个Mapper只是简单地从分布式文件系统中读取一个输入文件块,然后将输出文件写入文件系统,仅此而已。
|
||||
|
||||
@ -375,141 +375,145 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
适用于执行Map端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小,以便可以将其全部加载到每个Mapper的内存中。
|
||||
|
||||
例如,假设在[图10-2](img/fig10-2.png)的情况下,用户数据库足够小以适应内存。在这种情况下,当Mapper启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的哈希表中。完成此操作后,Map程序可以扫描用户活动事件,并简单地查找散列表中每个事件的用户标识[^vi]。
|
||||
例如,假设在[图10-2](img/fig10-2.png)的情况下,用户数据库小到足以放进内存中。在这种情况下,当Mapper启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的散列中。完成此操作后,Map程序可以扫描用户活动事件,并简单地在散列表中查找每个事件的用户ID[^vi]。
|
||||
|
||||
[^vi]: 这个例子假定散列表中的每个键只有一个条目,这对用户数据库(用户ID唯一标识一个用户)可能是正确的。通常,哈希表可能需要包含具有相同键的多个条目,并且连接运算符将输出关键字的所有匹配。
|
||||
[^vi]: 这个例子假定散列表中的每个键只有一个条目,这对用户数据库(用户ID唯一标识一个用户)可能是正确的。通常,哈希表可能需要包含具有相同键的多个条目,而连接运算符将对每个键输出所有的匹配。
|
||||
|
||||
仍然可以有几个映射任务:一个用于连接的大输入的每个文件块(在[图10-2](img/fig10-2.png)的例子中,活动事件是大输入)。这些Mapper中的每一个都将小输入全部加载到内存中。
|
||||
参与连接的较大输入的每个文件块各有一个Mapper(在[图10-2](img/fig10-2.png)的例子中活动事件是较大的输入)。每个Mapper都会将较小输入整个加载到内存中。
|
||||
|
||||
这种简单而有效的算法被称为广播散列连接:广播一词反映了这样一个事实,即大输入的分区的每个Mapper都读取整个小输入(所以小输入有效地“广播”到大的输入),单词hash反映了它使用一个哈希表。 Pig(名为“replicated join”),Hive(“MapJoin”),Cascading和Crunch支持此连接方法。它也用于数据仓库查询引擎,如Impala 【41】。
|
||||
这种简单有效的算法被称为**广播散列连接(broadcast hash join)**:**广播**一词反映了这样一个事实,每个连接较大输入端分区的Mapper都会将较小输入端数据集整个读入内存中(所以较小输入实际上“广播”到较大数据的所有分区上),**散列**一词反映了它使用一个散列表。 Pig(名为“**复制链接(replicated join)**”),Hive(“**MapJoin**”),Cascading和Crunch支持这种连接。它也被诸如Impala的数据仓库查询引擎使用【41】。
|
||||
|
||||
而不是将小连接输入加载到内存散列表中,另一种方法是将小连接输入存储在本地磁盘上的只读索引中【42】。该索引中经常使用的部分将保留在操作系统的页面缓存中,因此这种方法可以提供与内存中哈希表几乎一样快的随机访问查找,但实际上并不需要数据集适合内存。
|
||||
除了将连接较小输入加载到内存散列表中,另一种方法是将较小输入存储在本地磁盘上的只读索引中【42】。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存散列表几乎一样快的随机查找性能,但实际上并不需要数据集能放入内存中。
|
||||
|
||||
#### 分区散列连接
|
||||
|
||||
如果以相同方式对映射端连接的输入进行分区,则散列连接方法可以独立应用于每个分区。在[图10-2](img/fig10-2.png)的情况下,你可以根据用户标识的最后一位十进制数字来安排活动事件和用户数据库的每一个(因此每边有10个分区)。例如,Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中,然后扫描ID为3的每个用户的所有活动事件。
|
||||
如果Map端连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。在[图10-2](img/fig10-2.png)的情况中,你可以根据用户ID的最后一位十进制数字来对活动事件和用户数据库进行分区(因此连接两侧各有10个分区)。例如,Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中,然后扫描ID为3的每个用户的所有活动事件。
|
||||
|
||||
如果分区正确完成,你可以确定所有你可能要加入的记录都位于相同编号的分区中,因此每个Mapper只能从每个输入数据集中读取一个分区就足够了。这具有的优点是每个Mapper都可以将较少量的数据加载到其哈希表中。
|
||||
如果分区正确无误,可以确定的是,所有你可能需要连接的记录都落在同一个编号的分区中。因此每个Mapper只需要从输入两端各读取一个分区就足够了。好处是每个Mapper都可以在内存散列表中少放点数据。
|
||||
|
||||
这种方法只适用于两个连接的输入具有相同数量的分区,记录根据相同的键和相同的散列函数分配给分区。如果输入是由之前执行过这个分组的MapReduce作业生成的,那么这可能是一个合理的假设。
|
||||
这种方法只有当连接两端输入有相同的分区数,且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的MapReduce作业生成的,那么这可能是一个合理的假设。
|
||||
|
||||
分区散列连接在Hive 【37】中称为bucketed映射连接。Map端合并连接
|
||||
分区散列连接在Hive中称为**Map端桶连接(bucketed map joins)【37】**。
|
||||
|
||||
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则应用另一种Map端联接的变体。在这种情况下,输入是否足够小以适应内存并不重要,因为Mapper可以执行通常由reducer执行的相同合并操作:按递增键递增读取两个输入文件,以及匹配相同的键记录。
|
||||
#### Map端合并连接
|
||||
|
||||
如果Map端合并连接是可能的,则可能意味着先前的MapReduce作业首先将输入数据集引入到这个分区和排序的表单中。原则上,这个加入可以在之前工作的Reduce阶段进行。但是,在单独的仅用于Map的作业中执行合并连接仍然是适当的,例如,除了此特定连接之外,还需要分区和排序数据集以用于其他目的。
|
||||
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行**排序**,则可适用另一种Map端联接的变体。在这种情况下,输入是否小到能放入内存并不重要,因为这时候Mapper同样可以执行归并操作(通常由Reducer执行)的归并操作:按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。
|
||||
|
||||
#### MapReduce与Map端连接的工作流程
|
||||
如果能进行Map端合并连接,这通常意味着前一个MapReduce作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的Reduce阶段进行。但使用独立的仅Map作业有时也是合适的,例如,分好区且排好序的中间数据集可能还会用于其他目的。
|
||||
|
||||
当下游作业使用MapReduce连接的输出时,map-side或reduce-side连接的选择会影响输出的结构。 reduce-side连接的输出按连接键进行分区和排序,而map-side连接的输出按照与大输入相同的方式进行分区和排序(因为对每个文件块启动一个map任务无论是使用分区连接还是广播连接,连接的大输入)。
|
||||
#### MapReduce工作流与Map端连接
|
||||
|
||||
如前所述,Map边连接也对输入数据集的大小,排序和分区做出了更多的假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据分区和排序的分区数量和键。
|
||||
当下游作业使用MapReduce连接的输出时,选择Map端连接或Reduce端连接会影响输出的结构。Reduce端连接的输出是按照**连接键**进行分区和排序的,而Map端连接的输出则按照与较大输入相同的方式进行分区和排序(因为无论是使用分区连接还是广播连接,连接较大输入端的每个文件块都会启动一个Map任务)。
|
||||
|
||||
在Hadoop生态系统中,这种关于数据集分区的元数据经常在HCatalog和Hive Metastore中维护【37】。
|
||||
如前所述,Map端连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
|
||||
|
||||
### 工作流的输出
|
||||
在Hadoop生态系统中,这种关于数据集分区的元数据通常在HCatalog和Hive Metastore中维护【37】。
|
||||
|
||||
我们已经谈了很多关于实现MapReduce工作流程的各种算法,但是我们忽略了一个重要的问题:一旦完成,所有处理的结果是什么?我们为什么要把所有这些工作放在首位?
|
||||
|
||||
在数据库查询的情况下,我们根据分析目的来区分事务处理(OLTP)目的(参阅“[事务处理或分析?](ch3.md#事务处理或分析?)”)。我们看到,OLTP查询通常使用索引按键查找少量记录,以便将其呈现给用户(例如,在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组和汇总,输出通常具有报告的形式:显示某个指标随时间变化的图表,或前10个项目根据一些排名,或一些数量分解成子类别。这种报告的消费者通常是需要做出商业决策的分析师或经理。
|
||||
|
||||
批处理在哪里适合?这不是交易处理,也不是分析。与分析更接近,因为批处理过程通常扫描输入数据集的大部分。但是,MapReduce作业的工作流程与用于分析目的的SQL查询不同(参阅“[比较Hadoop与分布式数据库](比较Hadoop与分布式数据库)”)。批处理过程的输出通常不是报告,而是一些其他类型的结构。
|
||||
### 批处理工作流的输出
|
||||
|
||||
我们已经说了很多用于实现MapReduce工作流的算法,但却忽略了一个重要的问题:这些处理完成之后的最终结果是什么?我们最开始为什么要跑这些作业?
|
||||
|
||||
在数据库查询的场景中,我们将事务处理(OLTP)与分析两种目的区分开来(参阅“[事务处理或分析?](ch3.md#事务处理或分析?)”)。我们看到,OLTP查询通常根据键查找少量记录,使用索引,并将其呈现给用户(比如在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组与聚合,输出通常有着报告的形式:显示某个指标随时间变化的图表,或按照某种排位取前10项,或一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
|
||||
|
||||
批处理放哪里合适?它不属于事务处理,也不是分析。它和分析比较接近,因为批处理通常会扫过输入数据集的绝大部分。然而MapReduce作业工作流与用于分析目的的SQL查询是不同的(参阅“[Hadoop与分布式数据库的对比](#Hadoop与分布式数据库的对比)”)。批处理过程的输出通常不是报表,而是一些其他类型的结构。
|
||||
|
||||
#### 建立搜索索引
|
||||
|
||||
Google最初使用的MapReduce是为其搜索引擎建立索引,这个索引是作为5到10个MapReduce作业的工作流实现的【1】。虽然Google为了这个目的后来不再使用MapReduce 【43】,但是如果从建立搜索索引的角度来看,它可以帮助理解MapReduce。 (即使在今天,Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法。)
|
||||
Google最初使用MapReduce是为其搜索引擎建立索引,用了由5到10个MapReduce作业组成的工作流实现【1】。虽然Google后来也不仅仅是为这个目的而使用MapReduce 【43】,但如果从构建搜索索引的角度来看,更能帮助理解MapReduce。 (直至今日,Hadoop MapReduce仍然是为Lucene/Solr构建索引的好方法【44】)
|
||||
|
||||
我们在“[全文搜索和模糊索引](ch3.md#全文搜索和模糊索引)”中简要地看到了Lucene这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表(发布列表)。这是一个非常简单的搜索索引视图 —— 实际上,它需要各种附加数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这一原则是成立的。
|
||||
我们在“[全文搜索和模糊索引](ch3.md#全文搜索和模糊索引)”中简要地了解了Lucene这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字,并找到包含该关键字的所有文档ID列表(文章列表)。这是一种非常简化的看法 —— 实际上,搜索索引需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等 —— 但这个原则是成立的。
|
||||
|
||||
如果需要对一组固定文档执行全文搜索,则批处理是构建索引的一种非常有效的方法:Mapper根据需要对文档集进行分区,每个reducer构建其分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(参阅“[分区和二级索引](ch6.md#分区和二级索引)”)并行处理非常好。
|
||||
如果需要对一组固定文档执行全文搜索,则批处理是一种构建索引的高效方法:Mapper根据需要对文档集合进行分区,每个Reducer构建该分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(参阅“[分区和二级索引](ch6.md#分区和二级索引)”)并行处理效果拔群。
|
||||
|
||||
由于按关键字查询搜索索引是只读操作,因此这些索引文件一旦创建就是不可变的。
|
||||
由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。
|
||||
|
||||
如果索引的文档集合发生更改,则可以选择定期重新运行整个索引工作流程,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法可能会带来很高的计算成本,但是它的优点是索引过程很容易推理:文档,索引。
|
||||
如果索引的文档集合发生更改,一种选择是定期重跑整个索引工作流,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法的计算成本可能会很高。但它的优点是索引过程很容易理解:文档进,索引出。
|
||||
|
||||
或者,可以逐渐建立索引。如[第3章](ch3.md)所述,如果要添加,删除或更新索引中的文档,Lucene会写出新的段文件,并异步合并和压缩背景中的段文件。我们将在[第11章](ch11.md)中看到更多这样的增量处理。
|
||||
另一个选择是,可以增量建立索引。如[第3章](ch3.md)中讨论的,如果要在索引中添加,删除或更新文档,Lucene会写新的段文件,并在后台异步合并压缩段文件。我们将在[第11章](ch11.md)中看到更多这种增量处理。
|
||||
|
||||
#### 键值存储作为批处理输出
|
||||
|
||||
搜索索引只是批处理工作流程可能输出的一个示例。批量处理的另一个常见用途是构建机器学习系统,如分类器(例如,垃圾邮件过滤器,异常检测,图像识别)和推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关搜索)。
|
||||
搜索索引只是批处理工作流可能输出的一个例子。批处理的另一个常见用途是构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关的搜索【29】)。
|
||||
|
||||
这些批处理作业的输出通常是某种数据库:例如,可以通过用户ID查询以获取该用户的建议朋友的数据库,或者可以通过产品ID查询的数据库以获取相关产品【45】。
|
||||
这些批处理作业的输出通常是某种数据库:例如,可以通过给定用户ID查询该用户推荐好友的数据库,或者可以通过产品ID查询相关产品的数据库【45】。
|
||||
|
||||
这些数据库需要从处理用户请求的Web应用程序中查询,这些请求通常与Hadoop基础架构分离。那么批处理过程的输出如何返回到Web应用程序可以查询的数据库?
|
||||
这些数据库需要被处理用户请求的Web应用所查询,而它们通常是独立于Hadoop基础设施的。那么批处理过程的输出如何回到Web应用可以查询的数据库中呢?
|
||||
|
||||
最明显的选择可能是直接在Mapper或Reducer中使用客户端库作为你最喜欢的数据库,并从批处理作业直接写入数据库服务器,一次写入一条记录。这将起作用(假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库),但由于以下几个原因,这是一个坏主意:
|
||||
最直接的选择可能是,直接在Mapper或Reducer中使用你最爱数据库的客户端库,并从批处理作业直接写入数据库服务器,一次写入一条记录。它能工作(假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库),但这并不是一个好主意,出于以下几个原因:
|
||||
|
||||
- 正如前面讨论的连接一样,为每个记录提出一个网络请求比批处理任务的正常吞吐量要慢几个数量级。即使客户端库支持批处理,性能也可能很差。
|
||||
- MapReduce作业经常并行运行许多任务。如果所有Mapper或Reducer都同时写入相同的输出数据库,并且批处理过程期望的速率,那么该数据库可能很容易被压倒,并且其查询性能可能受到影响。这可能会导致系统其他部分的操作问题【35】。
|
||||
- 通常情况下,MapReduce为作业输出提供了一个干净的“全有或全无”的保证:如果作业成功,则结果就是只执行一次任务的输出,即使某些任务失败并且必须重试。如果整个作业失败,则不会生成输出。然而,从作业内部写入外部系统会产生外部可见的副作用,这种副作用是不能被隐藏的。因此,你不得不担心部分完成的作业对其他系统可见的结果,以及Hadoop任务尝试和推测性执行的复杂性。
|
||||
- 正如前面在连接的上下文中讨论的那样,为每条记录发起一个网络请求,要比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
|
||||
- MapReduce作业经常并行运行许多任务。如果所有Mapper或Reducer都同时写入相同的输出数据库,并以批处理的预期速率工作,那么该数据库很可能被轻易压垮,其查询性能可能变差。这可能会导致系统其他部分的运行问题【35】。
|
||||
- 通常情况下,MapReduce为作业输出提供了一个干净利落的“全有或全无”保证:如果作业成功,则结果就是每个任务恰好执行一次所产生的输出,即使某些任务失败且必须一路重试。如果整个作业失败,则不会生成输出。然而从作业内部写入外部系统,会产生外部可见的副作用,这种副作用是不能以这种方式被隐藏的。因此,你不得不去操心部分完成的作业对其他系统可见的结果,并需要理解Hadoop任务尝试与预测执行的复杂性。
|
||||
|
||||
更好的解决方案是在批处理作业中创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上一节的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。各种键值存储支持在MapReduce作业中构建数据库文件,包括Voldemort 【46】,Terrapin 【47】,ElephantDB 【48】和HBase批量加载【49】。
|
||||
更好的解决方案是在批处理作业**内**创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。不少键值存储都支持在MapReduce作业中构建数据库文件,包括Voldemort 【46】,Terrapin 【47】,ElephantDB 【48】和HBase批量加载【49】。
|
||||
|
||||
构建这些数据库文件是MapReduce的一个很好的使用方法:使用Mapper提取一个键,然后使用该键进行排序已经成为构建索引所需的大量工作。由于大多数这些键值存储是只读的(文件只能由批处理作业一次写入,而且是不可变的),所以数据结构非常简单。例如,它们不需要WAL(参阅“[使B树可靠](ch3.md#使B树可靠)”)。
|
||||
构建这些数据库文件是MapReduce的一种很好用法的使用方法:使用Mapper提取出键并按该键排序,现在已经是构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要WAL(参阅“[使B树可靠](ch3.md#使B树可靠)”)。
|
||||
|
||||
将数据加载到Voldemort时,服务器将继续向旧数据文件提供请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动切换到查询新文件。如果在这个过程中出现任何问题,它可以很容易地再次切换回旧的文件,因为它们仍然存在,并且是不变的【46】。
|
||||
将数据加载到Voldemort时,服务器将继续用旧数据文件服务请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动将查询切换到新文件。如果在这个过程中出现任何问题,它可以轻易回滚至旧文件,因为它们仍然存在而且不可变【46】。
|
||||
|
||||
#### 批量过程输出的哲学
|
||||
#### 批处理输出的哲学
|
||||
|
||||
本章前面讨论过的Unix哲学(“[Unix哲学](#Unix哲学)”)鼓励通过对数据流的非常明确的实验来进行实验:程序读取输入并写入输出。在这个过程中,输入保持不变,任何以前的输出都被新输出完全替换,并且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,调整或调试它,而不会扰乱系统的状态。
|
||||
本章前面讨论过的Unix哲学(“[Unix哲学](#Unix哲学)”)鼓励以显式指明数据流的方式进行实验:程序读取输入并写入输出。在这一过程中,输入保持不变,任何先前的输出都被新输出完全替换,且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,略做改动或进行调试,而不会搅乱系统的状态。
|
||||
|
||||
MapReduce作业的输出处理遵循相同的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:
|
||||
MapReduce作业的输出处理遵循同样的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:
|
||||
|
||||
- 如果在代码中引入了一个错误,并且输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将再次正确。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的思想被称为人类容错【50】。)
|
||||
- 由于易于回滚,功能开发可以比错误意味着不可挽回的损害的环境更快地进行。这种使不可逆性最小化的原则有利于敏捷软件的开发【51】。
|
||||
- 如果映射或减少任务失败,MapReduce框架将自动重新调度并在同一个输入上再次运行它。如果失败是由于代码中的一个错误造成的,那么它会一直崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于暂时的问题引起的,那么故障是可以容忍的。这种自动重试只是安全的,因为输入是不可变的,而失败任务的输出被MapReduce框架丢弃。
|
||||
- 同一组文件可用作各种不同作业的输入,其中包括计算度量标准的计算作业,并评估作业的输出是否具有预期的特性(例如,将其与前一次运行的输出进行比较并测量差异) 。
|
||||
- 与Unix工具类似,MapReduce作业将逻辑与布线(配置输入和输出目录)分开,这就提供了关注点的分离,并且可以重用代码:一个团队可以专注于实现一件好事的工作其他团队可以决定何时何地运行这项工作。
|
||||
- 如果在代码中引入了一个错误,而输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将重新被纠正。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的概念被称为**人类容错(human fault tolerance)**【50】)
|
||||
- 由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种**最小化不可逆性(minimizing irreversibility)**的原则有利于敏捷软件开发【51】。
|
||||
- 如果Map或Reduce任务失败,MapReduce框架将自动重新调度,并在同样的输入上再次运行它。如果失败是由代码中的错误造成的,那么它会不断崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的,那么故障就会被容忍。因为输入不可变,这种自动重试是安全的,而失败任务的输出会被MapReduce框架丢弃。
|
||||
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。
|
||||
- 与Unix工具类似,MapReduce作业将逻辑与布线(配置输入和输出目录)分离,这使得关注点分离,可以重用代码:一个团队可以实现一个专注做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
|
||||
|
||||
在这些领域,对Unix运行良好的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也有所不同。例如,因为大多数Unix工具都假定没有类型的文本文件,所以他们必须做大量的输入解析(本章开头的日志分析示例使用`{print $7}`来提取URL)。在Hadoop上,通过使用更多结构化的文件格式,可以消除一些低价值的语法转换:Avro(参阅“[Avro](ch4.md#Avro)”)和Parquet(参阅第95页上的“[列存储](ch3.md#列存储)”)经常使用,因为它们提供高效的基于模式的编码,并允许随着时间的推移模式的演变(见第4章)。
|
||||
在这些领域,在Unix上表现良好的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也有所不同。例如,因为大多数Unix工具都假设输入输出是无类型文本文件,所以它们必须做大量的输入解析工作(本章开头的日志分析示例使用`{print $7}`来提取URL)。在Hadoop上可以通过使用更结构化的文件格式消除一些低价值的语法转换:比如Avro(参阅“[Avro](ch4.md#Avro)”)和Parquet(参阅“[列存储](ch3.md#列存储)”)经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进(见第4章)。
|
||||
|
||||
### 比较Hadoop和分布式数据库
|
||||
### Hadoop与分布式数据库的对比
|
||||
|
||||
正如我们所看到的,Hadoop有点像Unix的分布式版本,其中HDFS是文件系统,而MapReduce是Unix进程的古怪实现(这恰好总是在映射阶段和缩小阶段之间运行排序实用程序)。我们看到了如何在这些基元之上实现各种连接和分组操作。
|
||||
正如我们所看到的,Hadoop有点像Unix的分布式版本,其中HDFS是文件系统,而MapReduce是Unix进程的怪异实现(总是在Map阶段和Reduce阶段运行`sort`工具)。我们了解了如何在这些原语的基础上实现各种连接和分组操作。
|
||||
|
||||
当MapReduce论文【1】发表时,它在某种意义上说并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前的所谓的**大规模并行处理(MPP, massively parallel processing)**数据库中实现了【3,40】。例如,Gamma数据库机器,Teradata和Tandem NonStop SQL是这方面的先驱【52】。
|
||||
当MapReduce论文发表时【1】,它从某种意义上来说 —— 并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前所谓的**大规模并行处理(MPP, massively parallel processing)**数据库中实现了【3,40】。比如Gamma database machine,Teradata和Tandem NonStop SQL就是这方面的先驱【52】。
|
||||
|
||||
最大的区别是MPP数据库集中于在一组机器上并行执行分析SQL查询,而MapReduce和分布式文件系统【19】的组合则更像是一个可以运行任意程序的通用操作系统。
|
||||
最大的区别是,MPP数据库专注于在一组机器上并行执行分析SQL查询,而MapReduce和分布式文件系统【19】的组合则更像是一个可以运行任意程序的通用操作系统。
|
||||
|
||||
#### 存储的多样性
|
||||
#### 存储多样性
|
||||
|
||||
数据库要求你根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。
|
||||
|
||||
说白了,Hadoop开放了将数据不加区分地转储到HDFS的可能性,之后才想出如何进一步处理它【53】。相比之下,在将数据导入数据库专有存储格式之前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。
|
||||
说白了,Hadoop开放了将数据不加区分地转储到HDFS的可能性,允许后续再研究如何进一步处理【53】。相比之下,在将数据导入数据库专有存储格式之前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。
|
||||
|
||||
从纯粹的角度来看,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更好的质量数据来处理。然而,在实践中,似乎只是简单地使数据可用 —— 即使它是一个古怪的,难以使用的原始格式 —— 通常比尝试决定理想的数据模型更有价值[54 ]。
|
||||
在纯粹主义者看来,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更高质量的数据来处理。然而实践经验表明,简单地使数据快速可用 —— 即使它很古怪,难以使用,使用原始格式 —— 也通常要比事先决定理想数据模型要更有价值【54】。
|
||||
|
||||
这个想法与数据仓库类似(参阅“[数据仓库](ch3.md#数据仓库)”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨以前不同的数据集进行联接。 MPP数据库所要求的谨慎的模式设计减慢了集中式数据收集速度;以原始形式收集数据,以后担心模式设计,使数据收集速度加快(有时被称为“**数据湖(data lake)**”或“**企业数据中心(enterprise data hub)**”【55】)。
|
||||
这个想法与数据仓库类似(参阅“[数据仓库](ch3.md#数据仓库)”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相分离的数据集进行连接。 MPP数据库所要求的谨慎模式设计拖慢了集中式数据收集速度;以原始形式收集数据,稍后再操心模式的设计,能使数据收集速度加快(有时被称为“**数据湖(data lake)**”或“**企业数据中心(enterprise data hub)**”【55】)。
|
||||
|
||||
不加区别的数据倾销改变了解释数据的负担:不是强迫数据集的生产者将其转化为标准化的格式,而是数据的解释成为消费者的问题(读时模式方法【56】;参阅“[文档模型中的架构灵活性](ch2.md#文档模型中的架构灵活性)”)。如果生产者和消费者是不同优先级的不同团队,这可能是一个优势。甚至可能不存在一个理想的数据模型,而是对适合不同目的的数据有不同的看法。以原始形式简单地转储数据可以进行多次这样的转换。这种方法被称为寿司原则:“原始数据更好”【57】。
|
||||
不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题(**读时模式**方法【56】;参阅“[文档模型中的架构灵活性](ch2.md#文档模型中的架构灵活性)”)。如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为**寿司原则(sushi principle)**:“原始数据更好”【57】。
|
||||
|
||||
因此,Hadoop经常被用于实现ETL过程(参阅“[数据仓库](ch3.md#数据仓库)”):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业来清理数据,将其转换为关系表单,并将其导入MPP数据仓库以进行分析。数据建模仍然在发生,但它是在一个单独的步骤中,从数据收集中分离出来的。这种解耦是可能的,因为分布式文件系统支持以任何格式编码的数据。
|
||||
因此,Hadoop经常被用于实现ETL过程(参阅“[数据仓库](ch3.md#数据仓库)”):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业来清理数据,将其转换为关系形式,并将其导入MPP数据仓库以进行分析。数据建模仍然在进行,但它在一个单独的步骤中进行,与数据收集相解耦。这种解耦是可行的,因为分布式文件系统支持以任何格式编码的数据。
|
||||
|
||||
#### 加工模型的多样性
|
||||
#### 处理模型多样性
|
||||
|
||||
MPP数据库是单一的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计的查询类型上取得非常好的性能。而且,SQL查询语言允许表达式查询和重要语义,而无需编写代码,使业务分析师(例如Tableau)使用的图形工具可访问该语言。
|
||||
MPP数据库是单体的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计针对的查询类型上取得非常好的性能。而且,SQL查询语言允许以优雅的语法表达查询,而无需编写代码,使业务分析师用来做商业分析的可视化工具(例如Tableau)能够访问。
|
||||
|
||||
另一方面,并非所有类型的处理都可以合理地表达为SQL查询。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更一般的数据处理模型。这些类型的处理通常对特定的应用程序非常具体(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
|
||||
另一方面,并非所有类型的处理都可以合理地表达为SQL查询。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更一般的数据处理模型。这些类型的处理通常是特别针对特定应用的(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。
|
||||
|
||||
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce,那么你可以在它上面建立一个SQL查询执行引擎,事实上这正是Hive项目所做的【31】。但是,你也可以编写许多其他形式的批处理,这些批处理不适合用SQL查询表示。
|
||||
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce,那么你**可以**在它之上建立一个SQL查询执行引擎,事实上这正是Hive项目所做的【31】。但是,你也可以编写许多其他形式的批处理,这些批处理不必非要用SQL查询表示。
|
||||
|
||||
随后,人们发现MapReduce对于某些类型的处理来说太过于限制,执行得太差,因此其他各种处理模型都是在Hadoop之上开发的(我们将在“[后MapReduce时代](#后MapReduce时代)”中看到其中的一些)。有两种处理模型,SQL和MapReduce,还不够,需要更多不同的模型!而且由于Hadoop平台的开放性,实施一整套方法是可行的,而这在整体MPP数据库的范围内是不可能的【58】。
|
||||
随后,人们发现MapReduce对于某些类型的处理而言局限性很大,表现很差,因此在Hadoop之上其他各种处理模型也被开发出来(我们将在“[MapReduce之后](#后MapReduce时代)”中看到其中一些)。有两种处理模型,SQL和MapReduce,还不够,需要更多不同的模型!而且由于Hadoop平台的开放性,实施一整套方法是可行的,而这在单体MPP数据库的范畴内是不可能的【58】。
|
||||
|
||||
至关重要的是,这些不同的处理模型都可以在一个共享的机器上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据使得从数据中获得价值变得容易得多,并且使用新的处理模型更容易进行实验。
|
||||
至关重要的是,这些不同的处理模型都可以在共享的单个机器集群上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。
|
||||
|
||||
Hadoop生态系统包括随机访问的OLTP数据库,如HBase(参阅“[SSTables和LSM树](ch3.md#SSTables和LSM树)”)和MPA样式的分析数据库,如Impala 【41】。 HBase和Impala都不使用MapReduce,但都使用HDFS进行存储。它们是访问和处理数据的非常不同的方法,但是它们可以共存并被集成到同一个系统中。
|
||||
Hadoop生态系统包括随机访问的OLTP数据库,如HBase(参阅“[SSTables和LSM树](ch3.md#SSTables和LSM树)”)和MPP风格的分析型数据库,如Impala 【41】。 HBase与Impala都不使用MapReduce,但都使用HDFS进行存储。它们是迥异的数据访问与处理方法,但是它们可以共存,并被集成到同一个系统中。
|
||||
|
||||
#### 为频繁的故障而设计
|
||||
#### 针对频繁故障设计
|
||||
|
||||
在比较MapReduce和MPP数据库时,设计方法的另外两个不同点是:处理故障和使用内存和磁盘。与在线系统相比,批处理对故障不太敏感,因为如果失败,用户不会立即影响用户,并且可以再次运行。
|
||||
当比较MapReduce和MPP数据库时,两种不同的设计思路出现了:处理故障和使用内存与磁盘的方式。与在线系统相比,批处理对故障不太敏感,因为就算失败也不会立即影响到用户,而且它们总是能再次运行。
|
||||
|
||||
如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,并让用户重新提交查询或自动重新运行它【3】。由于查询通常最多运行几秒钟或几分钟,所以这种处理错误的方法是可以接受的,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据(例如,使用散列连接)以避免从磁盘读取的成本。
|
||||
如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,并让用户重新提交查询或自动重新运行它【3】。由于查询通常最多运行几秒钟或几分钟,所以这种错误处理的方法是可以接受的,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据(例如,使用散列连接)以避免从磁盘读取的开销。
|
||||
|
||||
另一方面,MapReduce可以容忍映射或减少任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也非常渴望将数据写入磁盘,一方面是为了容错,另一方面是假设数据集太大而不能适应内存。
|
||||
另一方面,MapReduce可以容忍单个Map或Reduce任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也非常希望将数据写入磁盘,一方面是为了容错,另一部分是假设数据集太大而不能适应内存。
|
||||
|
||||
MapReduce方法更适用于较大的作业:处理如此之多的数据并运行很长时间的作业,以至于在此过程中可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个工作将是浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,仍然可以进行合理的权衡。
|
||||
MapReduce方式更适用于较大的作业:要处理如此之多的数据并运行很长时间的作业,以至于在此过程中很可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个作业将是非常浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,这仍然是一种合理的权衡。
|
||||
|
||||
但是这些假设有多现实呢?在大多数集群中,机器故障确实发生,但是它们不是很频繁 —— 可能很少,大多数工作都不会经验,因为机器故障。为了容错,真的值得引起重大的开销吗?
|
||||
但是这些假设有多现实呢?在大多数集群中,机器故障确实发生,但是它们不是很频繁 —— 可能很少到,大多数作业都不会经历机器故障。为了容错,真的值得招来这么大的额外开销吗?
|
||||
|
||||
要了解MapReduce节省使用内存和任务级恢复的原因,查看最初设计MapReduce的环境是很有帮助的。 Google拥有混合使用的数据中心,在线生产服务和离线批处理作业在同一台机器上运行。每个任务都有一个使用容器执行的资源分配(CPU核心,RAM,磁盘空间等)。每个任务也具有优先级,如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的定价:团队必须为他们使用的资源付费,而优先级更高的流程花费更多【59】。
|
||||
|
||||
@ -539,11 +543,11 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
如前所述,每个MapReduce作业都独立于其他任何作业。作业与世界其他地方的主要联系点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为与第一个作业的输出目录相同,并且外部工作流调度程序必须仅在第一份工作已经完成。
|
||||
|
||||
如果第一个作业的输出是要在组织内广泛发布的数据集,则此设置是合理的。在这种情况下,你需要能够通过名称来引用它,并将其用作多个不同作业(包括由其他团队开发的作业)的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合,这样作业就不需要知道是谁在输入输出或消耗其输出(参阅“[分离逻辑和布线](#分离逻辑和布线)”在本页395)。
|
||||
如果第一个作业的输出是要在组织内广泛发布的数据集,则此设置是合理的。在这种情况下,你需要能够通过名称来引用它,并将其用作多个不同作业(包括由其他团队开发的作业)的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合,这样作业就不需要知道是谁在输入输出或消耗其输出(参阅“[逻辑与布线相分离](#逻辑与布线相分离)”)。
|
||||
|
||||
但是,在很多情况下,你知道一个工作的输出只能用作另一个工作的输入,这个工作由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的中间状态:一种将数据从一个作业传递到下一个作业的方式。在用于构建由50或100个MapReduce作业【29】组成的推荐系统的复杂工作流程中,存在很多这样的中间状态。
|
||||
|
||||
将这个中间状态写入文件的过程称为物化。 (我们在“[聚合:数据立方体和物化视图](ch2.md#聚合:数据立方体和物化视图)”中已经在物化视图的背景下遇到了这个术语。它意味着要急于计算某个操作的结果并写出来,而不是计算需要时按要求。)
|
||||
将这个中间状态写入文件的过程称为**物化**。 (我们在“[聚合:数据立方体和物化视图](ch2.md#聚合:数据立方体和物化视图)”中已经在物化视图的背景下遇到了这个术语。它意味着立即求值,某个操作的结果并写出来,而不是计算需要时按需计算)
|
||||
|
||||
相反,本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输出连接起来。管道并没有完全实现中间状态,而是只使用一个小的内存缓冲区,将输出逐渐流向输入。
|
||||
|
||||
@ -559,7 +563,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
由于它们通过几个处理阶段明确地建模数据流,所以这些系统被称为数据流引擎。像MapReduce一样,它们通过反复调用用户定义的函数来在单个线程上一次处理一条记录。他们通过对输入进行分区来并行工作,并将一个功能的输出复制到网络上,成为另一个功能的输入。
|
||||
|
||||
与MapReduce不同,这些功能不需要交替映射和Reduce的严格角色,而是可以以更灵活的方式进行组合。我们称之为这些函数操作符,数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入:
|
||||
与MapReduce不同,这些功能不需要交替Map和Reduce的严格角色,而是可以以更灵活的方式进行组合。我们称之为这些函数操作符,数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入:
|
||||
|
||||
- 一个选项是通过键对记录进行重新分区和排序,就像在MapReduce的混洗阶段一样(参阅“[分布式执行MapReduce](#分布式执行MapReduce)”)。此功能可以像在MapReduce中一样启用排序合并连接和分组。
|
||||
- 另一种可能是采取几个输入,并以相同的方式进行分区,但跳过排序。这节省了分区散列连接的工作,其中记录的分区是重要的,但顺序是不相关的,因为构建散列表随机化了顺序。
|
||||
@ -570,11 +574,11 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
- 排序等昂贵的工作只需要在实际需要的地方执行,而不是在每个Map和Reduce阶段之间默认发生。
|
||||
- 没有不必要的Map任务,因为Mapper所做的工作通常可以合并到前面的reduce操作器中(因为Mapper不会更改数据集的分区)。
|
||||
- 由于工作流程中的所有连接和数据依赖性都是明确声明的,因此调度程序会概述哪些数据是必需的,因此可以进行本地优化。例如,它可以尝试将占用某些数据的任务放在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而不必通过网络复制数据。
|
||||
- 通常将操作员之间的中间状态保存在内存中或写入本地磁盘就足够了,这比写入HDFS需要更少的I/O(必须将其复制到多个计算机并写入到每个代理的磁盘上)。 MapReduce已经将这种优化用于Mapper的输出,但是数据流引擎将该思想推广到了所有的中间状态。
|
||||
- 操作员可以在输入准备就绪后立即开始执行;在下一个开始之前不需要等待整个前一阶段的完成。
|
||||
- 通常将运算符之间的中间状态保存在内存中或写入本地磁盘就足够了,这比写入HDFS需要更少的I/O(必须将其复制到多个计算机并写入到每个代理的磁盘上)。 MapReduce已经将这种优化用于Mapper的输出,但是数据流引擎将该思想推广到了所有的中间状态。
|
||||
- 运算符可以在输入准备就绪后立即开始执行;在下一个开始之前不需要等待整个前一阶段的完成。
|
||||
- 与MapReduce(为每个任务启动一个新的JVM)相比,现有的Java虚拟机(JVM)进程可以重用来运行新操作,从而减少启动开销。
|
||||
|
||||
你可以使用数据流引擎来执行与MapReduce工作流相同的计算,并且由于此处所述的优化,通常执行速度会明显更快。既然操作符是map和reduce的泛化,相同的处理代码可以在任一执行引擎上运行:Pig,Hive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark,而无需修改代码【64】。
|
||||
你可以使用数据流引擎来执行与MapReduce工作流相同的计算,并且由于此处所述的优化,通常执行速度会明显更快。既然操作符是Map和Reduce的泛化,相同的处理代码可以在任一执行引擎上运行:Pig,Hive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark,而无需修改代码【64】。
|
||||
|
||||
Tez是一个相当薄的库,它依赖于YARN shuffle服务来实现节点间数据的实际复制【58】,而Spark和Flink则是包含自己的网络通信层,调度器和面向用户的API的大型框架。我们将在短期内讨论这些高级API。
|
||||
|
||||
@ -582,9 +586,9 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
完全实现中间状态到分布式文件系统的一个优点是它是持久的,这使得MapReduce中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。
|
||||
|
||||
Spark,Flink和Tez避免将中间状态写入HDFS,因此他们采取了不同的方法来容忍错误:如果一台机器发生故障,并且该机器上的中间状态丢失,则会从其他仍然可用的数据重新计算在可能的情况下是在先的中间阶段,或者是通常在HDFS上的原始输入数据)。
|
||||
Spark,Flink和Tez避免将中间状态写入HDFS,因此他们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则会从其他仍然可用的数据重新计算在可能的情况下是在先的中间阶段,或者是通常在HDFS上的原始输入数据)。
|
||||
|
||||
为了实现这个重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用哪个输入分区,以及哪个操作符被应用到它。 Spark使用弹性分布式数据集(RDD)抽象来追踪数据的祖先【61】,而Flink检查点操作符状态,允许其恢复运行在执行过程中遇到错误的操作符【66】。
|
||||
为了实现这个重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用哪个输入分区,以及哪个操作符被应用到它。 Spark使用**弹性分布式数据集(RDD)**抽象来追踪数据的祖先【61】,而Flink检查点操作符状态,允许其恢复运行在执行过程中遇到错误的操作符【66】。
|
||||
|
||||
在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,操作员是否始终生成相同的输出?如果一些丢失的数据已经发送给下游运营商,这个问题就很重要。如果运营商重新启动,重新计算的数据与原有的丢失数据不一致,下游运营商很难解决新旧数据之间的矛盾。对于不确定性运营商来说,解决方案通常是杀死下游运营商,然后再运行新数据。
|
||||
|
||||
@ -596,7 +600,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
回到Unix的类比,我们看到MapReduce就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是Unix管道。尤其是Flink是围绕流水线执行的思想而建立的:也就是说,将运算符的输出递增地传递给其他操作符,并且在开始处理之前不等待输入完成。
|
||||
|
||||
排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能是具有最低键的输入记录,因此需要作为第一个输出记录。任何需要分类的操作员都需要至少暂时地累积状态。但是工作流程的许多其他部分可以以流水线方式执行。
|
||||
排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能是具有最低键的输入记录,因此需要作为第一个输出记录。任何需要分类的运算符都需要至少暂时地累积状态。但是工作流程的许多其他部分可以以流水线方式执行。
|
||||
|
||||
当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它—— 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS上的物化数据集通常仍是作业的输入和最终输出。和MapReduce一样,输入是不可变的,输出被完全替换。对MapReduce的改进是,你可以节省自己将所有中间状态写入文件系统。
|
||||
|
||||
@ -604,9 +608,9 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
|
||||
在“[图数据模型](ch2.md#图数据模型)”中,我们讨论了使用图形来建模数据,并使用图形查询语言来遍历图形中的边和顶点。[第2章](ch2.md)的讨论集中在OLTP风格的使用上:快速执行查询来查找少量符合特定条件的顶点。
|
||||
|
||||
在批处理环境中查看图也很有趣,其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用程序(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是PageRank 【69】,它试图根据其他网页链接的网页来估计网页的流行度。它被用作确定网络搜索引擎呈现结果的顺序的公式的一部分。
|
||||
在批处理环境中查看图也很有趣,其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是PageRank 【69】,它试图根据其他网页链接的网页来估计网页的流行度。它被用作确定网络搜索引擎呈现结果的顺序的公式的一部分。
|
||||
|
||||
> 像Spark,Flink和Tez这样的数据流引擎(参见“[中间状态的物化](#中间状态的物化)”)通常将操作符作为**有向无环图(DAG)**排列在作业中。这与图形处理不一样:在数据流引擎中,从一个操作符到另一个操作符的数据流被构造成一个图,而数据本身通常由关系式元组构成。在图形处理中,数据本身具有图形的形式。另一个不幸的命名混乱!
|
||||
> 像Spark,Flink和Tez这样的数据流引擎(参见“[中间状态的物化](#中间状态的物化)”)通常将操作符作为**有向无环图(DAG)**排列在作业中。这与图形处理不一样:在数据流引擎中,从一个操作符到另一个操作符的数据流被构造成一个图,而数据本身通常由关系式元组构成。在图处理中,数据本身具有图的形式。另一个不幸的命名混乱!
|
||||
|
||||
许多图算法是通过一次遍历一个边来表示的,将一个顶点与相邻的顶点连接起来以便传播一些信息,并且重复直到满足一些条件为止——例如,直到没有更多的边要跟随,或者直到一些度量收敛。我们在[图2-6](img/fig2-6.png)中看到一个例子,它通过重复地跟踪指示哪个位置在哪个其他位置(这种算法被称为传递闭包)的边缘,列出了包含在数据库中的北美所有位置。
|
||||
|
||||
@ -619,24 +623,25 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
|
||||
这种方法是有效的,但是用MapReduce实现它往往是非常低效的,因为MapReduce没有考虑算法的迭代性质:它总是读取整个输入数据集并产生一个全新的输出数据集,即使只有一小部分该图与上次迭代相比已经改变。
|
||||
Pregel处理模型
|
||||
|
||||
作为批处理图形的优化,计算的批量同步并行(BSP)模型【70】已经流行起来。其中,它由Apache Giraph 【37】,Spark的GraphX API和Flink的Gelly API 【71】实现。它也被称为Pregel模型,正如Google的Pregel论文推广这种处理图的方法【72】。
|
||||
作为批处理图形的优化,计算的**批量同步并行(BSP)**模型【70】已经流行起来。其中,它由Apache Giraph 【37】,Spark的GraphX API和Flink的Gelly API 【71】实现。它也被称为Pregel模型,正如Google的Pregel论文推广这种处理图的方法【72】。
|
||||
|
||||
回想一下在MapReduce中,Mapper在概念上“发送消息”给reducer的特定调用,因为框架将所有的mapper输出集中在一起。 Pregel背后有一个类似的想法:一个顶点可以“发送消息”到另一个顶点,通常这些消息沿着图的边被发送。
|
||||
回想一下在MapReduce中,Mapper在概念上“发送消息”给Reducer的特定调用,因为框架将所有的Mapper输出集中在一起。 Pregel背后有一个类似的想法:一个顶点可以“发送消息”到另一个顶点,通常这些消息沿着图的边被发送。
|
||||
|
||||
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用reducer一样。与MapReduce的不同之处在于,在Pregel模型中,顶点从一次迭代到下一次迭代记忆它的状态,所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息,则不需要做任何工作。
|
||||
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用Reducer一样。与MapReduce的不同之处在于,在Pregel模型中,顶点从一次迭代到下一次迭代记忆它的状态,所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息,则不需要做任何工作。
|
||||
|
||||
这与演员模型有些相似(参阅“[分布式的Actor框架](ch4.md#分布式的Actor框架)”),除非顶点状态和顶点之间的消息具有容错性和耐久性,并且通信以固定的方式进行,否则将每个顶点视为主角轮次:在每一次迭代中,框架传递在前一次迭代中发送的所有消息。演员通常没有这样的时间保证。
|
||||
这与Actor模型有些相似(参阅“[分布式的Actor框架](ch4.md#分布式的Actor框架)”),除非顶点状态和顶点之间的消息具有容错性和耐久性,并且通信以固定的方式进行,否则将每个顶点视为主角轮次:在每一次迭代中,框架传递在前一次迭代中发送的所有消息。Actor通常没有这样的时间保证。
|
||||
|
||||
#### 容错
|
||||
|
||||
顶点只能通过消息传递进行通信(而不是直接相互查询)的事实有助于提高Pregel作业的性能,因为消息可以成批处理,而且等待通信的次数也减少了。唯一的等待是在迭代之间:由于Pregel模型保证所有在一次迭代中发送的消息都在下一次迭代中传递,所以先前的迭代必须完全完成,并且所有的消息必须在网络上复制,然后下一个开始。
|
||||
即使底层网络可能丢失,重复或任意延迟消息(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),Pregel实施可保证在接下来的迭代中消息在其目标顶点处理一次。像MapReduce一样,该框架透明地从故障中恢复,以简化Pregel顶层算法的编程模型。
|
||||
|
||||
即使底层网络可能丢失,重复或任意延迟消息(参阅“[不可靠的网络](ch8.md#不可靠的网络)”),Pregel实施可保证在接下来的迭代中消息在其目标顶点处理一次。像MapReduce一样,该框架透明地从故障中恢复,以简化Pregel顶层算法的编程模型。
|
||||
|
||||
这种容错是通过在迭代结束时定期检查所有顶点的状态来实现的,即将其全部状态写入持久存储。如果某个节点发生故障并且其内存中状态丢失,则最简单的解决方法是将整个图计算回滚到上一个检查点,然后重新启动计算。如果算法是确定性的并且记录了消息,那么也可以选择性地只恢复丢失的分区(就像我们之前讨论过的数据流引擎)【72】。
|
||||
|
||||
#### 并行执行
|
||||
|
||||
顶点不需要知道它正在执行哪个物理机器;当它发送消息到其他顶点时,它只是将它们发送到一个顶点ID。分配图的框架,即确定哪个顶点运行在哪个机器上,以及如何通过网络路由消息,以便它们结束在正确的位置。
|
||||
顶点不需要知道它正在执行哪个物理机器;当它发送消息到其他顶点时,它只是将它们发送到一个顶点ID。分配图的框架,即确定哪个顶点运行在哪个机器上,以及如何通过网络路由消息,以便它们结束在正确的位置。
|
||||
|
||||
由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式划分图形。理想情况下,如果它们需要进行大量的通信,那么它将被分割,以使顶点在同一台机器上共置。然而,寻找这样一个优化的分割在实践中是困难的,图形经常被任意分配的顶点ID分割,而不会尝试将相关的顶点分组在一起。
|
||||
|
||||
@ -652,11 +657,11 @@ Pregel处理模型
|
||||
|
||||
如前所述,Hive,Pig,Cascading和Crunch等高级语言和API由于手工编写MapReduce作业而变得非常流行。随着Tez的出现,这些高级语言还有额外的好处,可以移动到新的数据流执行引擎,而无需重写作业代码。 Spark和Flink也包括他们自己的高级数据流API,经常从FlumeJava中获得灵感【34】。
|
||||
|
||||
这些数据流API通常使用关系式构建块来表达一个计算:连接数据集以获取某个字段的值;按键分组元组;过滤一些条件;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
|
||||
这些数据流API通常使用关系式构建块来表达一个计算:连接数据集以获取某个字段的值;按键分组元组;过滤一些条件;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
|
||||
|
||||
除了需要较少代码的明显优势之外,这些高级接口还允许交互式使用,在这种交互式使用中,你可以将分析代码逐步编写到shell中并经常运行,以观察它正在做什么。这种发展风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学,我们在第394页的“Unix哲学”中讨论过这个问题。
|
||||
除了需要较少代码的明显优势之外,这些高级接口还允许交互式使用,在这种交互式使用中,你可以将分析代码逐步编写到shell中并经常运行,以观察它正在做什么。这种发展风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学,我们“[Unix哲学](#Unix哲学)”中讨论过这个问题。
|
||||
|
||||
而且,这些高级接口不仅使人类使用系统的效率更高,而且提高了机器级别的工作执行效率。
|
||||
而且,这些高级接口不仅使人类使用系统的效率更高,而且提高了机器层面的工作执行效率。
|
||||
|
||||
#### 向声明式查询语言的转变
|
||||
|
||||
@ -672,7 +677,7 @@ Pregel处理模型
|
||||
|
||||
Spark生成JVM字节码【79】,Impala使用LLVM为这些内部循环生成本机代码【41】。
|
||||
|
||||
通过将声明性方面与高级API结合起来,并使查询优化器可以在执行期间利用这些优化方法,批处理框架看起来更像MPP数据库(并且可以实现可比较的性能)。同时,通过具有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。
|
||||
通过将声明性方面与高级API结合起来,并使查询优化器可以在执行期间利用这些优化方法,批处理框架看起来更像MPP数据库(并且可以实现可与之相比的性能)。同时,通过具有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。
|
||||
|
||||
#### 专业化的不同领域
|
||||
|
||||
@ -680,7 +685,7 @@ Pregel处理模型
|
||||
|
||||
另一个越来越重要的领域是统计和数值算法,它们是机器学习应用(如分类和推荐系统)所需要的。可重复使用的实现正在出现:例如,Mahout在MapReduce,Spark和Flink之上实现了用于机器学习的各种算法,而MADlib在关系型MPP数据库(Apache HAWQ)中实现了类似的功能【54】。
|
||||
|
||||
空间算法也是有用的,例如最近邻搜索(kNN)【80】,它在一些多维空间中搜索与给定物品接近的物品 - 这是一种类似的搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串【81】。
|
||||
空间算法也是有用的,例如最近邻搜索(kNN)【80】,它在一些多维空间中搜索与给定物品接近的物品 —— 这是一种类似的搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串【81】。
|
||||
|
||||
批处理引擎正被用于分布式执行来自日益广泛的领域的算法。随着批处理系统获得内置功能和高级声明性操作符,并且随着MPP数据库变得更加可编程和灵活,两者开始看起来更相似:最终,它们都只是存储和处理数据的系统。
|
||||
|
||||
@ -706,7 +711,7 @@ Pregel处理模型
|
||||
|
||||
|
||||
|
||||
我们讨论了几种MapReduce的连接算法,其中大多数也是在MPP数据库和数据流引擎中使用的。他们还提供了分区算法如何工作的一个很好的例子:
|
||||
我们讨论了几种MapReduce的连接算法,其中大多数也是在MPP数据库和数据流引擎中使用的。他们还提供了分区算法如何工作的一个很好的例子:
|
||||
|
||||
***排序合并连接***
|
||||
|
||||
@ -724,7 +729,7 @@ Pregel处理模型
|
||||
|
||||
得益于这个框架,你在批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,也许不得不重新尝试各种任务。这些可靠的语义要比在线服务处理用户请求时经常使用的要多得多,而且在处理请求的副作用时写入数据库。
|
||||
|
||||
批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入—— 换句话说,输出是从输入衍生出的。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,所以一个工作最终完成。
|
||||
批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入—— 换句话说,输出是从输入派生出的。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,所以一个工作最终完成。
|
||||
|
||||
在下一章中,我们将转向流处理,其中的输入是未知的 —— 也就是说,你还有一份工作,但是它的输入是永无止境的数据流。在这种情况下,工作永远不会完成,因为在任何时候都可能有更多的工作进来。我们将看到流和批处理在某些方面是相似的。但是关于无尽数据流的假设,也对我们构建系统的方式产生了很大的改变。
|
||||
|
||||
|
10
ch7.md
10
ch7.md
@ -618,15 +618,15 @@ COMMIT;
|
||||
|
||||
#### 在存储过程中封装事务
|
||||
|
||||
在数据库的早期阶段,意图是数据库事务可以包含整个用户活动流程。例如,预订机票是一个多阶段的过程(搜索路线,票价和可用座位,决定行程,在每段行程的航班上订座,输入乘客信息,付款)。数据库设计者认为,如果整个过程是一个事务,那么它就可以被原子化地执行。
|
||||
在数据库的早期阶段,意图是数据库事务可以包含整个用户活动流程。例如,预订机票是一个多阶段的过程(搜索路线,票价和可用座位,决定行程,在每段行程的航班上订座,输入乘客信息,付款)。数据库设计者认为,如果整个过程是一个事务,那么它就可以被原子化地执行。
|
||||
|
||||
不幸的是,人类做出决定和回应的速度非常缓慢。如果数据库事务需要等待来自用户的输入,则数据库需要支持潜在的大量并发事务,其中大部分是空闲的。大多数数据库不能高效完成这项工作,因此几乎所有的OLTP应用程序都避免在事务中等待交互式的用户输入,以此来保持事务的简短。在Web上,这意味着事务在同一个HTTP请求中被提交——一个事务不会跨越多个请求。一个新的HTTP请求开始一个新的事务。
|
||||
不幸的是,人类做出决定和回应的速度非常缓慢。如果数据库事务需要等待来自用户的输入,则数据库需要支持潜在的大量并发事务,其中大部分是空闲的。大多数数据库不能高效完成这项工作,因此几乎所有的OLTP应用程序都避免在事务中等待交互式的用户输入,以此来保持事务的简短。在Web上,这意味着事务在同一个HTTP请求中被提交——一个事务不会跨越多个请求。一个新的HTTP请求开始一个新的事务。
|
||||
|
||||
即使人类已经找到了关键路径,事务仍然以交互式的客户端/服务器风格执行,一次一个语句。应用程序进行查询,读取结果,可能根据第一个查询的结果进行另一个查询,依此类推。查询和结果在应用程序代码(在一台机器上运行)和数据库服务器(在另一台机器上)之间来回发送。
|
||||
即使人类已经找到了关键路径,事务仍然以交互式的客户端/服务器风格执行,一次一个语句。应用程序进行查询,读取结果,可能根据第一个查询的结果进行另一个查询,依此类推。查询和结果在应用程序代码(在一台机器上运行)和数据库服务器(在另一台机器上)之间来回发送。
|
||||
|
||||
在这种交互式的事务方式中,应用程序和数据库之间的网络通信耗费了大量的时间。如果不允许在数据库中进行并发处理,且一次只处理一个事务,则吞吐量将会非常糟糕,因为数据库大部分的时间都花费在等待应用程序发出当前事务的下一个查询。在这种数据库中,为了获得合理的性能,需要同时处理多个事务。
|
||||
在这种交互式的事务方式中,应用程序和数据库之间的网络通信耗费了大量的时间。如果不允许在数据库中进行并发处理,且一次只处理一个事务,则吞吐量将会非常糟糕,因为数据库大部分的时间都花费在等待应用程序发出当前事务的下一个查询。在这种数据库中,为了获得合理的性能,需要同时处理多个事务。
|
||||
|
||||
出于这个原因,具有单线程串行事务处理的系统不允许交互式的多语句事务。取而代之,应用程序必须提前将整个事务代码作为存储过程提交给数据库。这些方法之间的差异如[图7-9](img/fig7-9.png) 所示。如果事务所需的所有数据都在内存中,则存储过程可以非常快地执行,而不用等待任何网络或磁盘I/O。
|
||||
出于这个原因,具有单线程串行事务处理的系统不允许交互式的多语句事务。取而代之,应用程序必须提前将整个事务代码作为存储过程提交给数据库。这些方法之间的差异如[图7-9](img/fig7-9.png) 所示。如果事务所需的所有数据都在内存中,则存储过程可以非常快地执行,而不用等待任何网络或磁盘I/O。
|
||||
|
||||
![](img/fig7-9.png)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user