ddia/ch10.md

1033 lines
112 KiB
Markdown
Raw Normal View History

2018-02-08 14:07:06 +08:00
# 10. 批处理
![](img/ch10.png)
2018-02-08 18:29:00 +08:00
> 带有太强个人色彩的系统无法成功。当第一版健壮的设计完成时,不同的人们以自己的方式来测试,真正的考验才开始。
>
> ——高德纳
2018-02-08 14:07:06 +08:00
---------------
[TOC]
2018-02-12 17:15:40 +08:00
在本书的前两部分中我们讨论了很多关于请求和查询以及相应的响应或结果。这种数据处理方式在许多现代数据系统中都是假设的你要求什么或者发送指令一段时间后系统希望会给你一个答案。数据库缓存搜索索引Web服务器以及其他许多系统都以这种方式工作。
在这样的在线系统中无论是浏览器请求页面还是调用远程API的服务我们通常都假设请求是由人类用户触发的并且用户正在等待响应。他们不必等太久所以我们非常重视这些系统的响应时间请参阅第13页的“描述性能”
Web和越来越多的基于HTTP / REST的API使交互的请求/响应风格变得如此普遍,以至于很容易将其视为理所当然。但我们应该记住,这不是构建系统的唯一方式,其他方法也有其优点。我们来区分三种不同类型的系统:
服务(在线系统)
服务等待客户的请求或指令到达。当收到一个,服务试图尽快处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性通常非常重要(如果客户端无法访问服务,用户可能会收到错误消息)。
***批处理系统(离线系统)***
一个批处理系统需要大量的输入数据,运行一个工作来处理它,并产生一些输出数据。工作往往需要一段时间(从几分钟到几天),所以通常不会有用户等待工作完成。相反,批量作业通常会定期运行(例如,每天一次)。批处理作业的主要性能衡量标准通常是吞吐量(通过特定大小的输入数据集所需的时间)。我们讨论本章中的批处理。
***流处理系统(近实时系统)***
流处理是在线和离线/批处理之间的一个地方所以有时候被称为近实时或近线处理。像批处理系统一样流处理器消耗输入并产生输出而不是响应请求。但是流式作业在事件发生后不久就会对事件进行操作而批处理作业则使用固定的一组输入数据进行操作。这种差异使流处理系统比等效的批处理系统具有更低的延迟。由于流处理基于批处理我们将在第11章讨论它。
正如我们将在本章中看到的那样批量处理是构建可靠可扩展和可维护应用程序的重要组成部分。例如2004年发布的批处理算法Map-Reduce可能过度热情地被称为“使得Google具有如此大规模可扩展性的算法”[2]。随后在各种开源数据系统中实施包括HadoopCouchDB和MongoDB。
与多年前为数据仓库开发的并行处理系统[34]相比MapReduce是一个相当低级别的编程模型但它在处理规模方面迈出了重要的一步。在商品硬件上。虽然MapReduce的重要性正在下降[5],但它仍然值得理解,因为它提供了批处理为什么以及如何有用的清晰画面。
实际上批处理是一种非常古老的计算形式。早在可编程数字计算机诞生之前打孔卡制表机例如1890年美国人口普查[6]中使用的霍尔里斯机)实现了半机械化的批处理形式,以计算来自大量输入的汇总统计量。 Map-Reduce与1940年代和1950年代广泛用于商业数据处理的机电IBM卡片分类机器有着惊人的相似之处[7]。像往常一样,历史有重演的趋势。
在本章中我们将看看MapReduce和其他一些批处理算法和框架并探讨它们在现代数据系统中的使用方式。但首先要开始我们将看看使用标准Unix工具的数据处理。即使你已经熟悉了它们Unix的哲学提醒也是值得的因为从Unix的想法和经验教训转移到大规模异构的分布式数据系统。
2018-02-08 14:07:06 +08:00
## 使用Unix工具的批处理
2018-02-12 17:15:40 +08:00
我们从一个简单的例子开始。假设您有一台Web服务器每次提供请求时都会在日志文件中附加一行。例如使用nginx默认访问日志格式日志的一行可能如下所示
```
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
```
(实际上这只是一行,为了便于阅读,它只是分成多行)。这一行中有很多信息。为了解释它,你需要看看日志格式的定义,如下所示::
```
$remote_addr - $remote_user [$time_local] "$request"
$status $body_bytes_sent "$http_referer" "$http_user_agent"
```
因此日志的这一行表明在2015年2月27日17:55:11 UTC服务器从客户端IP地址216.58.210.78接收到文件`/css/typography.css`的请求。用户没有被认证,所以`$remote_user`被设置为连字符(`-` 。响应状态是200即请求成功响应的大小是3377字节。网页浏览器是Chrome 40并且它加载了该文件因为它是在URL `http://martin.kleppmann.com/`的页面中引用的。
2018-02-08 14:07:06 +08:00
### 分析简单日志
2018-02-12 17:15:40 +08:00
各种工具可以把这些日志文件并产生漂亮的报告有关您的网站流量但为了锻炼让我们建立自己的使用基本的Unix工具。 例如,假设你想在你的网站上找到五个最受欢迎的网页。 你可以在Unix shell中这样做[^i]
[^i]: 有些人喜欢抬杠,认为`cat`这里并没有必要因为输入文件可以直接作为awk的参数。 但这种写法让流水线更显眼。
```bash
cat /var/log/nginx/access.log | #1
awk '{print $7}' | #2
sort | #3
uniq -c | #4
sort -r -n | #5
head -n 5 #6
```
1. 读取日志文件
2. 将每一行按空格分割成不同的字段每行只输出第七个这样的字段恰好是请求的URL。在我们的示例行中这个请求URL是`/css/typography.css`。
3. 按字母顺序排列请求的URL列表。如果某个URL被请求过n次那么排序后该文件将包含连续n次重复的URL。
4. uniq命令通过检查两条相邻的行是否相同来过滤掉其输入中的重复行。 -c选项告诉它也输出一个计数器对于每个不同的URL它会报告输入中出现该URL的次数。
5. 第二种排序按每行起始处的数字(-n排序这是请求URL的次数。然后以反向-r顺序返回结果即首先以最大的数字返回结果。
6. 最后,头只输出输入的前五行(-n 5并丢弃其余的。该系列命令的输出如下所示
```
4189 /favicon.ico
3631 /2013/05/24/improving-security-of-ssh-private-keys.html
2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
1369 /
915 /css/typography.css
```
尽管如果你不熟悉Unix工具上面的命令行可能看起来有点模糊但是它非常强大。它将在几秒钟内处理千兆字节的日志文件您可以轻松修改分析以适应您的需求。例如如果要从报告中省略CSS文件请将awk参数更改为`'$7 !~ /\.css$/ {print $7}'`等等。
本书中没有空间来详细探索Unix工具但是非常值得学习。令人惊讶的是使用awksedgrepsortuniq和xargs的组合可以在几分钟内完成许多数据分析并且它们的表现令人惊讶地很好[8]。
#### 命令链与自定义程序
而不是Unix命令链你可以写一个简单的程序来做同样的事情。例如在Ruby中它可能看起来像这样
```ruby
counts = Hash.new(0) # 1
File.open('/var/log/nginx/access.log') do |file|
file.each do |line|
url = line.split[6] # 2
counts[url] += 1 # 3
end
end
top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" } # 5
```
1. `counts`是一个哈希表,保持一个计数器的次数,我们已经看到每个网址。计数器默认为零。
2. 从日志的每一行我们都把URL作为第七个空格分隔的字段这里的数组索引是6因为Ruby的数组是零索引的
3. 增加日志当前行中URL的计数器。
4. 按计数器值(降序)对散列表内容进行排序,并取前五位。
5. 打印出前五个条目。
这个程序并不像Unix管道那样简洁但是它的可读性很强你喜欢的两个中的哪一个是味道的一部分。但是除了两者之间的表面差异之外执行流程也有很大差异如果您在大文件上运行此分析则会变得明显。
#### 排序与内存中的聚合
Ruby脚本保存一个URL的内存哈希表其中每个URL映射到它已经被看到的次数。 Unix流水线的例子没有这样一个哈希表而是依赖于对一个URL列表进行排序在这个URL列表中同一个URL的多个发生只是简单的重复。
哪种方法更好这取决于你有多少个不同的网址。对于大多数中小型网站您可能可以适应所有不同的URL并且可以为每个网址例如1GB内存提供一个计数器。在此示例中作业的工作集作业需要随机访问的内存量仅取决于不同URL的数量如果单个URL有一百万个日志条目则散列中所需的空间表仍然只有一个URL加上计数器的大小。如果这个工作集足够小那么内存散列表工作正常甚至在笔记本电脑上也是如此。
另一方面如果作业的工作集大于可用内存则排序方法的优点是可以高效地使用磁盘。这与我们在第74页的“SSTables和LSM-Trees”中讨论过的原理是一样的数据块可以在内存中排序并作为段文件写入磁盘然后多个排序的段可以合并为一个更大的排序文件。 Mergesort具有在磁盘上运行良好的顺序访问模式。 请记住在顺序I / O中进行优化是第3章中反复出现的主题。这里再次出现相同的模式。
GNU CoreutilsLinux中的排序实用程序通过溢出到磁盘自动处理大于内存的数据集并自动并行排序跨多个CPU核心[9]。这意味着我们之前看到的简单的Unix命令链很容易扩展到大数据集而不会耗尽内存。瓶颈可能是从磁盘读取输入文件的速度。
2018-02-08 14:07:06 +08:00
### Unix哲学
2018-02-12 17:15:40 +08:00
我们可以非常容易地使用前一个例子中的一系列命令来分析日志文件这并非巧合事实上这实际上是Unix的关键设计思想之一而且它今天依然令人惊讶。让我们更深入地研究一下这样我们可以从Unix中借鉴一些想法[10]。
Unix管道的发明者道格·麦克罗伊Doug McIlroy在1964年首先描述了这种情况[11]:“当需要以另一种方式处理数据时,我们应该有一些连接程序的方法,比如[a] 。这也是I / O的方式。“管道类比困难了连接程序和管道的想法成为了现在被称为Unix哲学的一部分 - 一套在开发者中流行的设计原则。 Unix的用户。哲学在1978年描述如下[12,13]
1. 让每个程序都做好一件事。要做好新的工作,重新建立一个新的“特征”,而不是使旧的计划复杂化。
2. 期待每个程序的输出成为另一个程序的输入。不要混淆输出与无关的信息。避免使用严格的柱状或二进制输入格式。不要坚持交互式输入。
3. 设计和构建软件,甚至是操作系统,要尽早尝试,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
4. 使用工具优先于不熟练的帮助来减轻编程任务,即使您必须绕道建立工具,并期望在完成使用后将其中一些工具扔掉。
这种方法 - 自动化,快速原型设计,迭代式迭代,对实验友好,将大型项目分解成可管理的块 - 听起来非常像今天的敏捷和DevOps运动。奇怪的是四十年来变化不大。
排序工具是一个很好的例子。它可以说是一个比大多数编程语言在其标准库不会溢出到磁盘并且不使用多线程即使是有益的中更好的排序实现。然而这种分类几乎没有用处。它只能与其他Unix工具如uniq结合使用。
像bash这样的Unix shell可以让我们轻松地将这些小程序组合成令人惊讶的强大数据处理作业。尽管这些程序中有很多是由不同人群编写的但它们可以灵活地结合在一起。 Unix如何实现这种可组合性
#### 统一的接口
如果您希望一个程序的输出成为另一个程序的输入,那意味着这些程序必须使用相同的数据格式 - 换句话说,一个兼容的接口。如果您希望能够将任何程序的输出连接到任何程序的输入,那意味着所有程序必须使用相同的输入/输出接口。
在Unix中该接口是一个文件更准确地说是一个文件描述符。一个文件只是一个有序的字节序列。因为这是一个非常简单的接口所以可以使用相同的接口来表示许多不同的东西文件系统上的实际文件到另一个进程Unix套接字stdinstdout的通信通道设备驱动程序比如`/dev/audio`或`/dev/lp0`表示TCP连接的套接字等等。理所当然的事很容易但实际上这些非常不同的事物可以共享一个统一的界面所以它们可以很容易地连接在一起[^ii]。
[^ii]: 统一接口的另一个例子是URL和HTTP这是Web的基础。 一个URL标识一个网站上的一个特定的东西资源你可以链接到任何其他网站的任何网址。 具有网络浏览器的用户因此可以通过跟随链接在网站之间无缝跳转,即使服务器可能由完全不相关的组织操作。 这个原则今天似乎很明显,但它是使网络取得今天成功的关键。 之前的系统并不是那么统一例如在公告板系统BBS时代每个系统都有自己的电话号码和波特率配置。 从一个BBS到另一个BBS的引用必须以电话号码和调制解调器设置的形式; 用户将不得不挂断拨打其他BBS然后手动找到他们正在寻找的信息。 这是不可能的直接链接到另一个BBS内的一些内容。
按照惯例许多但不是全部Unix程序将这个字节序列视为ASCII文本。我们的日志分析示例使用了这个事实awksortuniq和head都将它们的输入文件视为由`\n`换行符ASCII 0x0A字符分隔的记录列表。 `\n`的选择是任意的 - 可以说ASCII记录分隔符`0x1E`本来就是一个更好的选择,因为它是为了这个目的而设计的[14],但是无论如何,所有这些程序都使用相同的记录分隔符允许它们互操作。
每个记录(即一行输入)的解析更加模糊。 Unix工具通常通过空白或制表符将行分割成字段但也使用CSV逗号分隔管道分隔和其他编码。即使像xargs这样一个相当简单的工具也有六个命令行选项用于指定如何解析输入。
ASCII文本的统一接口主要工作但它不是很漂亮我们的日志分析示例使用`{print $ 7}`来提取网址,这是不是很可读。在理想的世界中,这可能是`{print $ request_url}`或类似的东西。我们稍后会回到这个想法。
尽管几十年后还不够完美但统一的Unix界面仍然非常显着。与Unix工具一样软件的交互操作和编写并不是很多您不能通过自定义分析工具轻松地将电子邮件帐户的内容和在线购物历史记录传送到电子表格中并将结果发布到社交网络或维基。今天像Unix工具一样流畅地运行程序是一个例外而不是规范。
即使是具有相同数据模型的数据库,也往往不容易将数据从一个数据模型中移出。这种缺乏整合导致数据的巴尔干化。
#### 逻辑和布线的分离
Unix工具的另一个特点是使用标准输入stdin和标准输出stdout。如果你运行一个程序而不指定任何其他的东西标准输入来自键盘和标准输出到屏幕上。但是您也可以从文件输入和/或将输出重定向到文件。管道允许您将一个进程的标准输出附加到另一个进程的标准输入(具有小内存缓冲区,而不需要将整个中间数据流写入磁盘)。
程序仍然可以直接读取和写入文件但如果程序不担心特定的文件路径只使用标准输入和标准输出则Unix方法效果最好。这允许shell用户以任何他们想要的方式连接输入和输出;该程序不知道或不关心输入来自哪里以及输出到哪里。 (人们可以说这是一种松耦合,后期绑定[15]或控制反转[16])。将输入/输出接线与程序逻辑分开,可以将小工具组合成更大的系统。
您甚至可以编写自己的程序并将它们与操作系统提供的工具组合在一起。你的程序只需要从标准输入读取输入并将输出写入标准输出并且可以参与数据处理流水线。在日志分析示例中您可以编写一个工具将用户代理字符串转换为更灵敏的浏览器标识符或者将IP地址转换为国家代码的工具并将其插入管道。排序程序并不关心它是否与操作系统的另一部分或者你写的程序通信。
但是使用stdin和stdout可以做什么是有限的。需要多个输入或输出的程序是可能的但棘手的。如果程序直接打开文件进行读取和写入或者将另一个程序作为子进程启动或者打开网络连接则无法将程序的输出传输到网络连接中【17,18】[^iii] 。 I / O由程序本身连接。它仍然可以配置例如通过命令行选项但是减少了在Shell中连接输入和输出的灵活性。
[^iii]: 除了使用一个单独的工具如netcat或curl。 Unix开始试图将所有东西都表示为文件但是BSD套接字API偏离了这个惯例[17]。研究操作系统Plan 9和Inferno在使用文件方面更加一致它们将TCP连接表示为/ net / tcp中的文件[18]。
#### 透明度和实验
使Unix工具如此成功的部分原因是它们使得查看正在发生的事情变得非常容易
Unix命令的输入文件通常被视为不可变的。这意味着您可以随意运行命令尝试各种命令行选项而不会损坏输入文件。
* 您可以在任何时候结束管道,将输出管道输送到较少的位置,然后查看它是否具有预期的形式。这种检查能力对调试非常有用。
* 您可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使您可以重新启动后面的阶段,而无需重新运行整个管道。
因此与关系数据库的查询优化器相比即使Unix工具非常简单工具简单但仍然非常有用特别是对于实验而言。
然而Unix工具的最大局限在于它们只能在一台机器上运行 - 而Hadoop这样的工具就是在这里工作的。
2018-02-08 14:07:06 +08:00
## MapReduce和分布式文件系统
2018-02-12 17:15:40 +08:00
MapReduce有点像Unix工具但分布在数千台机器上。像Unix工具一样这是一个相当直接的蛮力的但却是令人惊讶的有效工具。一个MapReduce作业可以和一个Unix进程相媲美它需要一个或多个输入并产生一个或多个输出。
和大多数Unix工具一样运行MapReduce作业通常不会修改输入除了生成输出外没有任何副作用。输出文件以连续的方式写入一次一旦写入文件不会修改任何现有的文件部分
虽然Unix工具使用stdin和stdout作为输入和输出但MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中该文件系统被称为HDFSHadoop分布式文件系统一个开源的重新实现Google文件系统GFS[19]。
除HDFS外还有各种其他分布式文件系统如GlusterFS和Quantcast File SystemQFS[20]。诸如Amazon S3Azure Blob存储和OpenStack Swift [21]等对象存储服务在很多方面都是相似的[^iv]。在本章中我们将主要使用HDFS作为示例但是这些原则适用于任何分布式文件系统。
[^iv]: 一个不同之处在于对于HDFS可以将计算任务安排在存储特定文件副本的计算机上运行而对象存储通常将存储和计算分开。如果网络带宽是一个瓶颈从本地磁盘读取有性能优势。但是请注意如果使用删除编码局部优势将会丢失因为来自多台机器的数据必须进行合并以重建原始文件[20]。
与网络连接存储NAS和存储区域网络SAN架构的共享磁盘方法相比HDFS基于无共享原则参见第二部分的介绍。共享磁盘存储由集中式存储设备实现通常使用定制硬件和专用网络基础设施如光纤通道。另一方面无共享方法不需要特殊的硬件只需要通过传统数据中心网络连接的计算机。
HDFS包含在每台机器上运行的守护进程暴露一个允许其他节点访问存储在该机器上的文件的网络服务假设数据中心中的每台通用计算机都附带有一些磁盘。名为NameNode的中央服务器会跟踪哪个文件块存储在哪台机器上。因此HDFS在概念上创建一个可以使用运行守护进程的所有机器的磁盘上的空间的大文件系统。
为了容忍机器和磁盘故障文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本如第5章中所述或者像Reed-Solomon代码这样的擦除编码方案它允许以比完全复制更低的存储开销恢复丢失的数据[20 22。这些技术与RAID相似可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。
HDFS已经很好地扩展了在撰写本文时最大的HDFS部署运行在成千上万台机器上总存储容量达数百peta-bytes [23]。如此大的规模已经变得可行因为使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于专用存储设备上的同等容量[24]。
### MapReduce作业执行
MapReduce是一个编程框架您可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考第391页上的“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式与此示例非常相似
1. 读取一组输入文件并将其分解成记录。在Web服务器日志示例中每条记录都是日志中的一行即\ n是记录分隔符
2. 调用Mapper函数从每个输入记录中提取一个键和值。在前面的例子中mapper函数是`awk'{print $ 7}'`:它提取`URL($7)`作为关键字,并将值保留为空。
3. 按键排序所有的键值对。在日志示例中,这由第一个排序命令完成。
4. 调用reducer函数遍历排序后的键值对。如果同一个键出现多次排序使它们在列表中相邻所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中reducer是由uniq -c命令实现的该命令使用相同的密钥来统计相邻记录的数量。
这四个步骤可以由一个MapReduce作业执行。步骤2地图和4减少是您编写自定义数据处理代码的地方。步骤1将文件分解成记录由输入格式解析器处理。步骤3中的排序步骤隐含在MapReduce中 - 您不必编写它因为映射器的输出始终在给予reducer之前进行排序。
要创建MapReduce作业您需要实现两个回调函数mapper和reducer其行为如下另请参阅“MapReduce查询”第46页
***Mapper***
每个输入记录都会调用一次映射器,其工作是从输入记录中提取键和值。对于每个输入,它可以生成任意数量的键值对(包括无)。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。
***Reducer***
MapReduce框架采用由映射器生成的键值对收集属于同一个键的所有值并使用迭代器调用reducer以调用该值集合。 Reducer可以产生输出记录例如相同URL的出现次数
在Web服务器日志示例中我们在第5步中有第二个排序命令它按请求数对URL进行排序。在MapReduce中如果您需要第二个排序阶段则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来映射器的作用是将数据放入一个适合排序的表单中并且还原器的作用是处理已排序的数据。
#### 分布式执行MapReduce
Unix命令管道的主要区别在于MapReduce可以在多台机器上并行执行计算而无需编写代码来明确处理并行性。映射器和简化器一次只能处理一条记录;他们不需要知道他们的输入来自哪里或者输出什么地方,所以框架可以处理机器之间移动数据的复杂性。
在分布式计算中可以使用标准的Unix工具作为映射器和简化器[25]但更常见的是它们被实现为传统编程语言的函数。在Hadoop MapReduce中映射器和简化器都是实现特定接口的Java类。在MongoDB和CouchDB中映射器和简化器都是JavaScript函数请参阅第46页的“MapReduce查询”
[图10-1]()显示了Hadoop MapReduce作业中的数据流。其并行化基于分区参见第6章作业的输入通常是HDFS中的一个目录输入目录中的每个文件或文件块都被认为是一个单独的分区可以单独处理map任务图10-1中的m 1m 2和m 3标记
每个输入文件的大小通常是数百兆字节。 MapReduce调度器图中未显示试图在其中一台存储输入文件副本的机器上运行每个映射器只要该机器有足够的备用RAM和CPU资源来运行映射任务[26]。这个原则被称为将数据放在数据附近[27]:它节省了通过网络复制输入文件,减少网络负载和增加局部性。
2018-02-08 14:07:06 +08:00
![](img/fig10-1.png)
**图10-1 具有三个Mapper和三个Reducer的MapReduce任务**
2018-02-12 17:15:40 +08:00
在大多数情况下应该在映射任务中运行的应用程序代码在分配运行它的任务的计算机上还不存在所以MapReduce框架首先复制代码例如Java程序中的JAR文件到适当的机器。然后启动地图任务并开始读取输入文件一次将一条记录传递给mapper回调。映射器的输出由键值对组成。
计算的减少方面也被分割。虽然Map任务的数量由输入文件块的数量决定但Reducer的任务的数量是由作业作者配置的它可以不同于地图任务的数量。为了确保具有相同密钥的所有键值对在相同的缩减器处结束框架使用密钥的散列值来确定哪个减少的任务应该接收到特定的键值对参见“通过密钥散列分区”第203页
键值对必须进行排序但数据集可能太大无法在单台机器上使用常规排序算法进行排序。相反分类是分阶段进行的。首先每个映射任务都基于密钥的散列通过简化器分割其输出。这些分区中的每一个都被写入映射程序本地磁盘上的已排序文件使用的技术与我们在第76页的“SSTables and LSM-Trees”中讨论的类似。
只要映射器完成读取输入文件并写入其排序后的输出文件MapReduce调度器就会通知减速器他们可以从该映射器开始获取输出文件。减法器连接到每个映射器并为其分区下载排序后的键值对的文件。通过简化分类和将数据分区从映射器复制到简化器的过程被称为混洗[26](一个令人困惑的术语 - 不像洗牌一样在MapReduce中没有随机性
reduce任务从映射器获取文件并将它们合并在一起并保存排序顺序。因此如果不同的映射器使用相同的键生成记录则它们将在合并的缩减器输入中相邻。
使用一个键和一个迭代器调用reducer迭代器使用相同的键在某些情况下可能不是全部适合内存逐步扫描所有记录。 Reducer可以使用任意逻辑来处理这些记录并且可以生成任意数量的输出记录。这些输出记录写入到分布式文件系统上的文件通常是运行reducer的机器的本地磁盘上的一个副本其他机器上的副本
MapReduce工作流程
单个MapReduce作业可以解决的问题范围有限。请参阅日志分析示例一个MapReduce作业可以确定每个URL的页面浏览次数但不是最常用的URL因为这需要第二轮排序。
因此将MapReduce作业链接到工作流中是非常常见的例如一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流程没有特别的支持所以这个链接是通过目录名隐含完成的第一个作业必须被配置为将其输出写入HDFS中的指定目录第二个作业必须是配置为读取与其输入相同的目录名称。从MapReduce框架的角度来看他们是两个独立的工作。
因此被链接的MapReduce作业不如Unix命令的流水线它直接将一个进程的输出作为输入传递给另一个进程只使用一个小的内存缓冲区更像是一系列命令其中每个命令的输出写入临时文件下一个命令从临时文件中读取。这种设计有利有弊我们将在第419页“中间状态的物化”中讨论。
当作业成功完成时批处理作业的输出仅被视为有效MapReduce丢弃失败作业的部分输出。因此工作流程中的一项工作只有在先前的工作 - 即生产其投入方向的工作 - 成功完成时才能开始。处理这些作业之间的依赖关系执行为Hadoop开发了各种工作流调度器包括OozieAzkabanLuigiAirflow和Pinball [28]。
这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统[29]时由50到100个MapReduce作业组成的工作流是常见的而在大型组织中许多不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流非常重要。
Hadoop的各种高级工具如Pig [30]Hive [31]Cascading [32]Crunch [33]和FlumeJava [34]也设置了多个MapReduce阶段的工作流程 。
2018-02-08 14:07:06 +08:00
### Reduce端连接与分组
2018-02-12 17:15:40 +08:00
我们在第2章中讨论了数据模型和查询语言的联接但是我们还没有深入探讨联接是如何实现的。现在是我们再次拿起那个线程的时候了。
在许多数据集中通常一条记录与另一条记录有关联关系模型中的外键文档模型中的文档引用或图模型中的边。只要有一些代码需要访问该关联两边的记录包含引用的记录和被引用的记录连接就是必需的。正如第2章所讨论的非规范化可以减少对连接的需求但通常不会将其完全移除[^v]。
在数据库中如果执行只涉及少量记录的查询数据库通常会使用索引来快速定位感兴趣的记录请参阅第3章。如果查询涉及连接则可能需要多个索引查找。然而MapReduce没有索引的概念 - 至少不是通常意义上的。
当MapReduce作业被赋予一组文件作为输入时它读取所有这些文件的全部内容;一个数据库会调用这个操作一个全表扫描。如果您只想读取少量的记录则与索引查找相比全表扫描的成本非常高昂。但是在分析查询中请参阅第88页上的“事务处理或分析通常需要计算大量记录的聚合。在这种情况下扫描整个输入可能是相当合理的事情特别是如果可以在多台机器上并行处理。
[^v]: 我们在本书中讨论的连接通常是等值连接即最常见的连接类型其中记录与其他记录在特定字段例如ID中具有相同的值相关联。有些数据库支持更一般的连接类型例如使用小于运算符而不是等号运算符但是我们没有空间来覆盖它们。
当我们在批处理的背景下讨论连接时,我们的意思是解决数据集内某个关联的所有事件。 例如,我们假设一个工作是同时为所有用户处理数据,而不仅仅是为一个特定用户查找数据(这可以通过索引更有效地完成)。
#### 示例:分析用户活动事件
图10-2给出了一个批处理作业中加入典型的例子。 在左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件或点击流数据),右侧是用户数据库。 您可以将此示例看作是星型模式的一部分请参阅“星号和雪花分析的示意图”第93页事件日志是事实表用户数据库是其中一个尺寸。
2018-02-08 14:07:06 +08:00
![](img/fig10-2.png)
**图10-2 用户行为日志与用户档案的连接**
2018-02-12 17:15:40 +08:00
分析任务可能需要将用户活动与用户简档信息相关联:例如,如果简档包含用户的年龄或出生日期,则系统可以确定哪些年龄组最受欢迎。但是,活动事件仅包含用户标识,而不包含完整的用户配置文件信息。在每一个活动事件中嵌入这个简介信息很可能是非常浪费的。因此,活动事件需要加入用户配置文件数据库。
2018-02-08 14:07:06 +08:00
2018-02-12 17:15:40 +08:00
这个连接的最简单实现将逐个遍历活动事件并为每个遇到的用户ID查询用户数据库在远程服务器上。这是可能的但是它很可能会遭受非常差的性能处理吞吐量将受到数据库服务器的往返时间的限制本地缓存的有效性将很大程度上取决于数据的分布并行运行大量查询可能会轻易压倒数据库[35]。
2018-02-08 14:07:06 +08:00
2018-02-12 17:15:40 +08:00
为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)在一台机器上进行。通过网络为您要处理的每个记录进行随机访问请求太慢。而且,查询远程数据库意味着批处理作业变得不确定,因为远程数据库中的数据可能会改变。
因此更好的方法是获取用户数据库的副本例如使用ETL进程从数据库备份中提取数据请参阅第91页上的“数据仓库”并将其放入与日志相同的分布式文件系统用户活动事件。然后您可以将用户数据库存储在HDFS中的一组文件中并将用户活动记录在另一组文件中并且可以使用MapReduce将所有相关记录集中到同一地点并高效地处理它们。
#### 排序合并连接
回想一下映射器的目的是从每个输入记录中提取一个键和值。在图10-2的情况下这个键就是用户ID一组映射器会覆盖活动事件提取用户ID作为键和活动事件作为值而另一组映射器将会检查用户数据库提取用户ID作为键和用户的出生日期作为值。这个过程如图10-3所示。
2018-02-08 14:07:06 +08:00
![](img/fig10-3.png)
**图10-3 Reduce端在user ID上进行归并排序连接如果输入数据集分片成多个文件则每个都会被多个Mapper并行处理**
2018-02-12 17:15:40 +08:00
当MapReduce框架通过key对mapper输出进行分区然后对键值对进行排序时效果是所有活动事件和用户ID相同的用户记录在reducer输入中彼此相邻。 Map-Reduce作业甚至可以安排记录进行排序使减速器始终如一
首先从用户数据库中查看记录,然后按照时间戳顺序查看活动事件 - 这种技术被称为次级排序[26]。
然后reducer可以很容易地执行实际的加入逻辑每个用户ID调用一次reducer函数并且由于二次排序第一个值应该是来自用户数据库的出生日期记录。 Reducer将出生日期存储在局部变量中然后使用相同的用户ID遍历活动事件输出已观看网址和观看者年龄对。随后的Map- Reduce作业可以计算每个URL的查看者年龄分布并按年龄组进行聚类。
由于reducer一次处理一个特定用户ID的所有记录因此只需要一次将一个用户记录保存在内存中而不需要通过网络发出任何请求。这个算法被称为排序合并连接因为映射器输出是按键排序的然后缩减器将来自连接两边的排序的记录列表合并在一起。
#### 把相关数据放在一起
在排序合并连接中映射器和排序过程确保将执行特定用户标识的连接操作的所有必需数据放在一起一次调用reducer。预先排列了所有需要的数据reducer可以是一个相当简单单线程的代码可以通过高吞吐量和低内存开销通过记录。
查看这种体系结构的一种方法是映射器将“消息”发送给reducer。当一个映射器发出一个键值对时这个键的作用就像值应该传递到的目标地址。即使密钥只是一个任意的字符串不是像IP地址和端口号那样的实际的网络地址它的行为就像一个地址所有具有相同密钥的密钥对将被传送到相同的目标a呼叫减速机
使用MapReduce编程模型将计算的物理网络通信方面从正确的计算机获取数据从应用程序逻辑中分离出来处理完数据后。这种分离与数据库的典型使用形成了鲜明的对比从数据库中获取数据的请求经常发生在应用程序代码的深处[36]。由于MapReduce能够处理所有的网络通信因此它也避免了应用程序代码担心部分故障例如另一个节点的崩溃MapReduce在不影响应用程序逻辑的情况下透明地重试失败的任务。
### GROUP BY
除了连接之外“将相关数据引入同一地点”模式的另一个常见用法是通过某个键如SQL中的GROUP BY子句对记录进行分组。所有用相同的密钥记录一个组并且下一步往往是在每个组内进行某种聚合例如
* 计算每个组中记录的数量例如在统计页面视图的示例中您将在SQL中表示为COUNT*)聚合)
* 在SQL中的一个特定字段SUMfieldname中添加值
* 根据某些排名函数选择前k个记录
使用MapReduce实现这种分组操作的最简单方法是设置映射器以便它们生成的键值对使用所需的分组键。然后分区和排序过程将所有记录与同一个缩减器中的相同键集合在一起。因此在MapReduce上实现时分组和连接看起来非常相似。
分组的另一个常见用途是整理特定用户会话的所有活动事件,以便找出用户采取的一系列操作(称为会话化[37]。例如可以使用这种分析来确定显示网站新版本的用户是否比那些显示旧版本A / B测试的用户更有可能进行购买或计算某个营销活动是值得的。
如果您有多个Web服务器处理用户请求则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。您可以通过使用会话cookie用户ID或类似的标识符作为分组键来实现会话并将特定用户的所有活动事件放在一起同时将不同用户的事件分配到不同的分区。
#### 处理倾斜
如果存在与单个密钥相关的大量数据,则“将具有相同密钥的所有记录带到相同位置”的模式被破坏。例如,在社交网络中,大多数用户可能会连接到几百人,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为关键对象[38]或热键。
在单个缩减器中收集与名人相关的所有活动(例如回复他们发布的内容)可能导致严重的偏差(也称为热点) - 也就是说一个减速器必须处理比其他更多的记录参见“歪曲的工作负载和消除热点“。由于MapReduce作业只有在其所有映射器和缩减器都完成时才完成所有后续作业必须等待最慢的缩减器才能启动。
如果加入输入有热键则可以使用一些算法进行补偿。例如Pig中的偏斜连接方法首先运行一个抽样作业来确定哪些键是热的[39]。执行实际加入时映射器发送任何随机选择一个与几个减速器之一相关的热键的记录与传统的MapReduce相比它选择一个基于密钥散列确定性的减速器。对于加入的其他输入与热键相关的记录需要被复制到所有处理该密钥的缩减器[40]。
这种技术将处理热键的工作分散到多个reducer上这样可以使其更好地并行化而不必将其他join连接复制到多个reducer。 Crunch中的分片连接方法是相似的但需要显式指定热键而不是使用采样作业。这种技术也非常类似于我们在第205页的“倾斜的工作负载和减轻热点”中讨论的技术使用随机化来缓解分区数据库中的热点。
Hive的偏斜连接优化采取了另一种方法。它需要在表格元数据中明确指定热键并将与这些键相关的记录与其余文件分开存放。在该表上执行连接时它将使用地图边连接请参阅下一节获取热键。
使用热键对记录进行分组并汇总记录时可以分两个阶段进行分组。第一个MapReduce阶段将记录发送到随机缩减器以便每个缩减器对热键的记录子集执行分组并为每个键输出更紧凑的聚合值。第二个Map-Reduce作业然后将来自所有第一阶段减速器的值合并为每个键的单个值。
### Map端连接
上一节描述的连接算法在reducer中执行实际的连接逻辑因此被称为reduce-side连接。映射器扮演着输入数据的角色从每个输入记录中提取键和值将键值对分配给reducer分区并按键排序。
减少方法的优点是不需要对输入数据做任何假设无论其属性和结构如何映射器都可以准备数据以准备加入。然而不利的一面是所有这些排序复制到缩减器以及合并减速器输入可能是非常昂贵的。取决于可用的内存缓冲区当数据通过MapReduce [37]阶段时,数据可能被写入磁盘几次。
另一方面如果您可以对输入数据进行某些假设则可以通过使用所谓的map端连接来加快连接速度。这种方法使用了一个缩减的MapReduce作业其中没有减速器也没有排序。相反每个映射器只需从分布式文件系统读取一个输入文件块然后将一个输出文件写入文件系统即可。
#### 广播散列连接
执行地图边连接最简单的方法适用于大数据集与小数据集连接的情况。特别是,小数据集需要足够小,以便可以将其全部加载到每个映射器的内存中。
例如假设在图10-2的情况下用户数据库足够小以适应内存。在这种情况下当映射器启动时它可以首先将用户数据库从分布式文件系统读取到内存中的哈希表中。完成此操作后映射程序可以扫描用户活动事件并简单地查找散列表中每个事件的用户标识[^vi]。
[^vi]: 这个例子假定散列表中的每个键只有一个条目这对用户数据库用户ID唯一标识一个用户可能是正确的。通常哈希表可能需要包含具有相同键的多个条目并且连接运算符将输出关键字的所有匹配。
仍然可以有几个映射任务一个用于连接的大输入的每个文件块在图10-2的例子中活动事件是大输入。这些映射器中的每一个都将小输入全部加载到内存中。
这种简单而有效的算法被称为广播散列连接广播词反映了这样一个事实即大输入的分区的每个映射器都读取整个小输入所以小输入有效地“广播”到大的输入单词hash反映了它使用一个哈希表。 Pig名为“replicated join”Hive“MapJoin”Cascading和Crunch支持此连接方法。它也用于数据仓库查询引擎如Impala [41]。
而不是将小连接输入加载到内存散列表中,另一种方法是将小连接输入存储在本地磁盘上的只读索引中[42]。该索引中经常使用的部分将保留在操作系统的页面缓存中,因此这种方法可以提供与内存中哈希表几乎一样快的随机访问查找,但实际上并不需要数据集适合内存。
#### 分区散列连接
如果以相同方式对映射端连接的输入进行分区则散列连接方法可以独立应用于每个分区。在图10-2的情况下您可以根据用户标识的最后一位十进制数字来安排活动事件和用户数据库的每一个因此每边有10个分区。例如映射器3首先将所有具有以3结尾的ID的用户加载到散列表中然后扫描ID为3的每个用户的所有活动事件。
如果分区正确完成,您可以确定所有您可能要加入的记录都位于相同编号的分区中,因此每个映射器只能从每个输入数据集中读取一个分区就足够了。这具有的优点是每个映射器都可以将较少量的数据加载到其哈希表中。
这种方法只适用于两个连接的输入具有相同数量的分区记录根据相同的密钥和相同的散列函数分配给分区。如果输入是由之前执行过这个分组的MapReduce作业生成的那么这可能是一个合理的假设。
分区散列连接在Hive [37]中称为bucketed映射连接。地图边合并连接
如果输入数据集不仅以相同的方式进行分区而且还基于相同的键进行排序则应用另一种地图端联接的变体。在这种情况下输入是否足够小以适应内存并不重要因为映射器可以执行通常由reducer执行的相同合并操作按递增键递增读取两个输入文件以及匹配相同的密钥记录。
如果地图边合并连接是可能的则可能意味着先前的MapReduce作业首先将输入数据集引入到这个分区和排序的表单中。原则上这个加入可以在之前工作的缩减阶段进行。但是在单独的仅用于地图的作业中执行合并连接仍然是适当的例如除了此特定连接之外还需要分区和排序数据集以用于其他目的。
#### MapReduce与Map端连接的工作流程
当下游作业使用MapReduce连接的输出时map-side或reduce-side连接的选择会影响输出的结构。 reduce-side连接的输出按连接键进行分区和排序而map-side连接的输出按照与大输入相同的方式进行分区和排序因为对每个文件块启动一个map任务无论是使用分区连接还是广播连接连接的大输入
如前所述,地图边连接也对输入数据集的大小,排序和分区做出了更多的假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;您还必须知道数据分区和排序的分区数量和密钥。
在Hadoop生态系统中这种关于数据集分区的元数据经常在HCatalog和Hive Metastore中维护[37]。
2018-02-08 14:07:06 +08:00
### 工作流的输出
2018-02-12 17:15:40 +08:00
我们已经谈了很多关于实现MapReduce工作流程的各种算法但是我们忽略了一个重要的问题一旦完成所有处理的结果是什么我们为什么要把所有这些工作放在首位
在数据库查询的情况下我们根据分析目的来区分事务处理OLTP目的请参阅第90页上的“事务处理或分析。我们看到OLTP查询通常使用索引按键查找少量记录以便将其呈现给用户例如在网页上。另一方面分析查询通常会扫描大量记录执行分组和汇总输出通常具有报告的形式显示某个指标随时间变化的图表或前10个项目根据一些排名或一些数量分解成子类别。这种报告的消费者通常是需要做出商业决策的分析师或经理。
批处理在哪里适合这不是交易处理也不是分析。与分析更接近因为批处理过程通常扫描输入数据集的大部分。但是MapReduce作业的工作流程与用于分析目的的SQL查询不同请参阅第418页的“比较Hadoop与分布式数据库”。批处理过程的输出通常不是报告而是一些其他类型的结构。
#### 建立搜索索引
Google最初使用的MapReduce是为其搜索引擎建立索引这个索引是作为5到10个MapReduce作业的工作流实现的[1]。虽然Google为了这个目的后来不再使用MapReduce [43]但是如果从建立搜索索引的角度来看它可以帮助理解MapReduce。 即使在今天Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法。
我们在第88页的“全文搜索和模糊索引”中简要地看到了Lucene这样的全文搜索索引是如何工作的它是一个文件术语字典您可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表发布列表。这是一个非常简单的搜索索引视图 - 实际上,它需要各种附加数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这一原则是成立的。
如果需要对一组固定文档执行全文搜索则批处理是构建索引的一种非常有效的方法映射器根据需要对文档集进行分区每个reducer构建其分区的索引并将索引文件写入分布式文件系统。构建这样的文档分区索引请参阅“分区和二级索引”第184页并行处理非常好。
由于按关键字查询搜索索引是只读操作,因此这些索引文件一旦创建就是不可变的。
如果索引的文档集合发生更改,则可以选择定期重新运行整个索引工作流程,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法可能会带来很高的计算成本,但是它的优点是索引过程很容易推理:文档,索引。
或者可以逐渐建立索引。如第3章所述如果要添加删除或更新索引中的文档Lucene会写出新的段文件并异步合并和压缩背景中的段文件。我们将在第11章中看到更多这样的增量处理。
#### 键值存储作为批处理输出
搜索索引只是批处理工作流程可能输出的一个示例。批量处理的另一个常见用途是构建机器学习系统,如分类器(例如,垃圾邮件过滤器,异常检测,图像识别)和推荐系统(例如,您可能认识的人,您可能感兴趣的产品或相关搜索) ])。
这些批处理作业的输出通常是某种数据库例如可以通过用户ID查询以获取该用户的建议朋友的数据库或者可以通过产品ID查询的数据库以获取相关产品[45]。
这些数据库需要从处理用户请求的Web应用程序中查询这些请求通常与Hadoop基础架构分离。那么批处理过程的输出如何返回到Web应用程序可以查询的数据库
最明显的选择可能是直接在映射器或简化器中使用客户端库作为您最喜欢的数据库并从批处理作业直接写入数据库服务器一次写入一条记录。这将起作用假设您的防火墙规则允许从您的Hadoop环境直接访问您的生产数据库但由于以下几个原因这是一个坏主意
* 正如前面讨论的连接一样,为每个记录提出一个网络请求比批处理任务的正常吞吐量要慢几个数量级。即使客户端库支持批处理,性能也可能很差。
* MapReduce作业经常并行运行许多任务。如果所有映射器或简化器都同时写入相同的输出数据库并且批处理过程期望的速率那么该数据库可能很容易被压倒并且其查询性能可能受到影响。这可能会导致系统其他部分的操作问题[35]。
* 通常情况下MapReduce为作业输出提供了一个干净的“全有或全无”的保证如果作业成功则结果就是只执行一次任务的输出即使某些任务失败并且必须重试。如果整个作业失败则不会生成输出。然而从作业内部写入外部系统会产生外部可见的副作用这种副作用是不能被隐藏的。因此您不得不担心部分完成的作业对其他系统可见的结果以及Hadoop任务尝试和推测性执行的复杂性。
更好的解决方案是在批处理作业中创建一个全新的数据库并将其作为文件写入分布式文件系统中作业的输出目录就像上一节的搜索索引一样。这些数据文件一旦写入就是不可变的可以批量加载到处理只读查询的服务器中。各种键值存储支持在MapReduce作业中构建数据库文件包括Voldemort [46]Terrapin [47]ElephantDB [48]和HBase批量加载[49]。
构建这些数据库文件是MapReduce的一个很好的使用方法使用映射器提取一个键然后使用该键进行排序已经成为构建索引所需的大量工作。由于大多数这些键值存储是只读的文件只能由批处理作业一次写入而且是不可变的所以数据结构非常简单。例如它们不需要WAL请参阅第82页的「使B树可靠」
将数据加载到Voldemort时服务器将继续向旧数据文件提供请求同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成服务器会自动切换到查询新文件。如果在这个过程中出现任何问题它可以很容易地再次切换回旧的文件因为它们仍然存在并且是不变的[46]。
#### 批量过程输出的哲学
本章前面讨论过的Unix哲学第394页的“Unix哲学”鼓励通过对数据流的非常明确的实验来进行实验程序读取输入并写入输出。在这个过程中输入保持不变任何以前的输出都被新输出完全替换并且没有其他副作用。这意味着您可以随心所欲地重新运行一个命令调整或调试它而不会扰乱系统的状态。
MapReduce作业的输出处理遵循相同的原理。通过将输入视为不可变且避免副作用如写入外部数据库批处理作业不仅实现了良好的性能而且更容易维护
* 如果在代码中引入了一个错误,并且输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将再次正确。或者,甚至更简单,您可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的思想被称为人类容错[50]。)
* 由于易于回滚,功能开发可以比错误意味着不可挽回的损害的环境更快地进行。这种使不可逆性最小化的原则有利于敏捷软件的开发[51]。
* 如果映射或减少任务失败MapReduce框架将自动重新调度并在同一个输入上再次运行它。如果失败是由于代码中的一个错误造成的那么它会一直崩溃并最终导致作业在几次尝试之后失败。但是如果故障是由于暂时的问题引起的那么故障是可以容忍的。这种自动重试只是安全的因为输入是不可变的而失败任务的输出被MapReduce框架丢弃。
* 同一组文件可用作各种不同作业的输入,其中包括计算度量标准的计算作业,并评估作业的输出是否具有预期的特性(例如,将其与前一次运行的输出进行比较并测量差异) 。
* 与Unix工具类似MapReduce作业将逻辑与布线配置输入和输出目录分开这就提供了关注点的分离并且可以重用代码一个团队可以专注于实现一件好事的工作其他团队可以决定何时何地运行这项工作。
在这些领域对Unix运行良好的设计原则似乎也适用于Hadoop但Unix和Hadoop在某些方面也有所不同。例如因为大多数Unix工具都假定没有类型的文本文件所以他们必须做大量的输入解析本章开头的日志分析示例使用{print $ 7}来提取URL。在Hadoop上通过使用更多结构化的文件格式可以消除一些低价值的语法转换Avro请参阅第122页上的“Avro”和Parquet请参阅第95页上的“面向列的存储”经常使用因为它们提供高效的基于模式的编码并允许随着时间的推移模式的演变见第4章
### 比较Hadoop和分布式数据库
正如我们所看到的Hadoop有点像Unix的分布式版本其中HDFS是文件系统而MapReduce是Unix进程的古怪实现这恰好总是在映射阶段和缩小阶段之间运行排序实用程序。我们看到了如何在这些基元之上实现各种连接和分组操作。
当MapReduce论文[1]发表时它在某种意义上说并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前的所谓的大规模并行处理MPP数据库中实现了[3,40]。例如Gamma数据库机器Teradata和Tandem NonStop SQL是这方面的先驱[52]。
最大的区别是MPP数据库集中于在一组机器上并行执行分析SQL查询而MapReduce和分布式文件系统[19]的组合则更像是一个可以运行任意程序的通用操作系统。
#### 存储的多样性
数据库要求您根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。
说白了Hadoop开放了将数据不加区分地转储到HDFS的可能性之后才想出如何进一步处理它[53]。相比之下在将数据导入数据库专有存储格式之前MPP数据库通常需要对数据和查询模式进行仔细的前期建模。
从纯粹的角度来看,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更好的质量数据来处理。然而,在实践中,似乎只是简单地使数据可用 - 即使它是一个古怪的,难以使用的原始格式 - 通常比尝试决定理想的数据模型更有价值[54 ]。
这个想法与数据仓库类似请参阅第91页上的“数据仓库”将大型组织的各个部分的数据集中在一起是很有价值的因为它可以跨以前不同的数据集进行联接。 MPP数据库所要求的谨慎的模式设计减慢了集中式数据收集速度;以原始形式收集数据,以后担心模式设计,使数据收集速度加快(有时被称为“数据湖”或“企业数据中心”[55])。
不加区别的数据倾销改变了解释数据的负担:不是强迫数据集的生产者将其转化为标准化的格式,而是数据的解释成为消费者的问题(模式在读方法[56];请参阅第39页上的“文档模型中的模式灵活性”。如果生产者和消费者是不同优先级的不同团队这可能是一个优势。甚至可能不存在一个理想的数据模型而是对适合不同目的的数据有不同的看法。以原始形式简单地转储数据可以进行多次这样的转换。这种方法被称为寿司原则“原始数据更好”[57]。
因此Hadoop经常被用于实现ETL过程请参阅“数据仓库”第91页事务处理系统中的数据以某种原始形式转储到分布式文件系统中然后编写MapReduce作业来清理数据将其转换为关系表单并将其导入MPP数据仓库以进行分析。数据建模仍然在发生但它是在一个单独的步骤中从数据收集中分离出来的。这种解耦是可能的因为分布式文件系统支持以任何格式编码的数据。
#### 加工模型的多样性
MPP数据库是单一的紧密集成的软件负责磁盘上的存储布局查询计划调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化因此整个系统可以在其设计的查询类型上取得非常好的性能。而且SQL查询语言允许表达式查询和重要语义而无需编写代码使业务分析师例如Tableau使用的图形工具可访问该语言。
另一方面并非所有类型的处理都可以合理地表达为SQL查询。例如如果要构建机器学习和推荐系统或者使用相关性排名模型的全文搜索索引或者执行图像分析则很可能需要更一般的数据处理模型。这些类型的处理通常对特定的应用程序非常具体例如机器学习的特征工程机器翻译的自然语言模型欺诈预测的风险评估函数因此它们不可避免地需要编写代码而不仅仅是查询。
MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce那么你可以在它上面建立一个SQL查询执行引擎事实上这正是Hive项目所做的[31]。但是您也可以编写许多其他形式的批处理这些批处理不适合用SQL查询表示。
随后人们发现MapReduce对于某些类型的处理来说太过于限制执行得太差因此其他各种处理模型都是在Hadoop之上开发的我们将在第419页的“Beyond MapReduce”中看到其中的一些。有两种处理模型SQL和MapReduce还不够需要更多不同的模型而且由于Hadoop平台的开放性实施一整套方法是可行的而这在整体MPP数据库的范围内是不可能的[58]。
至关重要的是这些不同的处理模型都可以在一个共享的机器上运行所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中不需要将数据导入到几个不同的专用系统中进行不同类型的处理系统足够灵活可以支持同一个群集内不同的工作负载。不需要移动数据使得从数据中获得价值变得容易得多并且使用新的处理模型更容易进行实验。
Hadoop生态系统包括随机访问的OLTP数据库如HBase请参阅第70页的“SSTables和LSM-Trees”和MPA样式的分析数据库如Impala [41]。 HBase和Impala都不使用MapReduce但都使用HDFS进行存储。它们是访问和处理数据的非常不同的方法但是它们可以共存并被集成到同一个系统中。
#### 为频繁的故障而设计
在比较MapReduce和MPP数据库时设计方法的另外两个不同点是处理故障和使用内存和磁盘。与在线系统相比批处理对故障不太敏感因为如果失败用户不会立即影响用户并且可以再次运行。
如果一个节点在执行查询时崩溃大多数MPP数据库会中止整个查询并让用户重新提交查询或自动重新运行它[3]。由于查询通常最多运行几秒钟或几分钟,所以这种处理错误的方法是可以接受的,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据例如使用散列连接以避免从磁盘读取的成本。
另一方面MapReduce可以容忍映射或减少任务的失败而不会影响作业的整体通过以单个任务的粒度重试工作。它也非常渴望将数据写入磁盘一方面是为了容错另一方面是假设数据集太大而不能适应内存。
MapReduce方法更适用于较大的作业处理如此之多的数据并运行很长时间的作业以至于在此过程中可能至少遇到一个任务故障。在这种情况下由于单个任务失败而重新运行整个工作将是浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销但如果任务失败率足够高仍然可以进行合理的权衡。
但是这些假设有多现实呢?在大多数集群中,机器故障确实发生,但是它们不是很频繁 - 可能很少,大多数工作都不会经验,因为机器故障。为了容错,真的值得引起重大的开销吗?
要了解MapReduce节省使用内存和任务级恢复的原因查看最初设计MapReduce的环境是很有帮助的。 Google拥有混合使用的数据中心在线生产服务和离线批处理作业在同一台机器上运行。每个任务都有一个使用容器执行的资源分配CPU核心RAM磁盘空间等。每个任务也具有优先级如果优先级较高的任务需要更多的资源则可以终止抢占同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的定价团队必须为他们使用的资源付费而优先级更高的流程花费更多[59]。
这种架构允许非生产低优先级计算资源被过度使用因为系统知道如果必要的话它可以回收资源。与分离生产和非生产任务的系统相比过度使用资源可以更好地利用机器和提高效率。但是由于MapReduce作业以低优先级运行因此它们随时都有被抢占的风险因为优先级较高的进程需要其资源。批量工作有效地“拿起桌子下面的碎片”利用高优先级进程已经采取的任何计算资源。
在谷歌运行一个小时的MapReduce任务有大约5被终止的风险为更高优先级的进程腾出空间。由于硬件问题机器重新启动或其他原因这个速率比故障率高出一个数量级[59]。按照这种抢先率如果一个作业有100个任务每个任务运行10分钟那么至少有一个任务在完成之前将被终止的风险大于50
这就是为什么MapReduce能够容忍频繁意外的任务终止的原因这不是因为硬件特别不可靠这是因为任意终止进程的自由可以在计算集群中更好地利用资源。
在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配[58]但在编写本文时YARNMesos或Kubernetes不支持通用优先级抢占[60]。在任务不经常被终止的环境中MapReduce的设计决策没有多少意义。在下一节中我们将看看MapReduce的一些替代方案这些替代方案做出了不同的设计决定。
2018-02-08 14:07:06 +08:00
## 后MapReduce时代
2018-02-12 17:15:40 +08:00
虽然MapReduce在二十世纪二十年代后期变得非常流行并受到大量的炒作但它只是分布式系统的许多可能的编程模型之一。根据数据量数据结构和处理类型其他工具可能更适合表达计算。
尽管如此我们在讨论MapReduce的这一章花了很多时间因为它是一个有用的学习工具因为它是分布式文件系统的一个相当清晰和简单的抽象。也就是说能够理解它在做什么而不是在易于使用的意义上是简单的。恰恰相反使用原始的MapReduce API来实现复杂的处理工作实际上是非常困难和费力的 - 例如,您需要从头开始实现任何连接算法[37]。
针对直接使用MapReduce的困难在MapReduce上创建了各种更高级的编程模型PigHiveCascadingCrunch作为抽象。如果您了解MapReduce的工作原理那么它们相当容易学习而且它们的高级构造使许多常见的批处理任务更容易实现。
但是MapReduce执行模型本身也存在一些问题这些问题并没有通过增加另一个抽象层次来解决而且在某些类型的处理中表现得很差。一方面MapReduce非常强大您可以使用它来处理频繁任务终止的不可靠多租户系统上几乎任意大量的数据并且仍然可以完成工作虽然速度很慢。另一方面对于某些类型的处理来说其他工具有时也会更快。
在本章的其余部分中,我们将介绍一些批处理方法。在第十一章我们将转向流处理,这可以看作是加速批处理的另一种方法。
2018-02-08 14:07:06 +08:00
### 内部状态表示
2018-02-12 17:15:40 +08:00
如前所述每个MapReduce作业都独立于其他任何作业。作业与世界其他地方的主要联系点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入则需要将第二个作业的输入目录配置为与第一个作业的输出目录相同并且外部工作流调度程序必须仅在第一份工作已经完成。
如果第一个作业的输出是要在组织内广泛发布的数据集则此设置是合理的。在这种情况下您需要能够通过名称来引用它并将其用作多个不同作业包括由其他团队开发的作业的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合这样作业就不需要知道是谁在输入输出或消耗其输出请参阅“分离逻辑和布线”在本页395
但是在很多情况下您知道一个工作的输出只能用作另一个工作的输入这个工作由同一个团队维护。在这种情况下分布式文件系统上的文件只是简单的中间状态一种将数据从一个作业传递到下一个作业的方式。在用于构建由50或100个MapReduce作业[29]组成的推荐系统的复杂工作流程中,存在很多这样的中间状态。
将这个中间状态写入文件的过程称为物化。 我们在第101页的“聚合数据立方体和物化视图”中已经在物化视图的背景下遇到了这个术语。它意味着要急于计算某个操作的结果并写出来而不是计算需要时按要求。
相反本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输出连接起来。管道并没有完全实现中间状态而是只使用一个小的内存缓冲区将输出逐渐流向输入。
MapReduce的完全实现中间状态的方法与Unix管道相比存在不足
* MapReduce作业只有在前面的作业生成其输入中的所有任务都完成时才能启动而由Unix管道连接的进程同时启动输出一旦生成就会被使用。不同机器上的偏差或不同的负荷意味着一份工作往往会有一些比其他人更快完成的离散任务。必须等到所有前面的工作完成才能减慢整个工作流程的执行。
* 映射器通常是多余的它们只读取刚刚由reducer写入的相同文件并为下一个分区和排序阶段做好准备。在许多情况下映射器代码可能是以前的reducer的一部分如果reducer输出被分区和排序的方式与mapper输出相同那么reducers可以直接链接在一起而不与mapper阶段交错。
* 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这对于这样的临时数据通常是过度的。
#### 数据流引擎
了解决MapReduce的这些问题开发了几种用于分布式批量计算的新的执行引擎其中最着名的是Spark [61,62]Tez [63,64]和Flink [65,66]。他们设计的方式有很多不同之处,但他们有一个共同点:他们把整个工作流作为一项工作来处理,而不是把它分解成独立的子作业。
由于它们通过几个处理阶段明确地建模数据流所以这些系统被称为数据流引擎。像MapReduce一样它们通过反复调用用户定义的函数来在单个线程上一次处理一条记录。他们通过对输入进行分区来并行工作并将一个功能的输出复制到网络上成为另一个功能的输入。
与MapReduce不同这些功能不需要交替映射和缩减的严格角色而是可以以更灵活的方式进行组合。我们称之为这些函数操作符数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入
* 一个选项是通过键对记录进行重新分区和排序就像在MapReduce的混洗阶段一样请参阅“分布式执行MapReduce”。此功能可以像在MapReduce中一样启用排序合并连接和分组。
* 另一种可能是采取几个输入,并以相同的方式进行分区,但跳过排序。这节省了分区散列连接的工作,其中记录的分区是重要的,但顺序是不相关的,因为构建散列表随机化了顺序。
* 对于广播散列连接,可以将一个运算符的相同输出发送到连接运算符的所有分区。
这种处理引擎的风格基于像Dryad [67]和Nephele [68]这样的研究系统与MapReduce模型相比它提供了几个优点
* 排序等昂贵的工作只需要在实际需要的地方执行而不是在每个Map和Reduce阶段之间默认发生。
* 没有不必要的地图任务因为映射器所做的工作通常可以合并到前面的reduce操作器中因为映射器不会更改数据集的分区
* 由于工作流程中的所有连接和数据依赖性都是明确声明的,因此调度程序会概述哪些数据是必需的,因此可以进行本地优化。例如,它可以尝试将占用某些数据的任务放在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而不必通过网络复制数据。
* 通常将操作员之间的中间状态保存在内存中或写入本地磁盘就足够了这比写入HDFS需要更少的I / O必须将其复制到多个计算机并写入到每个代理的磁盘上。 MapReduce已经将这种优化用于映射器的输出但是数据流引擎将该思想推广到了所有的中间状态。
* 操作员可以在输入准备就绪后立即开始执行;在下一个开始之前不需要等待整个前一阶段的完成。
* 与MapReduce为每个任务启动一个新的JVM相比现有的Java虚拟机JVM进程可以重用来运行新操作从而减少启动开销。
您可以使用数据流引擎来执行与MapReduce工作流相同的计算并且由于此处所述的优化通常执行速度会明显更快。既然操作符是map和reduce的泛化相同的处理代码可以在任一执行引擎上运行PigHive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark而无需修改代码[64]。
Tez是一个相当薄的库它依赖于YARN shuffle服务来实现节点间数据的实际复制[58]而Spark和Flink则是包含自己的网络通信层调度器和面向用户的API的大型框架。我们将在短期内讨论这些高级API。
#### 容错
完全实现中间状态到分布式文件系统的一个优点是它是持久的这使得MapReduce中的容错相当容易如果一个任务失败它可以在另一台机器上重新启动并从文件系统重新读取相同的输入。
SparkFlink和Tez避免将中间状态写入HDFS因此他们采取了不同的方法来容忍错误如果一台机器发生故障并且该机器上的中间状态丢失则会从其他仍然可用的数据重新计算在可能的情况下是在先的中间阶段或者是通常在HDFS上的原始输入数据
为了实现这个重新计算,框架必须跟踪一个给定的数据是如何计算的 - 使用哪个输入分区,以及哪个操作符被应用到它。 Spark使用弹性分布式数据集RDD抽象来追踪数据的祖先[61]而Flink检查点操作符状态允许其恢复运行在执行过程中遇到错误的操作符[66]。
在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,操作员是否始终生成相同的输出?如果一些丢失的数据已经发送给下游运营商,这个问题就很重要。如果运营商重新启动,重新计算的数据与原有的丢失数据不一致,下游运营商很难解决新旧数据之间的矛盾。对于不确定性运营商来说,解决方案通常是杀死下游运营商,然后再运行新数据。
为了避免这种级联故障,最好让操作员具有确定性。但是请注意,非确定性行为很容易发生意外蔓延:例如,许多编程语言在迭代哈希表的元素时不能保证任何特定顺序,许多概率和统计算法明确依赖于使用随机数,以及任何用途系统时钟或外部数据源是不确定的。为了可靠地从故障中恢复,例如通过使用固定种子产生伪随机数,需要消除这种不确定性的原因。
通过重新计算数据从故障中恢复并不总是正确的答案:如果中间数据比源数据小得多,或者如果计算量非常大,那么将中间数据转化为文件可能比将其重新计算更便宜。
#### 关于物化的讨论
回到Unix的类比我们看到MapReduce就像是将每个命令的输出写入临时文件而数据流引擎看起来更像是Unix管道。尤其是Flink是围绕流水线执行的思想而建立的也就是说将运算符的输出递增地传递给其他操作符并且在开始处理之前不等待输入完成。
排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能是具有最低密钥的输入记录,因此需要作为第一个输出记录。任何需要分类的操作员都需要至少暂时地累积状态。但是工作流程的许多其他部分可以以流水线方式执行。
当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它 - 很可能它会再次写入分布式文件系统。因此在使用数据流引擎时HDFS上的物化数据集通常仍是作业的输入和最终输出。和MapReduce一样输入是不可变的输出被完全替换。对MapReduce的改进是您可以节省自己将所有中间状态写入文件系统。
2018-02-08 14:07:06 +08:00
### 图与迭代处理
2018-02-12 17:15:40 +08:00
在第49页上的“类似图形的数据模型”中我们讨论了使用图形来建模数据并使用图形查询语言来遍历图形中的边和顶点。第2章的讨论集中在OLTP风格的使用上快速执行查询来查找少量符合特定条件的顶点。
在批处理环境中查看图表也很有趣其目标是在整个图表上执行某种离线处理或分析。这种需求经常出现在机器学习应用程序如推荐引擎或排序系统中。例如最着名的图形分析算法之一是PageRank [69],它试图根据其他网页链接的网页来估计网页的流行度。它被用作确定网络搜索引擎呈现结果的顺序的公式的一部分。
> 像SparkFlink和Tez这样的数据流引擎参见第419页“中间状态的实现化”通常将操作符作为有向无环图DAG排列在作业中。这与图形处理不一样在数据流引擎中从一个操作符到另一个操作符的数据流被构造成一个图而数据本身通常由关系式元组构成。在图形处理中数据本身具有图形的形式。另一个不幸的命名混乱
许多图算法是通过一次遍历一个边来表示的,将一个顶点与相邻的顶点连接起来以便传播一些信息,并且重复直到满足一些条件为止 - 例如直到没有更多的边要跟随或者直到一些度量收敛。我们在图2-6中看到一个例子它通过重复地跟踪指示哪个位置在哪个其他位置这种算法被称为传递闭包的边缘列出了包含在数据库中的北美所有位置。
可以在分布式文件系统包含顶点和边的列表的文件中存储图形但是这种“重复直到完成”的想法不能用普通的MapReduce来表示因为它只执行一次数据传递。这种算法因此经常以迭代方式实现
1. 外部调度程序运行批处理来计算算法的一个步骤。
2. 当批处理过程完成时,调度器检查它是否完成(基于完成条件 - 例如,没有更多的边要跟随,或者与上次迭代相比的变化低于某个阈值)。
3. 如果尚未完成则调度程序返回到步骤1并运行另一轮批处理。
这种方法是有效的但是用MapReduce实现它往往是非常低效的因为MapReduce没有考虑算法的迭代性质它总是读取整个输入数据集并产生一个全新的输出数据集即使只有一小部分该图与上次迭代相比已经改变。
Pregel处理模型
作为批处理图形的优化计算的批量同步并行BSP模型[70]已经流行起来。其中它由Apache Giraph [37]Spark的GraphX API和Flink的Gelly API [71]实现。它也被称为Pregel模型正如Google的Pregel论文推广这种处理图的方法[72]。
回想一下在MapReduce中映射器在概念上“发送消息”给reducer的特定调用因为框架将所有的mapper输出集中在一起。 Pregel背后有一个类似的想法一个顶点可以“发送消息”到另一个顶点通常这些消息沿着图的边被发送。
在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 - 就像调用reducer一样。与MapReduce的不同之处在于在Pregel模型中顶点从一次迭代到下一次迭代记忆它的状态所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息则不需要做任何工作。
这与演员模型有些相似请参阅第130页上的“分布式演员框架”除非顶点状态和顶点之间的消息具有容错性和耐久性并且通信以固定的方式进行否则将每个顶点视为主角轮次在每一次迭代中框架传递在前一次迭代中发送的所有消息。演员通常没有这样的时间保证。
#### 容错
顶点只能通过消息传递进行通信而不是直接相互查询的事实有助于提高Pregel作业的性能因为消息可以成批处理而且等待通信的次数也减少了。唯一的等待是在迭代之间由于Pregel模型保证所有在一次迭代中发送的消息都在下一次迭代中传递所以先前的迭代必须完全完成并且所有的消息必须在网络上复制然后下一个开始。
即使底层网络可能丢失重复或任意延迟消息请参阅第267页上的“不可靠网络”Pregel实施可保证在接下来的迭代中消息在其目标顶点处理一次。像MapReduce一样该框架透明地从故障中恢复以简化Pregel顶层算法的编程模型。
这种容错是通过在迭代结束时定期检查所有顶点的状态来实现的,即将其全部状态写入持久存储。如果某个节点发生故障并且其内存中状态丢失,则最简单的解决方法是将整个图计算回滚到上一个检查点,然后重新启动计算。如果算法是确定性的并且记录了消息,那么也可以选择性地只恢复丢失的分区(就像我们之前讨论过的数据流引擎)[72]。
#### 并行执行
顶点不需要知道它正在执行哪个物理机器;当它发送消息到其他顶点时它只是将它们发送到一个顶点ID。分配图的框架即确定哪个顶点运行在哪个机器上以及如何通过网络路由消息以便它们结束在正确的位置。
由于编程模型一次仅处理一个顶点有时称为“像顶点一样思考”所以框架可以以任意方式划分图形。理想情况下如果它们需要进行大量的通信那么它将被分割以使顶点在同一台机器上共置。然而寻找这样一个优化的分割在实践中是困难的图形经常被任意分配的顶点ID分割而不会尝试将相关的顶点分组在一起。
因此,图算法通常会有很多跨机器通信,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销会显着减慢分布式图算法的速度。
出于这个原因,如果你的图可以放在一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理[73,74]。即使图形大于内存也可以放在单个计算机的磁盘上使用GraphChi等框架进行单机处理是一个可行的选择[75]。如果图形太大而不适合单个机器像Pregel这样的分布式方法是不可避免的。有效的并行化图算法是一个正在进行的领域。
2018-02-08 14:07:06 +08:00
### 高级API和语言
2018-02-12 17:15:40 +08:00
自MapReduce第一次流行以来分布式批处理的执行引擎已经成熟。到目前为止基础设施已经足够强大能够存储和处理超过10,000台机器群集上的数PB的数据。由于在这种规模下物理操作批处理过程的问题已经或多或少得到了解决所以已经转向其他领域改进编程模型提高处理效率扩大这些技术可以解决的问题集。
如前所述HivePigCascading和Crunch等高级语言和API由于手工编写MapReduce作业而变得非常流行。随着Tez的出现这些高级语言还有额外的好处可以移动到新的数据流执行引擎而无需重写作业代码。 Spark和Flink也包括他们自己的高级数据流API经常从FlumeJava中获得灵感[34]。
这些数据流API通常使用关系式构建块来表达一个计算连接数据集以获取某个字段的值;按键分组元组;过滤一些条件;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。
除了需要较少代码的明显优势之外这些高级接口还允许交互式使用在这种交互式使用中您可以将分析代码逐步编写到shell中并经常运行以观察它正在做什么。这种发展风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学我们在第394页的“Unix哲学”中讨论过这个问题。
而且,这些高级接口不仅使人类使用系统的效率更高,而且提高了机器级别的工作执行效率。
#### 向声明式查询语言的转变
与拼写执行连接的代码相比,指定连接为关系运算符的优点是,框架可以分析连接输入的属性,并自动决定哪个上述连接算法最适合手头的任务。 HiveSpark和Flink都有基于代价的查询优化器可以做到这一点甚至可以改变连接顺序使中间状态的数量最小化[66,77,78,79]。
连接算法的选择可以对批处理作业的性能产生很大的影响不必理解和记住本章中讨论的各种连接算法。如果以声明的方式指定连接则这是可能的应用程序简单地说明哪些连接是必需的查询优化器决定如何最好地执行连接。我们以前在第42页的“数据的查询语言”中遇到了这个想法。
但是在其他方面MapReduce及其数据流后继与SQL的完全声明性查询模型有很大不同。 MapReduce是围绕函数回调的思想构建的对于每个记录或者一组记录调用一个用户定义的函数mapper或reducer并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以绘制在现有库的大型生态系统上进行分析自然语言分析图像分析以及运行数字或统计算法等。
2018-02-08 14:07:06 +08:00
2018-02-12 17:15:40 +08:00
轻松运行任意代码的自由是从MPP数据库参见“比较Hadoop到分布式数据库”一节第414页中分离出来的MapReduce传统批处理系统。虽然数据库具有编写用户定义函数的功能但是它们通常使用起来很麻烦而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统例如Maven for Javanpm for Java-Script和Ruby的Ruby的Ruby
但是数据流引擎已经发现除了连接之外在合并更多的声明性特征方面也是有优势的。例如如果一个回调函数只包含一个简单的过滤条件或者只是从一条记录中选择了一些字段那么在调用每条记录的函数时会有相当大的CPU开销。如果以声明方式表示这样简单的过滤和映射操作那么查询优化器可以利用面向列的存储布局请参阅第95页的“面向列的存储”并从磁盘只读取所需的列。 HiveSpark DataFrames和Impala也使用向量化执行请参阅第99页的“内存带宽和向量化处理”在对CPU缓存很友好的内部循环中迭代数据并避免函数调用。
Spark生成JVM字节码[79]Impala使用LLVM为这些内部循环生成本机代码[41]。
通过将声明性方面与高级API结合起来并使查询优化器可以在执行期间利用这些优化方法批处理框架看起来更像MPP数据库并且可以实现可比较的性能。同时通过具有运行任意代码和以任意格式读取数据的可扩展性它们保持了灵活性的优势。
#### 专业化的不同领域
尽管能够运行任意代码的可扩展性是有用的但是在标准处理模式不断重复发生的情况下也有许多常见的情况所以值得重用通用构建块的实现。传统上MPP数据库满足了商业智能分析师和业务报告的需求但这只是许多使用批处理的领域之一。
另一个越来越重要的领域是统计和数值算法它们是机器学习应用如分类和推荐系统所需要的。可重复使用的实现正在出现例如Mahout在MapReduceSpark和Flink之上实现了用于机器学习的各种算法而MADlib在关系型MPP数据库Apache HAWQ中实现了类似的功能[54]。
空间算法也是有用的例如k-最近邻居[80],它在一些多维空间中搜索与给定物品接近的物品 - 这是一种类似的搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串[81]。
批处理引擎正被用于分布式执行来自日益广泛的领域的算法。随着批处理系统获得内置功能和高级声明性操作符并且随着MPP数据库变得更加可编程和灵活两者开始看起来更相似最终它们都只是存储和处理数据的系统。
2018-02-08 14:07:06 +08:00
## 本章小结
2018-02-12 17:15:40 +08:00
在本章中我们探讨了批处理的主题。我们首先查看了诸如awkgrep和sort之类的Unix工具然后我们看到了这些工具的设计理念是如何运用到MapReduce和更新的数据流引擎中的。这些设计原则中的一些是输入是不可变的输出是为了成为另一个还未知的程序的输入而复杂的问题是通过编写“做一件好事”的小工具来解决的。
在Unix世界中允许一个程序与另一个程序组合的统一界面是文件和管道;在MapReduce中该接口是一个分布式文件系统。我们看到数据流引擎添加了自己的管道式数据传输机制以避免将中间状态转化为分布式文件系统但作业的初始输入和最终输出通常仍然是HDFS。
分布式批处理框架需要解决的两个主要问题是:
***分区***
在MapReduce中映射器根据输入文件块进行分区。映射器的输出被重新分区分类并合并到可配置数量的reducer分区中。这个过程的目的是把所有的相关数据 - 例如,所有的记录都放在同一个地方。
Post-MapReduce数据流引擎尽量避免排序除非它是必需的但它们采取了大致类似的分区方法。
***容错***
MapReduce经常写入磁盘这使得从单个失败的任务中轻松地恢复而无需重新启动整个作业但在无故障的情况下减慢了执行速度。数据流引擎对中间状态执行的实现较少并且保留在内存中这意味着如果节点发生故障则需要推荐更多的数据。确定性运算符减少了需要重新计算的数据量。
我们讨论了几种MapReduce的连接算法其中大多数也是在MPP数据库和数据流引擎中使用的。他们还提供了分区算法如何工作的一个很好的例子
***排序合并连接***
每个正在连接的输入都通过一个提取连接键的映射器。通过分区排序和合并具有相同密钥的所有记录最终都会进入reducer的相同调用。这个函数可以输出连接的记录。
***广播散列连接***
两个连接输入之一是小的,所以它没有分区,它可以被完全加载到一个哈希表。因此,您可以为大连接输入的每个分区启动一个映射器,将小输入的散列表加载到每个映射器中,然后一次扫描大输入一条记录,查询每条记录的散列表。
***分区散列连接***
如果两个连接输入以相同的方式分区(使用相同的密钥,相同的散列函数和相同数量的分区),则可以独立地为每个分区使用散列表方法。
分布式批处理引擎有一个有意限制的编程模型:回调函数(比如映射器和简化器)被认为是无状态的,除了指定的输出外,没有外部可见的副作用。这个限制允许框架隐藏抽象背后的一些硬分布式系统问题:面对崩溃和网络问题,任务可以安全地重试,任何失败任务的输出都被丢弃。如果某个分区的多个任务成功,则只有其中一个实际上使其输出可见。
得益于这个框架,您在批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,也许不得不重新尝试各种任务。这些可靠的语义要比在线服务处理用户请求时经常使用的要多得多,而且在处理请求的副作用时写入数据库。
批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入 - 换句话说,输出是从输入导出的。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,所以一个工作最终完成。
在下一章中,我们将转向流处理,其中的输入是未知的 - 也就是说,你还有一份工作,但是它的输入是永无止境的数据流。在这种情况下,工作永远不会完成,因为在任何时候都可能有更多的工作进来。我们将看到流和批处理在某些方面是相
2018-02-08 14:07:06 +08:00
## 参考文献
1. Jeffrey Dean and Sanjay Ghemawat:
“[MapReduce: Simplified Data Processing on Large Clusters](http://research.google.com/archive/mapreduce.html),” at *6th USENIX Symposium on Operating System Design
and Implementation* (OSDI), December 2004.
1. Joel Spolsky:
“[The Perils of JavaSchools](http://www.joelonsoftware.com/articles/ThePerilsofJavaSchools.html),” *joelonsoftware.com*, December 25, 2005.
1. Shivnath Babu and Herodotos Herodotou:
“[Massively Parallel Databases and MapReduce Systems](http://research.microsoft.com/pubs/206464/db-mr-survey-final.pdf),” *Foundations and Trends in Databases*,
volume 5, number 1, pages 1104, November 2013.
[doi:10.1561/1900000036](http://dx.doi.org/10.1561/1900000036)
1. David J. DeWitt and Michael Stonebraker:
“[MapReduce: A Major Step Backwards](https://homes.cs.washington.edu/~billhowe/mapreduce_a_major_step_backwards.html),” originally published at *databasecolumn.vertica.com*, January 17, 2008.
1. Henry Robinson:
“[The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google](http://the-paper-trail.org/blog/the-elephant-was-a-trojan-horse-on-the-death-of-map-reduce-at-google/),”
*the-paper-trail.org*, June 25, 2014.
1. “[The Hollerith Machine](https://www.census.gov/history/www/innovations/technology/the_hollerith_tabulator.html),” United States Census Bureau, *census.gov*.
1. “[IBM 82, 83, and 84 Sorters Reference Manual](http://www.textfiles.com/bitsavers/pdf/ibm/punchedCard/Sorter/A24-1034-1_82-83-84_sorters.pdf),” Edition A24-1034-1, International Business
Machines Corporation, July 1962.
1. Adam Drake:
“[Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster](http://aadrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html),” *aadrake.com*, January 25, 2014.
1. “[GNU Coreutils 8.23 Documentation](http://www.gnu.org/software/coreutils/manual/html_node/index.html),” Free Software Foundation, Inc., 2014.
1. Martin Kleppmann:
“[Kafka, Samza, and the Unix Philosophy of Distributed Data](http://martin.kleppmann.com/2015/08/05/kafka-samza-unix-philosophy-distributed-data.html),” *martin.kleppmann.com*, August 5, 2015.
1. Doug McIlroy:
[Internal Bell Labs memo](http://cm.bell-labs.com/cm/cs/who/dmr/mdmpipe.pdf),
October 1964. Cited in: Dennis M. Richie:
“[Advice from Doug McIlroy](https://www.bell-labs.com/usr/dmr/www/mdmpipe.html),”
*cm.bell-labs.com*.
1. M. D. McIlroy, E. N. Pinson, and B. A. Tague:
“[UNIX Time-Sharing System: Foreword](https://archive.org/details/bstj57-6-1899),”
*The Bell System Technical Journal*, volume 57, number 6, pages 18991904,
July 1978.
1. Eric S. Raymond:
<a href="http://www.catb.org/~esr/writings/taoup/html/">*The Art of UNIX Programming*</a>.
Addison-Wesley, 2003. ISBN: 978-0-13-142901-7
1. Ronald Duncan:
“[Text File Formats ASCII Delimited Text Not CSV or TAB Delimited Text](https://ronaldduncan.wordpress.com/2009/10/31/text-file-formats-ascii-delimited-text-not-csv-or-tab-delimited-text/),”
*ronaldduncan.wordpress.com*, October 31, 2009.
1. Alan Kay:
“[Is 'Software Engineering' an Oxymoron?](http://tinlizzie.org/~takashi/IsSoftwareEngineeringAnOxymoron.pdf),” *tinlizzie.org*.
1. Martin Fowler:
“[InversionOfControl](http://martinfowler.com/bliki/InversionOfControl.html),”
*martinfowler.com*, June 26, 2005.
1. Daniel J. Bernstein:
“[Two File Descriptors for Sockets](http://cr.yp.to/tcpip/twofd.html),” *cr.yp.to*.
1. Rob Pike and Dennis M. Ritchie:
“[The Styx Architecture for Distributed Systems](http://doc.cat-v.org/inferno/4th_edition/styx),” *Bell Labs Technical Journal*, volume 4, number 2, pages
146152, April 1999.
1. Sanjay Ghemawat, Howard Gobioff, and Shun-Tak
Leung: “[The Google File System](http://research.google.com/archive/gfs-sosp2003.pdf),”
at *19th ACM Symposium on Operating Systems Principles* (SOSP), October 2003.
[doi:10.1145/945445.945450](http://dx.doi.org/10.1145/945445.945450)
1. Michael Ovsiannikov, Silvius Rus, Damian Reeves, et al.:
“[The Quantcast File System](http://db.disi.unitn.eu/pages/VLDBProgram/pdf/industry/p808-ovsiannikov.pdf),” *Proceedings of the VLDB Endowment*, volume 6, number 11, pages 10921101, August 2013.
[doi:10.14778/2536222.2536234](http://dx.doi.org/10.14778/2536222.2536234)
1. “[OpenStack Swift 2.6.1 Developer Documentation](http://docs.openstack.org/developer/swift/),” OpenStack Foundation, *docs.openstack.org*, March 2016.
1. Zhe Zhang, Andrew Wang, Kai Zheng, et al.:
“[Introduction to HDFS Erasure Coding in Apache Hadoop](http://blog.cloudera.com/blog/2015/09/introduction-to-hdfs-erasure-coding-in-apache-hadoop/),” *blog.cloudera.com*, September 23, 2015.
1. Peter Cnudde:
“[Hadoop Turns 10](http://yahoohadoop.tumblr.com/post/138739227316/hadoop-turns-10),”
*yahoohadoop.tumblr.com*, February 5, 2016.
1. Eric Baldeschwieler:
“[Thinking About the HDFS vs. Other Storage Technologies](http://hortonworks.com/blog/thinking-about-the-hdfs-vs-other-storage-technologies/),” *hortonworks.com*, July 25, 2012.
1. Brendan Gregg:
“[Manta: Unix Meets Map Reduce](http://dtrace.org/blogs/brendan/2013/06/25/manta-unix-meets-map-reduce/),” *dtrace.org*, June 25, 2013.
1. Tom White: *Hadoop: The Definitive Guide*,
4th edition. O'Reilly Media, 2015. ISBN: 978-1-491-90163-2
1. Jim N. Gray:
“[Distributed Computing Economics](http://arxiv.org/pdf/cs/0403019.pdf),” Microsoft
Research Tech Report MSR-TR-2003-24, March 2003.
1. Márton Trencséni:
“[Luigi vs Airflow vs Pinball](http://bytepawn.com/luigi-airflow-pinball.html),”
*bytepawn.com*, February 6, 2016.
1. Roshan Sumbaly, Jay Kreps, and Sam Shah:
“[The 'Big Data' Ecosystem at LinkedIn](http://www.slideshare.net/s_shah/the-big-data-ecosystem-at-linkedin-23512853),” at *ACM International Conference on Management of Data*
(SIGMOD), July 2013.
[doi:10.1145/2463676.2463707](http://dx.doi.org/10.1145/2463676.2463707)
1. Alan F. Gates, Olga Natkovich, Shubham Chopra, et al.:
“[Building a High-Level Dataflow System on Top of Map-Reduce: The Pig Experience](http://www.vldb.org/pvldb/2/vldb09-1074.pdf),” at *35th International Conference on Very Large Data
Bases* (VLDB), August 2009.
1. Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.:
“[Hive A Petabyte Scale Data Warehouse Using Hadoop](http://i.stanford.edu/~ragho/hive-icde2010.pdf),” at *26th IEEE International Conference on Data Engineering* (ICDE), March 2010.
[doi:10.1109/ICDE.2010.5447738](http://dx.doi.org/10.1109/ICDE.2010.5447738)
1. “[Cascading 3.0 User Guide](http://docs.cascading.org/cascading/3.0/userguide/),” Concurrent, Inc., *docs.cascading.org*, January 2016.
1. “[Apache Crunch User Guide](https://crunch.apache.org/user-guide.html),” Apache Software Foundation, *crunch.apache.org*.
1. Craig Chambers, Ashish Raniwala, Frances
Perry, et al.: “[FlumeJava: Easy, Efficient Data-Parallel Pipelines](https://research.google.com/pubs/archive/35650.pdf),” at *31st ACM SIGPLAN Conference on Programming Language
Design and Implementation* (PLDI), June 2010.
[doi:10.1145/1806596.1806638](http://dx.doi.org/10.1145/1806596.1806638)
1. Jay Kreps:
“[Why Local State is a Fundamental Primitive in Stream Processing](https://www.oreilly.com/ideas/why-local-state-is-a-fundamental-primitive-in-stream-processing),” *oreilly.com*, July 31, 2014.
1. Martin Kleppmann:
“[Rethinking Caching in Web Apps](http://martin.kleppmann.com/2012/10/01/rethinking-caching-in-web-apps.html),” *martin.kleppmann.com*, October 1, 2012.
1. Mark Grover, Ted Malaska, Jonathan
Seidman, and Gwen Shapira: *[Hadoop Application Architectures](http://shop.oreilly.com/product/0636920033196.do)*. O'Reilly Media, 2015. ISBN: 978-1-491-90004-8
1. Philippe Ajoux, Nathan Bronson,
Sanjeev Kumar, et al.:
“[Challenges to Adopting Stronger Consistency at Scale](https://www.usenix.org/system/files/conference/hotos15/hotos15-paper-ajoux.pdf),” at *15th USENIX Workshop on Hot Topics in
Operating Systems* (HotOS), May 2015.
1. Sriranjan Manjunath:
“[Skewed Join](https://wiki.apache.org/pig/PigSkewedJoinSpec),” *wiki.apache.org*,
2009.
1. David J. DeWitt, Jeffrey F. Naughton, Donovan A.
Schneider, and S. Seshadri: “[Practical Skew Handling in Parallel Joins](http://www.vldb.org/conf/1992/P027.PDF),” at *18th International Conference on Very Large Data Bases* (VLDB), August 1992.
1. Marcel Kornacker, Alexander Behm, Victor
Bittorf, et al.: “[Impala: A Modern, Open-Source SQL Engine for Hadoop](http://pandis.net/resources/cidr15impala.pdf),” at *7th Biennial Conference on Innovative Data Systems
Research* (CIDR), January 2015.
1. Matthieu Monsch:
“[Open-Sourcing PalDB, a Lightweight Companion for Storing Side Data](https://engineering.linkedin.com/blog/2015/10/open-sourcing-paldb--a-lightweight-companion-for-storing-side-da),” *engineering.linkedin.com*, October 26, 2015.
1. Daniel Peng and Frank Dabek:
“[Large-Scale Incremental Processing Using Distributed Transactions and Notifications](https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Peng.pdf),” at *9th USENIX
conference on Operating Systems Design and Implementation* (OSDI), October 2010.
1. “["Cloudera Search User Guide,"](http://www.cloudera.com/documentation/cdh/5-1-x/Search/Cloudera-Search-User-Guide/Cloudera-Search-User-Guide.html) Cloudera, Inc., September 2015.
1. Lili Wu, Sam Shah, Sean Choi, et al.:
“[The Browsemaps: Collaborative Filtering at LinkedIn](http://ls13-www.cs.uni-dortmund.de/homepage/rsweb2014/papers/rsweb2014_submission_3.pdf),” at *6th Workshop on Recommender Systems and
the Social Web* (RSWeb), October 2014.
1. Roshan Sumbaly, Jay Kreps, Lei Gao, et al.:
“[Serving Large-Scale Batch Computed Data with Project Voldemort](http://static.usenix.org/events/fast12/tech/full_papers/Sumbaly.pdf),” at *10th USENIX Conference on File and Storage
Technologies* (FAST), February 2012.
1. Varun Sharma:
“[Open-Sourcing Terrapin: A Serving System for Batch Generated Data](https://engineering.pinterest.com/blog/open-sourcing-terrapin-serving-system-batch-generated-data-0),” *engineering.pinterest.com*, September 14, 2015.
1. Nathan Marz:
“[ElephantDB](http://www.slideshare.net/nathanmarz/elephantdb),” *slideshare.net*, May 30, 2011.
1. Jean-Daniel (JD) Cryans:
“[How-to: Use HBase Bulk Loading, and Why](http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/),” *blog.cloudera.com*, September 27, 2013.
1. Nathan Marz:
“[How to Beat the CAP Theorem](http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html),” *nathanmarz.com*, October 13, 2011.
1. Molly Bartlett Dishman and Martin Fowler:
“[Agile Architecture](http://conferences.oreilly.com/software-architecture/sa2015/public/schedule/detail/40388),” at *O'Reilly Software Architecture Conference*, March 2015.
1. David J. DeWitt and Jim N. Gray:
“[Parallel Database Systems: The Future of High Performance Database Systems](http://www.cs.cmu.edu/~pavlo/courses/fall2013/static/papers/dewittgray92.pdf),”
*Communications of the ACM*, volume 35, number 6, pages 8598, June 1992.
[doi:10.1145/129888.129894](http://dx.doi.org/10.1145/129888.129894)
1. Jay Kreps:
“[But the multi-tenancy thing is actually really really hard](https://twitter.com/jaykreps/status/528235702480142336),” tweetstorm, *twitter.com*, October 31, 2014.
1. Jeffrey Cohen, Brian Dolan, Mark Dunlap, et al.: “[MAD Skills: New Analysis Practices for Big Data](http://www.vldb.org/pvldb/2/vldb09-219.pdf),” *Proceedings of the VLDB Endowment*, volume 2, number
2, pages 14811492, August 2009.
[doi:10.14778/1687553.1687576](http://dx.doi.org/10.14778/1687553.1687576)
1. Ignacio
Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino:
“[Data Wrangling: The Challenging Journey from the Wild to the Lake](http://cidrdb.org/cidr2015/Papers/CIDR15_Paper2.pdf),” at *7th Biennial Conference on Innovative Data Systems
Research* (CIDR), January 2015.
1. Paige Roberts:
“[To Schema on Read or to Schema on Write, That Is the Hadoop Data Lake Question](http://adaptivesystemsinc.com/blog/to-schema-on-read-or-to-schema-on-write-that-is-the-hadoop-data-lake-question/),” *adaptivesystemsinc.com*, July 2, 2015.
1. Bobby Johnson and Joseph Adler:
“[The Sushi Principle: Raw Data Is Better](https://vimeo.com/123985284),” at
*Strata+Hadoop World*, February 2015.
1. Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, et al.:
“[Apache Hadoop YARN: Yet Another Resource Negotiator](http://www.socc2013.org/home/program/a5-vavilapalli.pdf),” at *4th ACM Symposium on Cloud Computing* (SoCC), October 2013.
[doi:10.1145/2523616.2523633](http://dx.doi.org/10.1145/2523616.2523633)
1. Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.:
“[Large-Scale Cluster Management at Google with Borg](http://research.google.com/pubs/pub43438.html),” at *10th European Conference on Computer Systems* (EuroSys), April 2015.
[doi:10.1145/2741948.2741964](http://dx.doi.org/10.1145/2741948.2741964)
1. Malte Schwarzkopf:
“[The Evolution of Cluster Scheduler Architectures](http://www.firmament.io/blog/scheduler-architectures.html),” *firmament.io*, March 9, 2016.
1. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.:
“[Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf),” at *9th
USENIX Symposium on Networked Systems Design and Implementation* (NSDI), April 2012.
1. Holden Karau, Andy Konwinski, Patrick Wendell, and Matei
Zaharia: *Learning Spark*. O'Reilly Media, 2015. ISBN: 978-1-449-35904-1
1. Bikas Saha and Hitesh Shah:
“[Apache Tez: Accelerating Hadoop Query Processing](http://www.slideshare.net/Hadoop_Summit/w-1205phall1saha),” at *Hadoop Summit*, June 2014.
1. Bikas Saha, Hitesh Shah, Siddharth Seth, et al.:
“[Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications](http://home.cse.ust.hk/~weiwa/teaching/Fall15-COMP6611B/reading_list/Tez.pdf),” at *ACM
International Conference on Management of Data* (SIGMOD), June 2015.
[doi:10.1145/2723372.2742790](http://dx.doi.org/10.1145/2723372.2742790)
1. Kostas Tzoumas:
“[Apache Flink: API, Runtime, and Project Roadmap](http://www.slideshare.net/KostasTzoumas/apache-flink-api-runtime-and-project-roadmap),” *slideshare.net*, January 14, 2015.
1. Alexander Alexandrov, Rico Bergmann, Stephan Ewen, et al.:
“[The Stratosphere Platform for Big Data Analytics](https://ssc.io/pdf/2014-VLDBJ_Stratosphere_Overview.pdf),” *The VLDB Journal*, volume 23, number 6, pages 939964, May 2014.
[doi:10.1007/s00778-014-0357-y](http://dx.doi.org/10.1007/s00778-014-0357-y)
1. Michael Isard, Mihai Budiu, Yuan Yu, et al.:
“[Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks](http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf),” at *European Conference on Computer
Systems* (EuroSys), March 2007.
[doi:10.1145/1272996.1273005](http://dx.doi.org/10.1145/1272996.1273005)
1. Daniel Warneke and Odej Kao:
“[Nephele: Efficient Parallel Data Processing in the Cloud](https://stratosphere2.dima.tu-berlin.de/assets/papers/Nephele_09.pdf),” at *2nd Workshop on Many-Task Computing on Grids and
Supercomputers* (MTAGS), November 2009.
[doi:10.1145/1646468.1646476](http://dx.doi.org/10.1145/1646468.1646476)
1. Lawrence Page, Sergey Brin, Rajeev
Motwani, and Terry Winograd: “<a href="http://ilpubs.stanford.edu:8090/422/">The <span class="keep-together">PageRank
1. Leslie G. Valiant:
“[A Bridging Model for Parallel Computation](http://dl.acm.org/citation.cfm?id=79181),”
*Communications of the ACM*, volume 33, number 8, pages 103111, August 1990.
[doi:10.1145/79173.79181](http://dx.doi.org/10.1145/79173.79181)
1. Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl:
“[Spinning Fast Iterative Data Flows](http://vldb.org/pvldb/vol5/p1268_stephanewen_vldb2012.pdf),” *Proceedings of the VLDB Endowment*, volume 5, number 11, pages 1268-1279, July 2012.
[doi:10.14778/2350229.2350245](http://dx.doi.org/10.14778/2350229.2350245)
1. Grzegorz Malewicz, Matthew H.
Austern, Aart J. C. Bik, et al.: “[Pregel: A System for Large-Scale Graph Processing](https://kowshik.github.io/JPregel/pregel_paper.pdf),” at *ACM International Conference on Management of
Data* (SIGMOD), June 2010.
[doi:10.1145/1807167.1807184](http://dx.doi.org/10.1145/1807167.1807184)
1. Frank McSherry, Michael Isard, and Derek G. Murray:
“[Scalability! But at What COST?](http://www.frankmcsherry.org/assets/COST.pdf),” at
*15th USENIX Workshop on Hot Topics in Operating Systems* (HotOS), May 2015.
1. Ionel Gog, Malte Schwarzkopf, Natacha Crooks, et al.:
“[Musketeer: All for One, One for All in Data Processing Systems](http://www.cl.cam.ac.uk/research/srg/netos/camsas/pubs/eurosys15-musketeer.pdf),” at *10th European Conference on
Computer Systems* (EuroSys), April 2015.
[doi:10.1145/2741948.2741968](http://dx.doi.org/10.1145/2741948.2741968)
1. Aapo Kyrola, Guy Blelloch, and Carlos Guestrin:
“[GraphChi: Large-Scale Graph Computation on Just a PC](https://www.usenix.org/system/files/conference/osdi12/osdi12-final-126.pdf),” at *10th USENIX Symposium on Operating Systems
Design and Implementation* (OSDI), October 2012.
1. Andrew Lenharth, Donald Nguyen, and Keshav Pingali:
“[Parallel Graph Analytics](http://cacm.acm.org/magazines/2016/5/201591-parallel-graph-analytics/fulltext),” *Communications of the ACM*, volume 59, number 5, pages 7887, May
2016. [doi:10.1145/2901919](http://dx.doi.org/10.1145/2901919)
1. Fabian Hüske:
“[Peeking into Apache Flink's Engine Room](http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html),” *flink.apache.org*, March 13, 2015.
1. Mostafa Mokhtar:
“[Hive 0.14 Cost Based Optimizer (CBO) Technical Overview](http://hortonworks.com/blog/hive-0-14-cost-based-optimizer-cbo-technical-overview/),” *hortonworks.com*, March 2, 2015.
1. Michael Armbrust, Reynold S Xin, Cheng Lian, et al.:
“[Spark SQL: Relational Data Processing in Spark](http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf),” at *ACM International Conference on Management of Data* (SIGMOD), June 2015.
[doi:10.1145/2723372.2742797](http://dx.doi.org/10.1145/2723372.2742797)
1. Daniel Blazevski:
“[Planting Quadtrees for Apache Flink](http://insightdataengineering.com/blog/flink-knn/),” *insightdataengineering.com*, March 25, 2016.
1. Tom White:
“[Genome Analysis Toolkit: Now Using Apache Spark for Data Processing](http://blog.cloudera.com/blog/2016/04/genome-analysis-toolkit-now-using-apache-spark-for-data-processing/),” *blog.cloudera.com*, April 6, 2016.
2018-02-12 17:15:40 +08:00
------
2018-03-09 00:00:42 +08:00
| 上一章 | 目录 | 下一章 |
| --------------------------------- | ------------------------------- | ------------------------ |
| [第三部分:派生数据](part-iii.md) | [设计数据密集型应用](README.md) | [第十章:流处理](ch7.md) |
2018-02-08 14:07:06 +08:00