translation updates for chapter 10

This commit is contained in:
Yin Gang 2021-08-19 09:41:06 +08:00
parent ed9acfd65b
commit 5af92a7b00

190
ch10.md
View File

@ -10,9 +10,9 @@
[TOC]
在本书的前两部分中,我们讨论了很多关于**请求**和**查询**以及相应的**响应**或**结果**。许多现有数据系统中都采用这种数据处理方式:你发送请求指令,一段时间后(我们期望)系统会给出一个结果。数据库,缓存,搜索索引,Web服务器以及其他一些系统都以这种方式工作。
在本书的前两部分中,我们讨论了很多关于**请求**和**查询**以及相应的**响应**或**结果**。许多现有数据系统中都采用这种数据处理方式:你发送请求指令,一段时间后(我们期望)系统会给出一个结果。数据库、缓存、搜索索引、Web服务器以及其他一些系统都以这种方式工作。
像这样的**在线online**系统无论是浏览器请求页面还是调用远程API的服务我们通常认为请求是由人类用户触发的并且正在等待响应。他们不应该等太久所以我们非常关注系统的响应时间参阅“[描述性能](ch1.md)”)。
像这样的**在线online**系统无论是浏览器请求页面还是调用远程API的服务我们通常认为请求是由人类用户触发的并且正在等待响应。他们不应该等太久所以我们非常关注系统的响应时间参阅“[描述性能](ch1.md#描述性能)”)。
Web和越来越多的基于HTTP/REST的API使交互的请求/响应风格变得如此普遍,以至于很容易将其视为理所当然。但我们应该记住,这不是构建系统的唯一方式,其他方法也有其优点。我们来看看三种不同类型的系统:
@ -28,19 +28,19 @@
流处理介于在线和离线(批处理)之间,所以有时候被称为**准实时near-real-time**或**准在线nearline**处理。像批处理系统一样,流处理消费输入并产生输出(并不需要响应请求)。但是,流式作业在事件发生后不久就会对事件进行操作,而批处理作业则需等待固定的一组输入数据。这种差异使流处理系统比起批处理系统具有更低的延迟。由于流处理基于批处理,我们将在[第11章](ch11.md)讨论它。
正如我们将在本章中看到的那样,批处理是构建可靠可伸缩和可维护应用程序的重要组成部分。例如2004年发布的批处理算法Map-Reduce可能被过分热情地被称为“造就Google大规模可伸缩性的算法”【2】。随后在各种开源数据系统中得到应用包括HadoopCouchDB和MongoDB。
正如我们将在本章中看到的那样,批处理是构建可靠可伸缩和可维护应用程序的重要组成部分。例如2004年发布的批处理算法Map-Reduce可能被过分热情地被称为“造就Google大规模可伸缩性的算法”【2】。随后在各种开源数据系统中得到应用包括HadoopCouchDB和MongoDB。
与多年前为数据仓库开发的并行处理系统【3,4】相比MapReduce是一个相当低级别的编程模型但它使得在商用硬件上能进行的处理规模迈上一个新的台阶。虽然MapReduce的重要性正在下降【5】但它仍然值得去理解因为它描绘了一幅关于批处理为什么有用以及如何用的清晰图景。
与多年前为数据仓库开发的并行处理系统【3,4】相比MapReduce是一个相当低级别的编程模型但它使得在商用硬件上能进行的处理规模迈上一个新的台阶。虽然MapReduce的重要性正在下降【5】但它仍然值得去理解因为它描绘了一幅关于批处理为什么有用以及如何做到有用的清晰图景。
实际上批处理是一种非常古老的计算方式。早在可编程数字计算机诞生之前打孔卡制表机例如1890年美国人口普查【6】中使用的霍尔里斯机实现了半机械化的批处理形式从大量输入中汇总计算。 Map-Reduce与1940年代和1950年代广泛用于商业数据处理的机电IBM卡片分类机器有着惊人的相似之处【7】。正如我们所说历史总是在不断重复自己。
在本章中我们将了解MapReduce和其他一些批处理算法和框架并探索它们在现代数据系统中的作用。但首先我们将看看使用标准Unix工具的数据处理。即使你已经熟悉了它们Unix的哲学也值得一读Unix的思想和经验教训可以迁移到大规模异构的分布式数据系统中。
在本章中我们将了解MapReduce和其他一些批处理算法和框架并探索它们在现代数据系统中的作用。但首先我们将看看使用标准Unix工具的数据处理。即使你已经熟悉了它们Unix的哲学也值得一读Unix的思想和经验教训可以迁移到大规模异构的分布式数据系统中。
## 使用Unix工具的批处理
我们从一个简单的例子开始。假设您有一台Web服务器每次处理请求时都会在日志文件中附加一行。例如使用nginx默认访问日志格式日志的一行可能如下所示
我们从一个简单的例子开始。假设您有一台Web服务器每次处理请求时都会在日志文件中附加一行。例如使用nginx默认访问日志格式,日志的一行可能如下所示:
```bash
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
@ -59,7 +59,7 @@ AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
### 分析简单日志
### 简单日志分析
很多工具可以从这些日志文件生成关于网站流量的漂亮的报告但为了练手让我们使用基本的Unix功能创建自己的工具。 例如,假设你想在你的网站上找到五个最受欢迎的网页。 则可以在Unix shell中这样做[^i]
@ -132,13 +132,13 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
### Unix哲学
我们可以非常容易地使用前一个例子中的一系列命令来分析日志文件这并非巧合事实上这实际上是Unix的关键设计思想之一且它今天仍然令人讶异地关联。让我们更深入地研究一下以便从Unix中借鉴一些想法【10】。
我们可以非常容易地使用前一个例子中的一系列命令来分析日志文件这并非巧合事实上这实际上是Unix的关键设计思想之一而且它直至今天也仍然令人讶异地重要。让我们更深入地研究一下以便从Unix中借鉴一些想法【10】。
Unix管道的发明者道格·麦克罗伊Doug McIlroy在1964年首先描述了这种情况【11】“当我们需要将消息从一个程序传递另一个程序时我们需要一种类似水管法兰的拼接程序的方式【a】 I/O应该也按照这种方式进行“。水管的类比仍然在生效通过管道连接程序的想法成为了现在被称为**Unix哲学**的一部分 —— 这一组设计原则在Unix用户与开发者之间流行起来该哲学在1978年表述如下【12,13】
Unix管道的发明者道格·麦克罗伊Doug McIlroy在1964年首先描述了这种情况【11】我们需要一种类似园艺胶管的方式来拼接程序 —— 当我们需要将消息从一个程序传递另一个程序时,直接接上去就行。I/O应该也按照这种方式进行“。水管的类比仍然在生效通过管道连接程序的想法成为了现在被称为**Unix哲学**的一部分 —— 这一组设计原则在Unix用户与开发者之间流行起来该哲学在1978年表述如下【12,13】
1. 让每个程序都做好一件事。要做一件新的工作,写一个新程序,而不是通过添加“功能”让老程序复杂化。
2. 期待每个程序的输出成为另一个程序的输入。不要将无关信息混入输出。避免使用严格的列数据或二进制输入格式。不要坚持交互式输入。
3. 设计和构建软件,甚至是操作系统,要尽早尝试,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
3. 设计和构建软件时,即使是操作系统,也让它们能够尽早地被试用,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
4. 优先使用工具来减轻编程任务,即使必须曲线救国编写工具,且在用完后很可能要扔掉大部分。
这种方法 —— 自动化,快速原型设计,增量式迭代,对实验友好,将大型项目分解成可管理的块 —— 听起来非常像今天的敏捷开发和DevOps运动。奇怪的是四十年来变化不大。
@ -163,7 +163,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
尽管几十年后还不够完美但统一的Unix接口仍然是非常出色的设计。没有多少软件能像Unix工具一样交互组合的这么好你不能通过自定义分析工具轻松地将电子邮件帐户的内容和在线购物历史记录以管道传送至电子表格中并将结果发布到社交网络或维基。今天像Unix工具一样流畅地运行程序是一种例外而不是规范。
即使是具有**相同数据模型**的数据库,将数据从一种导出再导入另一种也并不容易。缺乏整合导致了数据的**巴尔干化**[^译注i]。
即使是具有**相同数据模型**的数据库,将数据从一种数据库导出再导入另一种数据库也并不容易。缺乏整合导致了数据的**巴尔干化**[^译注i]。
[^译注i]: **巴尔干化Balkanization**是一个常带有贬义的地缘政治学术语,其定义为:一个国家或政区分裂成多个互相敌对的国家或政区的过程。
@ -173,13 +173,13 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
Unix工具的另一个特点是使用标准输入`stdin`)和标准输出(`stdout`)。如果你运行一个程序,而不指定任何其他的东西,标准输入来自键盘,标准输出指向屏幕。但是,你也可以从文件输入和/或将输出重定向到文件。管道允许你将一个进程的标准输出附加到另一个进程的标准输入(有个小内存缓冲区,而不需要将整个中间数据流写入磁盘)。
程序仍然可以直接读取和写入文件,但如果程序不担心特定的文件路径只使用标准输入和标准输出则Unix方法效果最好。这允许shell用户以任何他们想要的方式连接输入和输出该程序不知道或不关心输入来自哪里以及输出到哪里。 (人们可以说这是一种**松耦合loose coupling****晚期绑定late binding**【15】或**控制反转inversion of control**【16】。将输入/输出布线与程序逻辑分开,可以将小工具组合成更大的系统。
如果需要,程序仍然可以直接读取和写入文件,但Unix方法在程序不关心特定的文件路径、只使用标准输入和标准输出时效果最好。这允许shell用户以任何他们想要的方式连接输入和输出该程序不知道或不关心输入来自哪里以及输出到哪里。 (人们可以说这是一种**松耦合loose coupling****晚期绑定late binding**【15】或**控制反转inversion of control**【16】。将输入/输出布线与程序逻辑分开,可以将小工具组合成更大的系统。
你甚至可以编写自己的程序并将它们与操作系统提供的工具组合在一起。你的程序只需要从标准输入读取输入并将输出写入标准输出它就可以加入数据处理的管道中。在日志分析示例中你可以编写一个将Usage-Agent字符串转换为更灵敏的浏览器标识符或者将IP地址转换为国家代码的工具并将其插入管道。`sort`程序并不关心它是否与操作系统的另一部分或者你写的程序通信。
但是,使用`stdin`和`stdout`能做的事情是有限的。需要多个输入或输出的程序是可能的,但非常棘手。你没法将程序的输出管道连接至网络连接中【17,18】[^iii] 。如果程序直接打开文件进行读取和写入或者将另一个程序作为子进程启动或者打开网络连接那么I/O的布线就取决于程序本身了。它仍然可以被配置例如通过命令行选项但在Shell中对输入和输出进行布线的灵活性就少了。
但是,使用`stdin`和`stdout`能做的事情是有限的。需要多个输入或输出的程序虽然可能,却非常棘手。你没法将程序的输出管道连接至网络连接中【17,18】[^iii] 。如果程序直接打开文件进行读取和写入或者将另一个程序作为子进程启动或者打开网络连接那么I/O的布线就取决于程序本身了。它仍然可以被配置例如通过命令行选项但在Shell中对输入和输出进行布线的灵活性就少了。
[^iii]: 除了使用一个单独的工具,如`netcat`或`curl`。 Unix开始试图将所有东西都表示为文件但是BSD套接字API偏离了这个惯例【17】。研究用操作系统Plan 9和Inferno在使用文件方面更加一致它们将TCP连接表示为`/net/tcp`中的文件【18】。
[^iii]: 除了使用一个单独的工具,如`netcat`或`curl`。 Unix起初试图将所有东西都表示为文件但是BSD套接字API偏离了这个惯例【17】。研究用操作系统Plan 9和Inferno在使用文件方面更加一致它们将TCP连接表示为`/net/tcp`中的文件【18】。
@ -188,8 +188,6 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
使Unix工具如此成功的部分原因是它们使查看正在发生的事情变得非常容易
- Unix命令的输入文件通常被视为不可变的。这意味着你可以随意运行命令尝试各种命令行选项而不会损坏输入文件。
- 你可以在任何时候结束管道,将管道输出到`less`,然后查看它是否具有预期的形式。这种检查能力对调试非常有用。
- 你可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使你可以重新启动后面的阶段,而无需重新运行整个管道。
@ -205,26 +203,26 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
和大多数Unix工具一样运行MapReduce作业通常不会修改输入除了生成输出外没有任何副作用。输出文件以连续的方式一次性写入一旦写入文件不会修改任何现有的文件部分
虽然Unix工具使用`stdin`和`stdout`作为输入和输出但MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中该文件系统被称为**HDFSHadoop分布式文件系统**一个Google文件系统GFS的开源实现【19】。
虽然Unix工具使用`stdin`和`stdout`作为输入和输出但MapReduce作业在分布式文件系统上读写文件。在Hadoop的MapReduce实现中该文件系统被称为**HDFSHadoop分布式文件系统**一个Google文件系统GFS的开源实现【19】。
除HDFS外还有各种其他分布式文件系统如GlusterFS和Quantcast File SystemQFS【20】。诸如Amazon S3Azure Blob存储和OpenStack Swift 【21】等对象存储服务在很多方面都是相似的[^iv]。在本章中我们将主要使用HDFS作为示例但是这些原则适用于任何分布式文件系统。
除HDFS外还有各种其他分布式文件系统如GlusterFS和Quantcast File SystemQFS【20】。诸如Amazon S3Azure Blob存储和OpenStack Swift【21】等对象存储服务在很多方面都是相似的[^iv]。在本章中我们将主要使用HDFS作为示例但是这些原则适用于任何分布式文件系统。
[^iv]: 一个不同之处在于对于HDFS可以将计算任务安排在存储特定文件副本的计算机上运行而对象存储通常将存储和计算分开。如果网络带宽是一个瓶颈从本地磁盘读取有性能优势。但是请注意如果使用纠删码则会丢失局部性因为来自多台机器的数据必须进行合并以重建原始文件【20】。
[^iv]: 一个不同之处在于对于HDFS可以将计算任务安排在存储特定文件副本的计算机上运行而对象存储通常将存储和计算分开。如果网络带宽是一个瓶颈从本地磁盘读取有性能优势。但是请注意如果使用纠删码Erasure Coding则会丢失局部性因为来自多台机器的数据必须进行合并以重建原始文件【20】。
与网络连接存储NAS和存储区域网络SAN架构的共享磁盘方法相比HDFS基于**无共享**原则(参见[第二部分前言](part-ii.md))。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。而另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。
与网络连接存储NAS和存储区域网络SAN架构的共享磁盘方法相比HDFS基于**无共享**原则(参见[第二部分](part-ii.md)的介绍)。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。而另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。
HDFS包含在每台机器上运行的守护进程,对外暴露网络服务,允许其他节点访问存储在该机器上的文件(假设数据中心中的每台通用计算机都挂载着一些磁盘)。名为**NameNode**的中央服务器会跟踪哪个文件块存储在哪台机器上。因此HDFS在概念上创建了一个大型文件系统可以使用所有运行有守护进程的机器的磁盘。
HDFS在每台机器上运行了一个守护进程,它对外暴露网络服务,允许其他节点访问存储在该机器上的文件(假设数据中心中的每台通用计算机都挂载着一些磁盘)。名为**NameNode**的中央服务器会跟踪哪个文件块存储在哪台机器上。因此HDFS在概念上创建了一个大型文件系统可以使用所有运行有守护进程的机器的磁盘。
为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本,如[第5章](ch5.md)中所述或者诸如Reed-Solomon码这样的纠删码方案允许以比完全复制更低的存储开销以恢复丢失的数据【20,22】。这些技术与RAID相似可以在连接到同一台机器的多个磁盘上提供冗余区别在于在分布式文件系统中文件访问和复制是在传统的数据中心网络上完成的没有特殊的硬件。
为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本,如[第5章](ch5.md)中所述或者诸如Reed-Solomon码这样的纠删码方案能以比完全复制更低的存储开销来支持恢复丢失的数据【20,22】。这些技术与RAID相似后者可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。
HDFS的可伸缩性已经很不错了在撰写本书时最大的HDFS部署运行在上万台机器上总存储容量达数百PB【23】。如此大的规模已经变得可行因为使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于专用存储设备上的同等容量【24】。
HDFS的可伸缩性已经很不错了在撰写本书时最大的HDFS部署运行在上万台机器上总存储容量达数百PB【23】。如此大的规模已经变得可行因为使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于在专用存储设备上支持同等容量的成本【24】。
### MapReduce作业执行
MapReduce是一个编程框架你可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考“[简单日志分析](#简单日志分析)”中的Web服务器日志分析示例。MapReduce中的数据处理模式与此示例非常相似
1. 读取一组输入文件,并将其分解成**记录records**。在Web服务器日志示例中每条记录都是日志中的一行即`\n`是记录分隔符)。
2. 调用Mapper函数从每条输入记录中提取一对键值。在前面的例子中Mapper函数是`awk '{print $7}'`它提取URL`$7`)作为,并将值留空。
2. 调用Mapper函数从每条输入记录中提取一对键值。在前面的例子中Mapper函数是`awk '{print $7}'`它提取URL`$7`)作为键,并将值留空。
3. 按键排序所有的键值对。在日志的例子中,这由第一个`sort`命令完成。
4. 调用Reducer函数遍历排序后的键值对。如果同一个键出现多次排序使它们在列表中相邻所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中Reducer是由`uniq -c`命令实现的,该命令使用相同的键来统计相邻记录的数量。
@ -237,7 +235,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
Mapper会在每条输入记录上调用一次其工作是从输入记录中提取键值。对于每个输入它可以生成任意数量的键值对包括None。它不会保留从一个输入记录到下一个记录的任何状态因此每个记录都是独立处理的。
***Reducer***
MapReduce框架拉取由Mapper生成的键值对收集属于同一个键的所有值使用在这组值列表上迭代调用Reducer。 Reducer可以产生输出记录例如相同URL的出现次数
MapReduce框架拉取由Mapper生成的键值对收集属于同一个键的所有值并在这组值上迭代调用Reducer。 Reducer可以产生输出记录例如相同URL的出现次数
在Web服务器日志的例子中我们在第5步中有第二个`sort`命令它按请求数对URL进行排序。在MapReduce中如果你需要第二个排序阶段则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来Mapper的作用是将数据放入一个适合排序的表单中并且Reducer的作用是处理已排序的数据。
@ -247,7 +245,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
在分布式计算中可以使用标准的Unix工具作为Mapper和Reducer【25】但更常见的是它们被实现为传统编程语言的函数。在Hadoop MapReduce中Mapper和Reducer都是实现特定接口的Java类。在MongoDB和CouchDB中Mapper和Reducer都是JavaScript函数参阅“[MapReduce查询](ch2.md#MapReduce查询)”)。
[图10-1]()显示了Hadoop MapReduce作业中的数据流。其并行化基于分区参见[第6章](ch6.md)作业的输入通常是HDFS中的一个目录输入目录中的每个文件或文件块都被认为是一个单独的分区可以单独处理map任务[图10-1](img/fig10-1.png)中的m1m2和m3标记
[图10-1](img/fig10-1.png)显示了Hadoop MapReduce作业中的数据流。其并行化基于分区参见[第6章](ch6.md)作业的输入通常是HDFS中的一个目录输入目录中的每个文件或文件块都被认为是一个单独的分区可以单独处理map任务[图10-1](img/fig10-1.png)中的m1m2和m3标记
每个输入文件的大小通常是数百兆字节。 MapReduce调度器图中未显示试图在其中一台存储输入文件副本的机器上运行每个Mapper只要该机器有足够的备用RAM和CPU资源来运行Mapper任务【26】。这个原则被称为**将计算放在数据附近**【27】它节省了通过网络复制输入文件的开销减少网络负载并增加局部性。
@ -257,7 +255,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
在大多数情况下应该在Mapper任务中运行的应用代码在将要运行它的机器上还不存在所以MapReduce框架首先将代码例如Java程序中的JAR文件复制到适当的机器。然后启动Map任务并开始读取输入文件一次将一条记录传入Mapper回调函数。Mapper的输出由键值对组成。
计算的Reduce端也被分区。虽然Map任务的数量由输入文件块的数量决定但Reducer的任务的数量是由作业作者配置的它可以不同于Map任务的数量。为了确保具有相同键的所有键值对最终落在相同的Reducer处框架使用键的散列值来确定哪个Reduce任务应该接收到特定的键值对参见“[按键散列分区](ch6.md#按键散列分区)”))。
计算的Reduce端也被分区。虽然Map任务的数量由输入文件块的数量决定但Reducer的任务的数量是由作业作者配置的它可以不同于Map任务的数量。为了确保具有相同键的所有键值对最终落在相同的Reducer处框架使用键的散列值来确定哪个Reduce任务应该接收到特定的键值对参见“[根据键的散列分区](ch6.md#根据键的散列分区)”))。
键值对必须进行排序但数据集可能太大无法在单台机器上使用常规排序算法进行排序。相反分类是分阶段进行的。首先每个Map任务都按照Reducer对输出进行分区。每个分区都被写入Mapper程序的本地磁盘使用的技术与我们在“[SSTables与LSM树](ch3.md#SSTables与LSM树)”中讨论的类似。
@ -271,7 +269,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
单个MapReduce作业可以解决的问题范围很有限。以日志分析为例单个MapReduce作业可以确定每个URL的页面浏览次数但无法确定最常见的URL因为这需要第二轮排序。
因此将MapReduce作业链接成为**工作流workflow**中是极为常见的,例如,一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流没有特殊支持所以这个链是通过目录名隐式实现的第一个作业必须将其输出配置为HDFS中的指定目录第二个作业必须将其输入配置为从同一个目录。从MapReduce框架的角度来看是两个独立的作业。
因此将MapReduce作业链接成为**工作流workflow**中是极为常见的,例如,一个作业的输出成为下一个作业的输入。 Hadoop MapReduce框架对工作流没有特殊支持所以这个链是通过目录名隐式实现的第一个作业必须将其输出配置为HDFS中的指定目录第二个作业必须将其输入配置为从同一个目录。从MapReduce框架的角度来看这是两个独立的作业。
因此被链接的MapReduce作业并没有那么像Unix命令管道它直接将一个进程的输出作为另一个进程的输入仅用一个很小的内存缓冲区。它更像是一系列命令其中每个命令的输出写入临时文件下一个命令从临时文件中读取。这种设计有利也有弊我们将在“[物化中间状态](#物化中间状态)”中讨论。
@ -283,15 +281,15 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
### Reduce侧连接与分组
我们在[第2章](ch2.md)中讨论了数据模型和查询语言的接,但是我们还没有深入探讨连接是如何实现的。现在是我们再次捡起这条线索的时候了。
我们在[第2章](ch2.md)中讨论了数据模型和查询语言的接,但是我们还没有深入探讨连接是如何实现的。现在是我们再次捡起这条线索的时候了。
在许多数据集中,一条记录与另一条记录存在关联是很常见的:关系模型中的**外键**,文档模型中的**文档引用**或图模型中的**边**。当你需要同时访问这一关联的两侧(持有引用的记录与被引用的记录)时,连接就是必须的。正如[第2章](ch2.md)所讨论的,非规范化可以减少对连接的需求,但通常无法将其完全移除[^v]。
[^v]: 我们在本书中讨论的连接通常是等值连接即最常见的连接类型其中记录与其他记录在特定字段例如ID中具有**相同值**相关联。有些数据库支持更通用的连接类型,例如使用小于运算符而不是等号运算符,但是我们没有地方来讲这些东西。
[^v]: 我们在本书中讨论的连接通常是等值连接,即最常见的连接类型,其中记录通过与其他记录在特定字段例如ID中具有**相同值**相关联。有些数据库支持更通用的连接类型,例如使用小于运算符而不是等号运算符,但是我们没有地方来讲这些东西。
在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用**索引**来快速定位感兴趣的记录(参阅[第3章](ch3.md)。如果查询涉及到连接则可能涉及到查找多个索引。然而MapReduce没有索引的概念 —— 至少在通常意义上没有。
当MapReduce作业被赋予一组文件作为输入时它读取所有这些文件的全部内容数据库会将这种操作称为**全表扫描**。如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。但是在分析查询中(参阅“[事务处理分析?](ch3.md#事务处理还是分析?)”),通常需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。
当MapReduce作业被赋予一组文件作为输入时它读取所有这些文件的全部内容数据库会将这种操作称为**全表扫描**。如果你只想读取少量的记录,则全表扫描与索引查询相比,代价非常高昂。但是在分析查询中(参阅“[事务处理还是分析?](ch3.md#事务处理还是分析?)”),通常需要计算大量记录的聚合。在这种情况下,特别是如果能在多台机器上并行处理时,扫描整个输入可能是相当合理的事情。
当我们在批处理的语境中讨论连接时,我们指的是在数据集中解析某种关联的全量存在。 例如我们假设一个作业是同时处理所有用户的数据,而非仅仅是为某个特定用户查找数据(而这能通过索引更高效地完成)。
@ -303,7 +301,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
**图10-2 用户行为日志与用户档案的连接**
分析任务可能需要将用户活动与用户档相关联例如如果档案包含用户的年龄或出生日期系统就可以确定哪些页面更受哪些年龄段的用户欢迎。然而活动事件仅包含用户ID而没有包含完整的用户档案信息。在每个活动事件中嵌入这些档案信息很可能会非常浪费。因此活动事件需要与用户档案数据库相连接。
分析任务可能需要将用户活动与用户档案信息相关联例如如果档案包含用户的年龄或出生日期系统就可以确定哪些页面更受哪些年龄段的用户欢迎。然而活动事件仅包含用户ID而没有包含完整的用户档案信息。在每个活动事件中嵌入这些档案信息很可能会非常浪费。因此活动事件需要与用户档案数据库相连接。
实现这一连接的最简单方法是逐个遍历活动事件并为每个遇到的用户ID查询用户数据库在远程服务器上。这是可能的但是它的性能可能会非常差处理吞吐量将受限于受数据库服务器的往返时间本地缓存的有效性很大程度上取决于数据的分布并行运行大量查询可能会轻易压垮数据库【35】。
@ -329,11 +327,11 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
在排序合并连接中Mapper和排序过程确保了所有对特定用户ID执行连接操作的必须数据都被放在同一个地方单次调用Reducer的地方。预先排好了所有需要的数据Reducer可以是相当简单的单线程代码能够以高吞吐量和与低内存开销扫过这些记录。
这种架构可以看做Mapper将“消息”发送给Reducer。当一个Mapper发出一个键值对时这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串不是像IP地址和端口号那样的实际的网络地址它表现的就像一个地址所有具有相同键的键值对将被传递到相同的目标一次Reduce的调用
这种架构可以看做Mapper将“消息”发送给Reducer。当一个Mapper发出一个键值对时这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串不是像IP地址和端口号那样的实际的网络地址它表现的就像一个地址所有具有相同键的键值对将被传递到相同的目标一次Reducer的调用)。
使用MapReduce编程模型能将计算的物理网络通信层面从正确的机器获取数据从应用逻辑中剥离出来获取数据后执行处理。这种分离与数据库的典型用法形成了鲜明对比从数据库中获取数据的请求经常出现在应用代码内部【36】。由于MapReduce能够处理所有的网络通信因此它也避免了应用代码去担心部分故障例如另一个节点的崩溃MapReduce在不影响应用逻辑的情况下能透明地重试失败的任务。
使用MapReduce编程模型能将计算的物理网络通信层面从正确的机器获取数据从应用逻辑中剥离出来获取数据后执行处理。这种分离与数据库的典型用法形成了鲜明对比从数据库中获取数据的请求经常出现在应用代码内部【36】。由于MapReduce处理所有的网络通信,因此它也避免了应用代码去担心部分故障例如另一个节点的崩溃MapReduce在不影响应用逻辑的情况下能透明地重试失败的任务。
### GROUP BY
#### 分组
除了连接之外“把相关数据放在一起”的另一种常见模式是按某个键对记录分组如SQL中的GROUP BY子句。所有带有相同键的记录构成一个组而下一步往往是在每个组内进行某种聚合操作例如
@ -343,21 +341,21 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
使用MapReduce实现这种分组操作的最简单方法是设置Mapper以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有具有相同分区键的记录导向同一个Reducer。因此在MapReduce之上实现分组和连接看上去非常相似。
分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为**会话化sessionization**【37】。例如可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本A/B测试的用户更有购买欲,或者计算某个营销活动是否值得。
分组的另一个常见用途是整理特定用户会话的所有活动事件,以找出用户进行的一系列操作(称为**会话化sessionization**【37】。例如可以使用这种分析来确定显示新版网站的用户是否比那些显示旧版本的用户更有购买欲A/B测试,或者计算某个营销活动是否值得。
如果你有多个Web服务器处理用户请求则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话cookie用户ID或类似的标识符作为分组键以将特定用户的所有活动事件放在一起来实现会话化与此同时不同用户的事件仍然散在不同的分区中。
如果你有多个Web服务器处理用户请求则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话cookie用户ID或类似的标识符作为分组键以将特定用户的所有活动事件放在一起来实现会话化与此同时不同用户的事件仍然散在不同的分区中。
#### 处理偏斜
如果存在与单个键关联的大量数据,则“将具有相同键的所有记录放到相同的位置”这种模式就被破坏了。例如在社交网络中,大多数用户可能会与几百人有连接,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为**关键对象linchpin object**【38】或**热键hot key**。
在单个Reducer中收集与某个名流相关的所有活动(例如他们发布内容的回复)可能导致严重的倾斜(也称为**热点hot spot**)—— 也就是说一个Reducer必须比其他Reducer处理更多的记录参见“[负载偏斜与热点消除](ch6.md#负载偏斜与热点消除)“。由于MapReduce作业只有在所有Mapper和Reducer都完成时才完成所有后续作业必须等待最慢的Reducer才能启动。
在单个Reducer中收集与某个名人相关的所有活动(例如他们发布内容的回复)可能导致严重的**偏斜**(也称为**热点hot spot**)—— 也就是说一个Reducer必须比其他Reducer处理更多的记录参见“[负载偏斜与热点消除](ch6.md#负载偏斜与热点消除)“。由于MapReduce作业只有在所有Mapper和Reducer都完成时才完成所有后续作业必须等待最慢的Reducer才能启动。
如果连接的输入存在热可以使用一些算法进行补偿。例如Pig中的**斜连接skewed join**方法首先运行一个抽样作业来确定哪些键是热键【39】。连接实际执行时Mapper会将热键的关联记录**随机**相对于传统MapReduce基于键散列的确定性方法发送到几个Reducer之一。对于另外一侧的连接输入与热键相关的记录需要被复制到所有处理该键的Reducer上【40】。
如果连接的输入存在热键可以使用一些算法进行补偿。例如Pig中的**斜连接skewed join**方法首先运行一个抽样作业Sampling Job来确定哪些键是热键【39】。连接实际执行时Mapper会将热键的关联记录**随机**相对于传统MapReduce基于键散列的确定性方法发送到几个Reducer之一。对于另外一侧的连接输入与热键相关的记录需要被复制到**所有**处理该键的Reducer上【40】。
这种技术将处理热键的工作分散到多个Reducer上这样可以使其更好地并行化代价是需要将连接另一侧的输入记录复制到多个Reducer上。 Crunch中的**分片连接sharded join**方法与之类似,但需要显式指定热键而不是使用样作业。这种技术也非常类似于我们在“[负载偏斜与热点消除](ch6.md#负载偏斜与热点消除)”中讨论的技术,使用随机化来缓解分区数据库中的热点。
这种技术将处理热键的工作分散到多个Reducer上这样可以使其更好地并行化代价是需要将连接另一侧的输入记录复制到多个Reducer上。 Crunch中的**分片连接sharded join**方法与之类似,但需要显式指定热键而不是使用样作业。这种技术也非常类似于我们在“[负载偏斜与热点消除](ch6.md#负载偏斜与热点消除)”中讨论的技术,使用随机化来缓解分区数据库中的热点。
Hive的偏斜连接优化采取了另一种方法。它需要在表格元数据中显式指定热键并将与这些键相关的记录单独存放与其它文件分开。当在该表上执行连接时对于热键它会使用Map端连接参阅[下一节](#Map端连接))。
Hive的偏斜连接优化采取了另一种方法。它需要在表格元数据中显式指定热键并将与这些键相关的记录单独存放与其它文件分开。当在该表上执行连接时对于热键它会使用Map端连接参阅下一节
当按照热键进行分组并聚合时可以将分组分两个阶段进行。第一个MapReduce阶段将记录发送到随机Reducer以便每个Reducer只对热键的子集执行分组为每个键输出一个更紧凑的中间聚合结果。然后第二个MapReduce作业将所有来自第一阶段Reducer的中间聚合结果合并为每个键一个值。
@ -365,17 +363,17 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
### Map侧连接
上一节描述的连接算法在Reducer中执行实际的连接逻辑因此被称为Reduce连接。Mapper扮演着预处理输入数据的角色从每个输入记录中提取键值将键值对分配给Reducer分区并按键排序。
上一节描述的连接算法在Reducer中执行实际的连接逻辑因此被称为Reduce连接。Mapper扮演着预处理输入数据的角色从每个输入记录中提取键值将键值对分配给Reducer分区并按键排序。
Reduce方法的优点是不需要对输入数据做任何假设无论其属性和结构如何Mapper都可以对其预处理以备连接。然而不利的一面是排序复制至Reducer以及合并Reducer输入所有这些操作可能开销巨大。当数据通过MapReduce 阶段时数据可能需要落盘好几次取决于可用的内存缓冲区【37】。
Reduce方法的优点是不需要对输入数据做任何假设无论其属性和结构如何Mapper都可以对其预处理以备连接。然而不利的一面是排序复制至Reducer以及合并Reducer输入所有这些操作可能开销巨大。当数据通过MapReduce 阶段时数据可能需要落盘好几次取决于可用的内存缓冲区【37】。
另一方面,如果你**能**对输入数据作出某些假设则通过使用所谓的Map端连接来加快连接速度是可行的。这种方法使用了一个阉掉Reduce与排序的MapReduce作业每个Mapper只是简单地从分布式文件系统中读取一个输入文件块然后将输出文件写入文件系统仅此而已。
另一方面,如果你**能**对输入数据作出某些假设则通过使用所谓的Map侧连接来加快连接速度是可行的。这种方法使用了一个裁减掉Reducer与排序的MapReduce作业每个Mapper只是简单地从分布式文件系统中读取一个输入文件块然后将输出文件写入文件系统仅此而已。
#### 广播散列连接
适用于执行Map端连接的最简单场景是大数据集与小数据集连接的情况。要点在于小数据集需要足够小以便可以将其全部加载到每个Mapper的内存中。
例如,假设在[图10-2](img/fig10-2.png)的情况下用户数据库小到足以放进内存中。在这种情况下当Mapper启动时它可以首先将用户数据库从分布式文件系统读取到内存中的散列中。完成此操作后Map程序可以扫描用户活动事件并简单地在散列表中查找每个事件的用户ID[^vi]。
例如,假设在[图10-2](img/fig10-2.png)的情况下用户数据库小到足以放进内存中。在这种情况下当Mapper启动时它可以首先将用户数据库从分布式文件系统读取到内存中的散列表中。完成此操作后Mapper可以扫描用户活动事件并简单地在散列表中查找每个事件的用户ID[^vi]。
[^vi]: 这个例子假定散列表中的每个键只有一个条目这对用户数据库用户ID唯一标识一个用户可能是正确的。通常哈希表可能需要包含具有相同键的多个条目而连接运算符将对每个键输出所有的匹配。
@ -383,29 +381,29 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
这种简单有效的算法被称为**广播散列连接broadcast hash join****广播**一词反映了这样一个事实每个连接较大输入端分区的Mapper都会将较小输入端数据集整个读入内存中所以较小输入实际上“广播”到较大数据的所有分区上**散列**一词反映了它使用一个散列表。 Pig名为“**复制链接replicated join**”Hive“**MapJoin**”Cascading和Crunch支持这种连接。它也被诸如Impala的数据仓库查询引擎使用【41】。
除了将连接较小输入加载到内存散列表中另一种方法是将较小输入存储在本地磁盘上的只读索引中【42】。索引中经常使用的部分将保留在操作系统的页面缓存中因而这种方法可以提供与内存散列表几乎一样快的随机查找性能但实际上并不需要数据集能放入内存中。
除了将较小的连接输入加载到内存散列表中另一种方法是将较小输入存储在本地磁盘上的只读索引中【42】。索引中经常使用的部分将保留在操作系统的页面缓存中因而这种方法可以提供与内存散列表几乎一样快的随机查找性能但实际上并不需要数据集能放入内存中。
#### 分区散列连接
如果Map连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。在[图10-2](img/fig10-2.png)的情况中你可以根据用户ID的最后一位十进制数字来对活动事件和用户数据库进行分区因此连接两侧各有10个分区。例如Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中然后扫描ID为3的每个用户的所有活动事件。
如果Map连接的输入以相同的方式进行分区,则散列连接方法可以独立应用于每个分区。在[图10-2](img/fig10-2.png)的情况中你可以根据用户ID的最后一位十进制数字来对活动事件和用户数据库进行分区因此连接两侧各有10个分区。例如Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中然后扫描ID为3的每个用户的所有活动事件。
如果分区正确无误可以确定的是所有你可能需要连接的记录都落在同一个编号的分区中。因此每个Mapper只需要从输入两端各读取一个分区就足够了。好处是每个Mapper都可以在内存散列表中少放点数据。
这种方法只有当连接两端输入有相同的分区数且两侧的记录都是使用相同的键与相同的哈希函数做分区时才适用。如果输入是由之前执行过这种分组的MapReduce作业生成的那么这可能是一个合理的假设。
分区散列连接在Hive中称为**Map桶连接bucketed map joins【37】**。
分区散列连接在Hive中称为**Map桶连接bucketed map joins【37】**。
#### Map侧合并连接
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行**排序**则可适用另一种Map端联接的变体。在这种情况下输入是否小到能放入内存并不重要因为这时候Mapper同样可以执行归并操作通常由Reducer执行的归并操作按键递增的顺序依次读取两个输入文件将具有相同键的记录配对。
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行**排序**则可适用另一种Map侧连接的变体。在这种情况下输入是否小到能放入内存并不重要因为这时候Mapper同样可以执行归并操作通常由Reducer执行的归并操作按键递增的顺序依次读取两个输入文件将具有相同键的记录配对。
如果能进行Map合并连接这通常意味着前一个MapReduce作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的Reduce阶段进行。但使用独立的仅Map作业有时也是合适的例如分好区且排好序的中间数据集可能还会用于其他目的。
如果能进行Map合并连接这通常意味着前一个MapReduce作业可能一开始就已经把输入数据做了分区并进行了排序。原则上这个连接就可以在前一个作业的Reduce阶段进行。但使用独立的仅Map作业有时也是合适的例如分好区且排好序的中间数据集可能还会用于其他目的。
#### MapReduce工作流与Map侧连接
当下游作业使用MapReduce连接的输出时选择Map端连接或Reduce端连接会影响输出的结构。Reduce端连接的输出是按照**连接键**进行分区和排序的而Map端连接的输出则按照与较大输入相同的方式进行分区和排序因为无论是使用分区连接还是广播连接连接较大输入端的每个文件块都会启动一个Map任务
当下游作业使用MapReduce连接的输出时选择Map侧连接或Reduce侧连接会影响输出的结构。Reduce侧连接的输出是按照**连接键**进行分区和排序的而Map端连接的输出则按照与较大输入相同的方式进行分区和排序因为无论是使用分区连接还是广播连接连接较大输入端的每个文件块都会启动一个Map任务
如前所述Map连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
如前所述Map连接也对输入数据集的大小,有序性和分区方式做出了更多假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据是按哪些键做的分区和排序,以及分区的数量。
在Hadoop生态系统中这种关于数据集分区的元数据通常在HCatalog和Hive Metastore中维护【37】。
@ -415,17 +413,17 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
我们已经说了很多用于实现MapReduce工作流的算法但却忽略了一个重要的问题这些处理完成之后的最终结果是什么我们最开始为什么要跑这些作业
在数据库查询的场景中我们将事务处理OLTP与分析两种目的区分开来参阅“[事务处理或分析?](ch3.md#事务处理或分析?)”。我们看到OLTP查询通常根据键查找少量记录使用索引并将其呈现给用户比如在网页上。另一方面分析查询通常会扫描大量记录执行分组与聚合输出通常有着报告的形式显示某个指标随时间变化的图表或按照某种排位取前10项或一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
在数据库查询的场景中我们将事务处理OLTP与分析两种目的区分开来参阅“[事务处理还是分析?](ch3.md#事务处理还是分析?)”。我们看到OLTP查询通常根据键查找少量记录使用索引并将其呈现给用户比如在网页上。另一方面分析查询通常会扫描大量记录执行分组与聚合输出通常有着报告的形式显示某个指标随时间变化的图表或按照某种排位取前10项一些数字细化为子类。这种报告的消费者通常是需要做出商业决策的分析师或经理。
批处理放哪里合适它不属于事务处理也不是分析。它和分析比较接近因为批处理通常会扫过输入数据集的绝大部分。然而MapReduce作业工作流与用于分析目的的SQL查询是不同的参阅“[Hadoop与分布式数据库的对比](#Hadoop与分布式数据库的对比)”)。批处理过程的输出通常不是报表,而是一些其他类型的结构。
#### 建立搜索索引
Google最初使用MapReduce是为其搜索引擎建立索引用了由5到10个MapReduce作业组成的工作流实现【1】。虽然Google后来也不仅仅是为这个目的而使用MapReduce 【43】但如果从构建搜索索引的角度来看更能帮助理解MapReduce。 直至今日Hadoop MapReduce仍然是为Lucene/Solr构建索引的好方法【44】
Google最初使用MapReduce是为其搜索引擎建立索引其实现为由5到10个MapReduce作业组成的工作流【1】。虽然Google后来也不仅仅是为这个目的而使用MapReduce 【43】但如果从构建搜索索引的角度来看更能帮助理解MapReduce。 直至今日Hadoop MapReduce仍然是为Lucene/Solr构建索引的好方法【44】
我们在“[全文搜索和模糊索引](ch3.md#全文搜索和模糊索引)”中简要地了解了Lucene这样的全文搜索索引是如何工作的它是一个文件关键词字典你可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表文章列表。这是一种非常简化的看法 —— 实际上,搜索索引需要各种额外数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等 —— 但这个原则是成立的。
如果需要对一组固定文档执行全文搜索则批处理是一种构建索引的高效方法Mapper根据需要对文档集合进行分区每个Reducer构建该分区的索引并将索引文件写入分布式文件系统。构建这样的文档分区索引参阅“[分区和二级索引](ch6.md#分区和二级索引)”)并行处理效果拔群。
如果需要对一组固定文档执行全文搜索则批处理是一种构建索引的高效方法Mapper根据需要对文档集合进行分区每个Reducer构建该分区的索引并将索引文件写入分布式文件系统。构建这样的文档分区索引参阅“[分区与次级索引](ch6.md#分区与次级索引)”)并行处理效果拔群。
由于按关键字查询搜索索引是只读操作,因而这些索引文件一旦创建就是不可变的。
@ -441,15 +439,15 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
这些数据库需要被处理用户请求的Web应用所查询而它们通常是独立于Hadoop基础设施的。那么批处理过程的输出如何回到Web应用可以查询的数据库中呢
最直接的选择可能是直接在Mapper或Reducer中使用你最爱数据库的客户端库并从批处理作业直接写入数据库服务器一次写入一条记录。它能工作假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库但这并不是一个好主意出于以下几个原因
最直接的选择可能是直接在Mapper或Reducer中使用你最爱数据库的客户端库并从批处理作业直接写入数据库服务器一次写入一条记录。它能工作假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库但这并不是一个好主意出于以下几个原因
- 正如前面在连接的上下文中讨论的那样,为每条记录发起一个网络请求,要比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
- MapReduce作业经常并行运行许多任务。如果所有Mapper或Reducer都同时写入相同的输出数据库并以批处理的预期速率工作那么该数据库很可能被轻易压垮其查询性能可能变差。这可能会导致系统其他部分的运行问题【35】。
- 通常情况下MapReduce为作业输出提供了一个干净利落的“全有或全无”保证如果作业成功则结果就是每个任务恰好执行一次所产生的输出即使某些任务失败且必须一路重试。如果整个作业失败则不会生成输出。然而从作业内部写入外部系统会产生外部可见的副作用这种副作用是不能以这种方式被隐藏的。因此你不得不去操心部分完成的作业对其他系统可见的结果并需要理解Hadoop任务尝试与预测执行的复杂性。
- 通常情况下MapReduce为作业输出提供了一个干净利落的“全有或全无”保证如果作业成功则结果就是每个任务恰好执行一次所产生的输出即使某些任务失败且必须一路重试。如果整个作业失败则不会生成输出。然而从作业内部写入外部系统会产生外部可见的副作用这种副作用是不能以这种方式被隐藏的。因此你不得不去操心对其他系统可见的部分完成的作业结果并需要理解Hadoop任务尝试与预测执行的复杂性。
更好的解决方案是在批处理作业**内**创建一个全新的数据库并将其作为文件写入分布式文件系统中作业的输出目录就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的可以批量加载到处理只读查询的服务器中。不少键值存储都支持在MapReduce作业中构建数据库文件包括Voldemort 【46】Terrapin 【47】ElephantDB 【48】和HBase批量加载【49】。
更好的解决方案是在批处理作业**内**创建一个全新的数据库并将其作为文件写入分布式文件系统中作业的输出目录就像上节中的搜索索引一样。这些数据文件一旦写入就是不可变的可以批量加载到处理只读查询的服务器中。不少键值存储都支持在MapReduce作业中构建数据库文件包括Voldemort 【46】Terrapin 【47】ElephantDB 【48】和HBase批量加载【49】。
构建这些数据库文件是MapReduce的一种好用法的使用方使用Mapper提取出键并按该键排序现在已经是构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要WAL参阅“[使B树可靠](ch3.md#使B树可靠)”)。
构建这些数据库文件是MapReduce的一种好用法使用Mapper提取出键并按该键排序已经完成了构建索引所必需的大量工作。由于这些键值存储大多都是只读的(文件只能由批处理作业一次性写入,然后就不可变),所以数据结构非常简单。比如它们就不需要预写式日志WAL参阅“[让B树更可靠](ch3.md#让B树更可靠)”)。
将数据加载到Voldemort时服务器将继续用旧数据文件服务请求同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成服务器会自动将查询切换到新文件。如果在这个过程中出现任何问题它可以轻易回滚至旧文件因为它们仍然存在而且不可变【46】。
@ -462,10 +460,10 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
- 如果在代码中引入了一个错误,而输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将重新被纠正。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的概念被称为**人类容错human fault tolerance**【50】
- 由于回滚很容易,比起在错误意味着不可挽回的伤害的环境,功能开发进展能快很多。这种**最小化不可逆性minimizing irreversibility**的原则有利于敏捷软件开发【51】。
- 如果Map或Reduce任务失败MapReduce框架将自动重新调度并在同样的输入上再次运行它。如果失败是由代码中的错误造成的那么它会不断崩溃并最终导致作业在几次尝试之后失败。但是如果故障是由于临时问题导致的那么故障就会被容忍。因为输入不可变这种自动重试是安全的而失败任务的输出会被MapReduce框架丢弃。
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业可以评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。
- 与Unix工具类似MapReduce作业将逻辑与布线配置输入和输出目录分离这使得关注点分离可以重用代码一个团队可以实现一个专注做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
- 同一组文件可用作各种不同作业的输入,包括计算指标的监控作业并且评估作业的输出是否具有预期的性质(例如,将其与前一次运行的输出进行比较并测量差异) 。
- 与Unix工具类似MapReduce作业将逻辑与布线配置输入和输出目录分离这使得关注点分离可以重用代码一个团队可以专注实现一个做好一件事的作业;而其他团队可以决定何时何地运行这项作业。
在这些领域在Unix上表现良好的设计原则似乎也适用于Hadoop但Unix和Hadoop在某些方面也有所不同。例如因为大多数Unix工具都假设输入输出是无类型文本文件所以它们必须做大量的输入解析工作本章开头的日志分析示例使用`{print $7}`来提取URL。在Hadoop上可以通过使用更结构化的文件格式消除一些低价值的语法转换比如Avro参阅“[Avro](ch4.md#Avro)”和Parquet参阅“[列存储](ch3.md#列存储)”经常使用因为它们提供了基于模式的高效编码并允许模式随时间推移而演进见第4章
在这些领域在Unix上表现良好的设计原则似乎也适用于Hadoop但Unix和Hadoop在某些方面也有所不同。例如因为大多数Unix工具都假设输入输出是无类型文本文件所以它们必须做大量的输入解析工作本章开头的日志分析示例使用`{print $7}`来提取URL。在Hadoop上可以通过使用更结构化的文件格式消除一些低价值的语法转换比如Avro参阅“[Avro](ch4.md#Avro)”和Parquet参阅“[列存储](ch3.md#列存储)”)经常使用,因为它们提供了基于模式的高效编码,并允许模式随时间推移而演进(见[第4章](ch4.md))。
### Hadoop与分布式数据库的对比
@ -483,23 +481,23 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
在纯粹主义者看来,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更高质量的数据来处理。然而实践经验表明,简单地使数据快速可用 —— 即使它很古怪,难以使用,使用原始格式 —— 也通常要比事先决定理想数据模型要更有价值【54】。
这个想法与数据仓库类似(参阅“[数据仓库](ch3.md#数据仓库)”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相分离的数据集进行连接。 MPP数据库所要求的谨慎模式设计拖慢了集中式数据收集速度以原始形式收集数据稍后再操心模式的设计能使数据收集速度加快有时被称为“**数据湖data lake**”或“**企业数据中心enterprise data hub**”【55】
这个想法与数据仓库类似(参阅“[数据仓库](ch3.md#数据仓库)”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨越以前相分离的数据集进行连接。 MPP数据库所要求的谨慎模式设计拖慢了集中式数据收集速度以原始形式收集数据稍后再操心模式的设计能使数据收集速度加快有时被称为“**数据湖data lake**”或“**企业数据中心enterprise data hub**”【55】
不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题(**读时模式**方法【56】参阅“[文档模型中的架构灵活性](ch2.md#文档模型中的架构灵活性)”)。如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为**寿司原则sushi principle**“原始数据更好”【57】。
不加区分的数据转储转移了解释数据的负担:数据集的生产者不再需要强制将其转化为标准格式,数据的解释成为消费者的问题(**读时模式**方法【56】参阅“[文档模型中的模式灵活性](ch2.md#文档模型中的模式灵活性)”)。如果生产者和消费者是不同优先级的不同团队,这可能是一种优势。甚至可能不存在一个理想的数据模型,对于不同目的有不同的合适视角。以原始形式简单地转储数据,可以允许多种这样的转换。这种方法被称为**寿司原则sushi principle**“原始数据更好”【57】。
因此Hadoop经常被用于实现ETL过程参阅“[数据仓库](ch3.md#数据仓库)”事务处理系统中的数据以某种原始形式转储到分布式文件系统中然后编写MapReduce作业来清理数据将其转换为关系形式并将其导入MPP数据仓库以进行分析。数据建模仍然在进行但它在一个单独的步骤中进行与数据收集相解耦。这种解耦是可行的因为分布式文件系统支持以任何格式编码的数据。
#### 处理模型多样性
#### 处理模型多样性
MPP数据库是单体的紧密集成的软件负责磁盘上的存储布局查询计划调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化因此整个系统可以在其设计针对的查询类型上取得非常好的性能。而且SQL查询语言允许以优雅的语法表达查询而无需编写代码使业务分析师用来做商业分析的可视化工具例如Tableau能够访问
MPP数据库是单体的紧密集成的软件负责磁盘上的存储布局查询计划调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化因此整个系统可以在其设计针对的查询类型上取得非常好的性能。而且SQL查询语言允许以优雅的语法表达查询而无需编写代码可以在业务分析师使用的可视化工具例如Tableau中访问到
另一方面并非所有类型的处理都可以合理地表达为SQL查询。例如如果要构建机器学习和推荐系统或者使用相关性排名模型的全文搜索索引或者执行图像分析则很可能需要更一般的数据处理模型。这些类型的处理通常是特别针对特定应用的例如机器学习的特征工程机器翻译的自然语言模型欺诈预测的风险评估函数因此它们不可避免地需要编写代码而不仅仅是查询。
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce那么你**可以**在它之上建立一个SQL查询执行引擎事实上这正是Hive项目所做的【31】。但是你也可以编写许多其他形式的批处理这些批处理不必非要用SQL查询表示。
随后人们发现MapReduce对于某些类型的处理而言局限性很大表现很差因此在Hadoop之上其他各种处理模型也被开发出来我们将在“[MapReduce之后](#后MapReduce时代)”中看到其中一些)。有两种处理模型SQL和MapReduce还不够需要更多不同的模型而且由于Hadoop平台的开放性实施一整套方法是可行的而这在单体MPP数据库的范畴内是不可能的【58】。
随后人们发现MapReduce对于某些类型的处理而言局限性很大表现很差因此在Hadoop之上其他各种处理模型也被开发出来我们将在“[MapReduce之后](#MapReduce之后)”中看到其中一些)。只有两种处理模型SQL和MapReduce还不够需要更多不同的模型而且由于Hadoop平台的开放性实施一整套方法是可行的而这在单体MPP数据库的范畴内是不可能的【58】。
至关重要的是这些不同的处理模型都可以在共享的单个机器集群上运行所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个集内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。
至关重要的是这些不同的处理模型都可以在共享的单个机器集群上运行所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个集内不同的工作负载。不需要移动数据,使得从数据中挖掘价值变得容易得多,也使采用新的处理模型容易的多。
Hadoop生态系统包括随机访问的OLTP数据库如HBase参阅“[SSTables和LSM树](ch3.md#SSTables和LSM树)”和MPP风格的分析型数据库如Impala 【41】。 HBase与Impala都不使用MapReduce但都使用HDFS进行存储。它们是迥异的数据访问与处理方法但是它们可以共存并被集成到同一个系统中。
@ -519,11 +517,11 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
这种架构允许非生产(低优先级)计算资源被**过量使用overcommitted**因为系统知道必要时它可以回收资源。与分离生产和非生产任务的系统相比过量使用资源可以更好地利用机器并提高效率。但由于MapReduce作业以低优先级运行它们随时都有被抢占的风险因为优先级较高的进程可能需要其资源。在高优先级进程拿走所需资源后批量作业能有效地“捡面包屑”利用剩下的任何计算资源。
在谷歌运行一个小时的MapReduce任务有大约有5的风险被终止为了给更高优先级的进程挪地方。这一概率比硬件问题机器重启或其他原因的概率高了一个数量级【59】。按照这种抢占率如果一个作业有100个任务每个任务运行10分钟那么至少有一个任务在完成之前被终止的风险大于50
在谷歌运行一个小时的MapReduce任务有大约有5的风险被终止为了给更高优先级的进程挪地方。这一概率比硬件问题机器重启或其他原因的概率高了一个数量级【59】。按照这种抢占率如果一个作业有100个任务每个任务运行10分钟那么至少有一个任务在完成之前被终止的风险大于50
这就是MapReduce被设计为容忍频繁意外任务终止的原因不是因为硬件很不可靠而是因为任意终止进程的自由有利于提高计算集群中的资源利用率。
在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配【58】但在编写本文时YARNMesos或Kubernetes不支持通用优先级抢占【60】。在任务不经常被终止的环境中MapReduce的这一设计决策就没有多少意义了。在下一节中我们将研究一些与MapReduce设计决策相异的替代方案。
在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配【58】但在编写本文时YARNMesos或Kubernetes不支持通用优先级抢占【60】。在任务不经常被终止的环境中MapReduce的这一设计决策就没有多少意义了。在下一节中我们将研究一些与MapReduce设计决策相异的替代方案。
@ -548,21 +546,21 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
但在很多情况下,你知道一个作业的输出只能用作另一个作业的输入,这些作业由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的**中间状态intermediate state**一种将数据从一个作业传递到下一个作业的方式。在一个用于构建推荐系统的由50或100个MapReduce作业组成的复杂工作流中存在着很多这样的中间状态【29】。
将这个中间状态写入文件的过程称为**物化materialization**。 (在“[聚合:数据立方体和物化视图](ch2.md#聚合:数据立方体和物化视图)”中已经在物化视图的背景中遇到过这个术语。它意味着对某个操作的结果立即求值并写出来,而不是在请求时按需计算)
将这个中间状态写入文件的过程称为**物化materialization**。 (在“[聚合:数据立方体和物化视图](ch3.md#聚合:数据立方体和物化视图)”中已经在物化视图的背景中遇到过这个术语。它意味着对某个操作的结果立即求值并写出来,而不是在请求时按需计算)
作为对照本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输入连接起来。管道并没有完全物化中间状态而是只使用一个小的内存缓冲区将输出增量地**流stream**向输入。
与Unix管道相比MapReduce完全物化中间状态的方法存在不足之处
- MapReduce作业只有在前驱作业生成其输入中的所有任务都完成时才能启动而由Unix管道连接的进程会同时启动输出一旦生成就会被消费。不同机器上的数据斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
- MapReduce作业只有在前驱作业生成其输入中的所有任务都完成时才能启动而由Unix管道连接的进程会同时启动输出一旦生成就会被消费。不同机器上的数据斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成。必须等待至前驱作业的所有任务完成,拖慢了整个工作流程的执行。
- Mapper通常是多余的它们仅仅是读取刚刚由Reducer写入的同样文件为下一个阶段的分区和排序做准备。在许多情况下Mapper代码可能是前驱Reducer的一部分如果Reducer和Mapper的输出有着相同的分区与排序方式那么Reducer就可以直接串在一起而不用与Mapper相互交织。
- 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这些临时数据这么搞就比较过分了。
- 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这些临时数据这么搞就比较过分了。
#### 数据流引擎
了解决MapReduce的这些问题几种用于分布式批处理的新执行引擎被开发出来其中最著名的是Spark 【61,62】Tez 【63,64】和Flink 【65,66】。它们的设计方式有很多区别但有一个共同点把整个工作流作为单个作业来处理而不是把它分解为独立的子作业。
了解决MapReduce的这些问题几种用于分布式批处理的新执行引擎被开发出来其中最著名的是Spark 【61,62】Tez 【63,64】和Flink 【65,66】。它们的设计方式有很多区别但有一个共同点把整个工作流作为单个作业来处理而不是把它分解为独立的子作业。
由于它们将工作流显式建模为 数据从几个处理阶段穿过,所以这些系统被称为**数据流引擎dataflow engines**。像MapReduce一样它们在一条线上通过反复调用用户定义的函数来一次处理一条记录它们通过输入分区来并行化载荷它们通过网络将一个函数的输出复制到另一个函数的输入。
由于它们将工作流显式建模为数据从几个处理阶段穿过,所以这些系统被称为**数据流引擎dataflow engines**。像MapReduce一样它们在一条线上通过反复调用用户定义的函数来一次处理一条记录它们通过输入分区来并行化载荷它们通过网络将一个函数的输出复制到另一个函数的输入。
与MapReduce不同这些功能不需要严格扮演交织的Map与Reduce的角色而是可以以更灵活的方式进行组合。我们称这些函数为**算子operators**,数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:
@ -570,7 +568,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
- 另一种可能是接受多个输入,并以相同的方式进行分区,但跳过排序。当记录的分区重要但顺序无关紧要时,这省去了分区散列连接的工作,因为构建散列表还是会把顺序随机打乱。
- 对于广播散列连接,可以将一个算子的输出,发送到连接算子的所有分区。
这种类型的处理引擎是基于像Dryad 【67】和Nephele 【68】这样的研究系统与MapReduce模型相比它有几个优点
这种类型的处理引擎是基于像Dryad 【67】和Nephele 【68】这样的研究系统与MapReduce模型相比它有几个优点
- 排序等昂贵的工作只需要在实际需要的地方执行而不是默认地在每个Map和Reduce阶段之间出现。
- 没有不必要的Map任务因为Mapper所做的工作通常可以合并到前面的Reduce算子中因为Mapper不会更改数据集的分区
@ -579,7 +577,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
- 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。
- 与MapReduce为每个任务启动一个新的JVM相比现有Java虚拟机JVM进程可以重用来运行新算子从而减少启动开销。
你可以使用数据流引擎执行与MapReduce工作流同样的计算而且由于此处所述的优化通常执行速度要明显快得多。既然算子是Map和Reduce的泛化那么相同的处理代码就可以在任一执行引擎上运行PigHive或Cascading中实现的工作流可以无需修改代码可以通过修改配置简单地从MapReduce切换到Tez或Spark【64】。
你可以使用数据流引擎执行与MapReduce工作流同样的计算而且由于此处所述的优化通常执行速度要明显快得多。既然算子是Map和Reduce的泛化那么相同的处理代码就可以在任一执行引擎上运行PigHive或Cascading中实现的工作流可以无需修改代码可以通过修改配置简单地从MapReduce切换到Tez或Spark【64】。
Tez是一个相当薄的库它依赖于YARN shuffle服务来实现节点间数据的实际复制【58】而Spark和Flink则是包含了独立网络通信层调度器及用户向API的大型框架。我们将简要讨论这些高级API。
@ -589,7 +587,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
SparkFlink和Tez避免将中间状态写入HDFS因此它们采取了不同的方法来容错如果一台机器发生故障并且该机器上的中间状态丢失则它会从其他仍然可用的数据重新计算在可行的情况下是先前的中间状态要么就只能是原始输入数据通常在HDFS上
为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用了哪些输入分区?应用了哪些算子? Spark使用**弹性分布式数据集RDD**的抽象来跟踪数据的谱系【61】而Flink对算子状态存档允许恢复运行在执行过程中遇到错误的算子【66】。
为了实现这种重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用了哪些输入分区?应用了哪些算子? Spark使用**弹性分布式数据集RDDResilient Distributed Dataset**的抽象来跟踪数据的谱系【61】而Flink对算子状态存档允许恢复运行在执行过程中遇到错误的算子【66】。
在重新计算数据时,重要的是要知道计算是否是**确定性的**:也就是说,给定相同的输入数据,算子是否始终产生相同的输出?如果一些丢失的数据已经发送给下游算子,这个问题就很重要。如果算子重新启动,重新计算的数据与原有的丢失数据不一致,下游算子很难解决新旧数据之间的矛盾。对于不确定性算子来说,解决方案通常是杀死下游算子,然后再重跑新数据。
@ -609,11 +607,11 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
在“[图数据模型](ch2.md#图数据模型)”中,我们讨论了使用图来建模数据,并使用图查询语言来遍历图中的边与点。[第2章](ch2.md)的讨论集中在OLTP风格的应用场景快速执行查询来查找少量符合特定条件的顶点。
批处理上下文中的图也很有趣其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用如推荐引擎或排序系统中。例如最着名的图形分析算法之一是PageRank 【69】它试图根据链接到某个网页的其他网页来估计该网页的流行度。它作为配方的一部分用于确定网络搜索引擎呈现结果的顺序
批处理上下文中的图也很有趣其目标是在整个图上执行某种离线处理或分析。这种需求经常出现在机器学习应用如推荐引擎或排序系统中。例如最着名的图形分析算法之一是PageRank 【69】它试图根据链接到某个网页的其他网页来估计该网页的流行度。它作为配方的一部分用于确定网络搜索引擎呈现结果的顺序
> 像SparkFlink和Tez这样的数据流引擎参见“[中间状态的物化](#中间状态的物化)”)通常将算子作为**有向无环图DAG**的一部分安排在作业中。这与图处理不一样:在数据流引擎中,**从一个算子到另一个算子的数据流**被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。又一个不幸的命名混乱!
> 像SparkFlink和Tez这样的数据流引擎参见“[物化中间状态](#物化中间状态)”)通常将算子作为**有向无环图DAG**的一部分安排在作业中。这与图处理不一样:在数据流引擎中,**从一个算子到另一个算子的数据流**被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。又一个不幸的命名混乱!
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止 —— 例如,直到没有更多的边要跟进,或直到一些指标收敛。我们在[图2-6](img/fig2-6.png)中看到一个例子,它通过重复跟进标明地点归属关系的边,生成了数据库中北美包含的所有地点列表(这种算法被称为**闭包传递transitive closure**)。
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止 —— 例如,直到没有更多的边要跟进,或直到一些指标收敛。我们在[图2-6](img/fig2-6.png)中看到一个例子,它通过重复跟进标明地点归属关系的边,生成了数据库中北美包含的所有地点列表(这种算法被称为**传递闭包transitive closure**)。
可以在分布式文件系统中存储图包含顶点和边的列表的文件但是这种“重复至完成”的想法不能用普通的MapReduce来表示因为它只扫过一趟数据。这种算法因此经常以**迭代**的风格实现:
@ -625,19 +623,19 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
#### Pregel处理模型
针对图批处理的优化 —— **批量同步并行BSP**计算模型【70】已经开始流行起来。其中Apache Giraph 【37】Spark的GraphX API和Flink的Gelly API 【71】实现了它。它也被称为**Pregel**模型因为Google的Pregel论文推广了这种处理图的方法【72】。
针对图批处理的优化 —— **批量同步并行BSPBulk Synchronous Parallel**计算模型【70】已经开始流行起来。其中Apache Giraph 【37】Spark的GraphX API和Flink的Gelly API 【71】实现了它。它也被称为**Pregel**模型因为Google的Pregel论文推广了这种处理图的方法【72】。
回想一下在MapReduce中Mapper在概念上向Reducer的特定调用“发送消息”因为框架将所有具有相同键的Mapper输出集中在一起。 Pregel背后有一个类似的想法一个顶点可以向另一个顶点“发送消息”通常这些消息是沿着图的边发送的。
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用Reducer一样。与MapReduce的不同之处在于在Pregel模型中顶点在一次迭代到下一次迭代的过程中会记住它的状态所以这个函数只需要处理新的传入消息。如果图的某个部分没有被发送消息那里就不需要做任何工作。
这与Actor模型有些相似参阅“[分布式的Actor框架](ch4.md#分布式的Actor框架)”),除了顶点状态和顶点之间的消息具有容错性和耐久性,且通信以固定的方式进行在每次迭代中框架递送上次迭代中发送的所有消息。Actor通常没有这样的时保证。
这与Actor模型有些相似参阅“[分布式的Actor框架](ch4.md#分布式的Actor框架)”),除了顶点状态和顶点之间的消息具有容错性和持久性,且通信以固定的回合进行在每次迭代中框架递送上次迭代中发送的所有消息。Actor通常没有这样的时保证。
#### 容错
顶点只能通过消息传递进行通信而不是直接相互查询的事实有助于提高Pregel作业的性能因为消息可以成批处理且等待通信的次数也减少了。唯一的等待是在迭代之间由于Pregel模型保证所有在一轮迭代中发送的消息都在下轮迭代中送达所以在下一轮迭代开始前先前的迭代必须完全完成而所有的消息必须在网络上完成复制。
即使底层网络可能丢失重复或任意延迟消息(参阅“[不可靠的网络](ch8.md#不可靠的网络)”Pregel的实现能保证在后续迭代中消息在其目标顶点恰好处理一次。像MapReduce一样框架能从故障中透明地恢复以简化在Pregel上实现算法的编程模型。
即使底层网络可能丢失重复或任意延迟消息(参阅“[不可靠的网络](ch8.md#不可靠的网络)”Pregel的实现能保证在后续迭代中消息在其目标顶点恰好处理一次。像MapReduce一样框架能从故障中透明地恢复以简化在Pregel上实现算法的编程模型。
这种容错是通过在迭代结束时定期存档所有顶点的状态来实现的即将其全部状态写入持久化存储。如果某个节点发生故障并且其内存中的状态丢失则最简单的解决方法是将整个图计算回滚到上一个存档点然后重启计算。如果算法是确定性的且消息记录在日志中那么也可以选择性地只恢复丢失的分区就像之前讨论过的数据流引擎【72】。
@ -655,7 +653,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
### 高级API和语言
自MapReduce开始流行的这几年以来分布式批处理的执行引擎已经很成熟了。到目前为止基础设施已经足够强大能够存储和处理超过10,000台机器集上的数PB的数据。由于在这种规模下物理执行批处理的问题已经被认为或多或少解决了所以关注点已经转向其他领域改进编程模型提高处理效率扩大这些技术可以解决的问题集。
自MapReduce开始流行的这几年以来分布式批处理的执行引擎已经很成熟了。到目前为止基础设施已经足够强大能够存储和处理超过10,000台机器集上的数PB的数据。由于在这种规模下物理执行批处理的问题已经被认为或多或少解决了所以关注点已经转向其他领域改进编程模型提高处理效率扩大这些技术可以解决的问题集。
如前所述HivePigCascading和Crunch等高级语言和API变得越来越流行因为手写MapReduce作业实在是个苦力活。随着Tez的出现这些高级语言还有一个额外好处可以迁移到新的数据流执行引擎而无需重写作业代码。 Spark和Flink也有它们自己的高级数据流API通常是从FlumeJava中获取的灵感【34】。
@ -671,21 +669,21 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
连接算法的选择可以对批处理作业的性能产生巨大影响,而无需理解和记住本章中讨论的各种连接算法。如果连接是以**声明式declarative**的方式指定的,那这就这是可行的:应用只是简单地说明哪些连接是必需的,查询优化器决定如何最好地执行连接。我们以前在“[数据查询语言](ch2.md#数据查询语言)”中见过这个想法。
但MapReduce及其后继者数据流在其他方面与SQL的完全声明式查询模型有很大区别。 MapReduce是围绕着回调函数的概念建立的对于每条记录或者一组记录调用一个用户定义的函数Mapper或Reducer并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以基于大量已有库的生态系统创作解析,自然语言分析,图像分析以及运行数值算法或统计算法等。
但MapReduce及其数据流后继者在其他方面与SQL的完全声明式查询模型有很大区别。 MapReduce是围绕着回调函数的概念建立的对于每条记录或者一组记录调用一个用户定义的函数Mapper或Reducer并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以基于大量已有库的生态系统创作解析、自然语言分析、图像分析以及运行数值或统计算法等。
自由运行任意代码长期以来都是传统MapReduce批处理系统与MPP数据库的区别所在参见“[比较Hadoop和分布式数据库](#比较Hadoop和分布式数据库)”一节。虽然数据库具有编写用户定义函数的功能但是它们通常使用起来很麻烦而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统兼容不佳例如Java的Maven Javascript的npm以及Ruby的gems
自由运行任意代码长期以来都是传统MapReduce批处理系统与MPP数据库的区别所在参见“[Hadoop与分布式数据库的对比](#Hadoop与分布式数据库的对比)”一节。虽然数据库具有编写用户定义函数的功能但是它们通常使用起来很麻烦而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统兼容不佳例如Java的Maven、Javascript的npm以及Ruby的gems
然而数据流引擎已经发现,支持除连接之外的更多**声明式特性**还有其他的优势。例如如果一个回调函数只包含一个简单的过滤条件或者只是从一条记录中选择了一些字段那么在为每条记录调用函数时会有相当大的额外CPU开销。如果以声明方式表示这些简单的过滤和映射操作那么查询优化器可以利用面向列的存储布局参阅“[列存储](ch3.md#列存储)”),只从磁盘读取所需的列。 HiveSpark DataFrames和Impala还使用了向量化执行参阅“[内存带宽和向量处理](ch3.md#内存带宽和向量处理)”在对CPU缓存友好的内部循环中迭代数据避免函数调用。Spark生成JVM字节码【79】Impala使用LLVM为这些内部循环生成本机代码【41】。
然而数据流引擎已经发现,支持除连接之外的更多**声明式特性**还有其他的优势。例如如果一个回调函数只包含一个简单的过滤条件或者只是从一条记录中选择了一些字段那么在为每条记录调用函数时会有相当大的额外CPU开销。如果以声明方式表示这些简单的过滤和映射操作那么查询优化器可以利用面向列的存储布局参阅“[列存储](ch3.md#列存储)”),只从磁盘读取所需的列。 HiveSpark DataFrames和Impala还使用了向量化执行参阅“[内存带宽和向量处理](ch3.md#内存带宽和向量处理)”在对CPU缓存友好的内部循环中迭代数据避免函数调用。Spark生成JVM字节码【79】Impala使用LLVM为这些内部循环生成本机代码【41】。
通过在高级API中引入声明式的部分并使查询优化器可以在执行期间利用这些来做优化批处理框架看起来越来越像MPP数据库了并且能实现可与之媲美的性能。同时通过拥有运行任意代码和以任意格式读取数据的可伸缩性,它们保持了灵活性的优势。
通过在高级API中引入声明式的部分并使查询优化器可以在执行期间利用这些来做优化批处理框架看起来越来越像MPP数据库了并且能实现可与之媲美的性能。同时通过拥有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。
#### 专业化的不同领域
尽管能够运行任意代码的可伸缩性是很有用的,但是也有很多常见的例子,不断重复着标准的处理模式。因而这些模式值得拥有自己的可重用通用构建模块实现传统上MPP数据库满足了商业智能分析和业务报表的需求但这只是许多使用批处理的领域之一。
尽管能够运行任意代码的可扩展性是很有用的,但是也有很多常见的例子,不断重复着标准的处理模式。因而这些模式值得拥有自己的可重用通用构建模块实现传统上MPP数据库满足了商业智能分析和业务报表的需求但这只是许多使用批处理的领域之一。
另一个越来越重要的领域是统计和数值算法它们是机器学习应用所需要的例如分类器和推荐系统。可重用的实现正在出现例如Mahout在MapReduceSpark和Flink之上实现了用于机器学习的各种算法而MADlib在关系型MPP数据库Apache HAWQ中实现了类似的功能【54】。
另一个越来越重要的领域是统计和数值算法它们是机器学习应用所需要的例如分类器和推荐系统。可重用的实现正在出现例如Mahout在MapReduceSpark和Flink之上实现了用于机器学习的各种算法而MADlib在关系型MPP数据库Apache HAWQ中实现了类似的功能【54】。
空间算法也是有用的,例如**最近邻搜索k-nearest neghbors, kNN**【80】它在一些多维空间中搜索与给定项最近的项目 —— 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要它们需要找到相似但不相同的字符串【81】。
空间算法也是有用的,例如**k近邻搜索k-nearest neighbors, kNN**【80】它在一些多维空间中搜索与给定项最近的项目 —— 这是一种相似性搜索。近似搜索对于基因组分析算法也很重要它们需要找到相似但不相同的字符串【81】。
批处理引擎正被用于分布式执行日益广泛的各领域算法。随着批处理系统获得各种内置功能以及高级声明式算子且随着MPP数据库变得更加灵活和易于编程两者开始看起来相似了最终它们都只是存储和处理数据的系统。
@ -693,7 +691,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
## 本章小结
在本章中我们探索了批处理的主题。我们首先看到了诸如awkgrep和sort之类的Unix工具然后我们看到了这些工具的设计理念是如何应用到MapReduce和更近的数据流引擎中的。一些设计原则包括输入是不可变的输出是为了作为另一个仍未知的程序的输入而复杂的问题是通过编写“做好一件事”的小工具来解决的。
在本章中我们探索了批处理的主题。我们首先看到了诸如awkgrep和sort之类的Unix工具然后我们看到了这些工具的设计理念是如何应用到MapReduce和更近的数据流引擎中的。一些设计原则包括输入是不可变的输出是为了作为另一个仍未知的程序的输入而复杂的问题是通过编写“做好一件事”的小工具来解决的。
在Unix世界中允许程序与程序组合的统一接口是文件与管道在MapReduce中该接口是一个分布式文件系统。我们看到数据流引擎添加了自己的管道式数据传输机制以避免将中间状态物化至分布式文件系统但作业的初始输入和最终输出通常仍是HDFS。
@ -701,7 +699,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
***分区***
在MapReduce中Mapper根据输入文件块进行分区。Mapper的输出被重新分区,排序,并合并到可配置数量的Reducer分区中。这一过程的目的是把所有的**相关**数据(例如带有相同键的所有记录)都放在同一个地方。
在MapReduce中Mapper根据输入文件块进行分区。Mapper的输出被重新分区、排序并合并到可配置数量的Reducer分区中。这一过程的目的是把所有的**相关**数据(例如带有相同键的所有记录)都放在同一个地方。
后MapReduce时代的数据流引擎若非必要会尽量避免排序但它们也采取了大致类似的分区方法。
@ -715,7 +713,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
***排序合并连接***
每个参与连接的输入都通过一个提取连接键的Mapper。通过分区排序和合并具有相同键的所有记录最终都会进入相同的Reducer调用。这个函数能输出连接好的记录。
每个参与连接的输入都通过一个提取连接键的Mapper。通过分区排序和合并具有相同键的所有记录最终都会进入相同的Reducer调用。这个函数能输出连接好的记录。
***广播散列连接***
@ -727,7 +725,7 @@ top5.each{|count, url| puts "#{count} #{url}" } # 5
分布式批处理引擎有一个刻意限制的编程模型回调函数比如Mapper和Reducer被假定是无状态的而且除了指定的输出外必须没有任何外部可见的副作用。这一限制允许框架在其抽象下隐藏一些困难的分布式系统问题当遇到崩溃和网络问题时任务可以安全地重试任何失败任务的输出都被丢弃。如果某个分区的多个任务成功则其中只有一个能使其输出实际可见。
得益于这个框架,你在批处理作业中的代码无需操心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,也许不得不重试各种任务。在线服务处理用户请求,并将写入数据库作为处理请求的副作用,比起在线服务,批处理提供的这种可靠性语义要强得多。
得益于这个框架,你在批处理作业中的代码无需操心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,虽然实际上也许不得不重试各种任务。比起在线服务一边处理用户请求一边将写入数据库作为处理请求的副作用,批处理提供的这种可靠性语义要强得多。
批处理作业的显著特点是,它读取一些输入数据并产生一些输出数据,但不修改输入—— 换句话说,输出是从输入衍生出的。最关键的是,输入数据是**有界的bounded**:它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个作业知道自己什么时候完成了整个输入的读取,所以一个工作在做完后,最终总是会完成的。