mirror of
https://github.com/DistSysCorp/ddia.git
synced 2024-12-24 03:40:48 +08:00
make capital of Mapper and Reducer consistent
This commit is contained in:
parent
b35c288629
commit
c263ca21e1
94
ch10.md
94
ch10.md
@ -219,57 +219,57 @@ HDFS 能够很好地进行**扩容**:在本书写作时(2017 年),最大
|
||||
MapReduce 是一个**编程框架**,你可以基于 MapReduce 编写代码以处理存储在分布式文件系统(如 HDFS)上的超大数据集。要想理解 MapReduce 的运行机制,最直观的还是举个例子。仍以上一小节提到的服务器日志分析为例。使用 MapReduce 进行数据处理的方式和使用 Unix 工具处理的方式很像:
|
||||
|
||||
1. **读取一组输入文件,将其切分为记录(records)**。在网站服务器日志的例子中,每个记录就是日志中的一行(即,使用 \n 作为记录分隔符)
|
||||
2. **调用 mapper 函数从每个记录中抽取 key 和 value**。在之前的例子中,mapper 函数是 `awk '{print $7}'` :抽取 URL($7)作为 key,value 留空。
|
||||
2. **调用 Mapper 函数从每个记录中抽取 key 和 value**。在之前的例子中,mapper 函数是 `awk '{print $7}'` :抽取 URL($7)作为 key,value 留空。
|
||||
3. **将所有的 key-value 对按 key 进行排序**。在前面例子中,该环节由 sort 承担。
|
||||
4. **调用 reducer 函数对排好序的 kv 列表迭代处理**。如果某个 key 出现了多次,排序环节会让其在在列表中集中到一块,因此可以在不在内存中保存过多状态的的情况下,对具有相同 key 的数据进行汇总处理。在前面例子中,reducer 对应命令 `uniq -c` ,功能是对所有具有相同 key 的记录值进行计数。
|
||||
4. **调用 Reducer 函数对排好序的 kv 列表迭代处理**。如果某个 key 出现了多次,排序环节会让其在在列表中集中到一块,因此可以在不在内存中保存过多状态的的情况下,对具有相同 key 的数据进行汇总处理。在前面例子中,reducer 对应命令 `uniq -c` ,功能是对所有具有相同 key 的记录值进行计数。
|
||||
|
||||
这四个步骤(split-map-sort-reduce)可以通过一个 MapReduce 任务来实现。你可以在步骤 2 (map)和步骤 4(reduce)编写代码来自定义数据处理逻辑。步骤 1 (将文件拆分成记录)由**输入格式解析器**(input format parser)来完成。步骤 3,排序阶段,由 MapReduce 框架**隐式完成**,所有 mapper 的输出在给到 reducer 前,框架都会对其进行排序。
|
||||
这四个步骤(split-map-sort-reduce)可以通过一个 MapReduce 任务来实现。你可以在步骤 2 (map)和步骤 4(reduce)编写代码来自定义数据处理逻辑。步骤 1 (将文件拆分成记录)由**输入格式解析器**(input format parser)来完成。步骤 3,排序阶段,由 MapReduce 框架**隐式完成**,所有 Mapper 的输出在给到 Reducer 前,框架都会对其进行排序。
|
||||
|
||||
为了创建 MapReduce 任务,你需要实现两个回调函数:mapper 和 reducer,其行为如下:
|
||||
|
||||
- **Mapper**
|
||||
|
||||
对于每个输入**记录**都会调用一次 mapper 函数,其任务是从记录中抽取 key 和 value。对于每一个输入记录,都有可能产生任意数量(包括 0 个)的 kv 对。框架不会保存任何**跨记录的状态**,因此每个记录都可以独立的被处理(即 mapper 可以进行任意并发的运行)。
|
||||
对于每个输入**记录**都会调用一次 Mapper 函数,其任务是从记录中抽取 key 和 value。对于每一个输入记录,都有可能产生任意数量(包括 0 个)的 kv 对。框架不会保存任何**跨记录的状态**,因此每个记录都可以独立的被处理(即 Mapper 可以进行任意并发的运行)。
|
||||
|
||||
- **Reducer**
|
||||
|
||||
MapReduce 框架会拿到 mapper 输出的 kv 对,通过排序将具有相同 key 的 value 聚集到一块,以迭代器的形式给到 reducer 函数。reducer 会继续输出一组新的记录(如 URL 的出现频次)。
|
||||
MapReduce 框架会拿到 Mapper 输出的 kv 对,通过排序将具有相同 key 的 value 聚集到一块,以迭代器的形式给到 Reducer 函数。reducer 会继续输出一组新的记录(如 URL 的出现频次)。
|
||||
|
||||
|
||||
在网站服务器日志的例子中,我们在第五步还有一个 `sort` 命令,对所有 URL **按请求频次**进行排序。在 MapReduce 中,如果你需要一个**额外的**(除了 reduce 前的那个排序)排序阶段,可以再实现一个 MapReduce 任务,将其与第一个接起来,即使用第一个 MapReduce 任务的输出作为输入。在这种情形下,第二个 MapReduce 任务的 mapper 会将数据整理成适合排序的形式(将用于排序的字段抽取出来放到 key 中),然后 reducer 对排好序的数据进行处理。
|
||||
在网站服务器日志的例子中,我们在第五步还有一个 `sort` 命令,对所有 URL **按请求频次**进行排序。在 MapReduce 中,如果你需要一个**额外的**(除了 reduce 前的那个排序)排序阶段,可以再实现一个 MapReduce 任务,将其与第一个接起来,即使用第一个 MapReduce 任务的输出作为输入。在这种情形下,第二个 MapReduce 任务的 Mapper 会将数据整理成适合排序的形式(将用于排序的字段抽取出来放到 key 中),然后 Reducer 对排好序的数据进行处理。
|
||||
|
||||
### MapReduce 的分布式执行
|
||||
|
||||
与 Unix 工具流水线的相比,MapReduce 的最大区别在于可以在**多台机器上**进行分布式的执行,但并不需要用户显式地写处理并行的代码。mapper 和 reducer 函数每次只处理一个记录;他们不必关心输入从哪里来,输出要到哪里去,框架会处理分布式系统所带来的的复杂度(如在机器间移动数据的)。
|
||||
与 Unix 工具流水线的相比,MapReduce 的最大区别在于可以在**多台机器上**进行分布式的执行,但并不需要用户显式地写处理并行的代码。mapper 和 Reducer 函数每次只处理一个记录;他们不必关心输入从哪里来,输出要到哪里去,框架会处理分布式系统所带来的的复杂度(如在机器间移动数据的)。
|
||||
|
||||
虽然可以使用 Unix 工具作为分布式计算中的 mapper 和 reducer,但更为常见的是使用**通用编程语言**的函数来实现这两个回调。在 Hadoop MapReduce 中,mapper 和 reducer 是需要实现特殊接口的类(本质上只需要一个函数,因为不需要保存状态。但在 Java 老版本中,函数不是一等公民,所以需要一个类来包裹);在 MongoDB 和 CouchDB 中,mapper 和 reducer 是 JavaScript 函数。
|
||||
虽然可以使用 Unix 工具作为分布式计算中的 Mapper 和 reducer,但更为常见的是使用**通用编程语言**的函数来实现这两个回调。在 Hadoop MapReduce 中,mapper 和 Reducer 是需要实现特殊接口的类(本质上只需要一个函数,因为不需要保存状态。但在 Java 老版本中,函数不是一等公民,所以需要一个类来包裹);在 MongoDB 和 CouchDB 中,mapper 和 Reducer 是 JavaScript 函数。
|
||||
|
||||
图 10-1 中展示了 Hadoop MapReduce 任务中的数据流。其并行是基于分片的的:任务的输入通常是 HDFS 中的一个文件夹,输入文件夹中的每个文件或者文件块是一个可被 Map 子任务(task)处理的分片。
|
||||
|
||||
![Untitled](img/ch10-fig01.png)
|
||||
|
||||
每个输入文件通常有数百 M,每个输入通常有多个副本,分散在多个机器上。MapReduce 的调度器(图中没有显示)在调度时,会在这多个副本所在机器上选择一个具有足够内存和 CPU 资源运行该 mapper 任务的机器,将 map 任务调度过去。这个策略也被称为:**将计算调度到数据上**。从而省去在网络中拷贝数据的环节,提高了局部性,减少了网络带宽消耗。
|
||||
每个输入文件通常有数百 M,每个输入通常有多个副本,分散在多个机器上。MapReduce 的调度器(图中没有显示)在调度时,会在这多个副本所在机器上选择一个具有足够内存和 CPU 资源运行该 Mapper 任务的机器,将 map 任务调度过去。这个策略也被称为:**将计算调度到数据上**。从而省去在网络中拷贝数据的环节,提高了局部性,减少了网络带宽消耗。
|
||||
|
||||
多数情况下,**应用层的代码**通常不会存在于 map 任务调度到的机器上。因此,MapReduce 框架首先会将用户代码(如对于 Java 来说就是 Jar 包)**序列化后**复制过去。然后在对应机器上,动态加载这些代码,继而执行 map 任务。读取输入文件,逐个解析数据记录(record),传给 mapper 回调函数执行。每个 mapper 会产生一组 key-value 对。
|
||||
多数情况下,**应用层的代码**通常不会存在于 map 任务调度到的机器上。因此,MapReduce 框架首先会将用户代码(如对于 Java 来说就是 Jar 包)**序列化后**复制过去。然后在对应机器上,动态加载这些代码,继而执行 map 任务。读取输入文件,逐个解析数据记录(record),传给 Mapper 回调函数执行。每个 Mapper 会产生一组 key-value 对。
|
||||
|
||||
reduce 侧的计算也是分片的。对于 MapReduce 任务来说,**map 任务的数量**,取决于该任务的输入文件数(或者文件 block 数)的数量;**但 reduce 任务的多少**,可以由用户显式的配置(可以不同于 map 任务的数量)。为了保证所有具有相同 key 的 kv 对被同一个 reducer 函数处理,框架会使用哈希函数,将所有 mapper 的输出的 kv 对进行分桶(桶的数量就是 reducer 的数量),进而路由到对应的 reducer 函数。
|
||||
reduce 侧的计算也是分片的。对于 MapReduce 任务来说,**map 任务的数量**,取决于该任务的输入文件数(或者文件 block 数)的数量;**但 reduce 任务的多少**,可以由用户显式的配置(可以不同于 map 任务的数量)。为了保证所有具有相同 key 的 kv 对被同一个 Reducer 函数处理,框架会使用哈希函数,将所有 Mapper 的输出的 kv 对进行分桶(桶的数量就是 Reducer 的数量),进而路由到对应的 Reducer 函数。
|
||||
|
||||
根据 MapReduce 的设定,reducer 接受的 kv 对需要是有序的,但任何传统的排序算法都无法在单机上对如此大尺度的数据进行排序。为了解决这个问题,mapper 和 reducer 间的**排序被分成多个阶段**。
|
||||
根据 MapReduce 的设定,reducer 接受的 kv 对需要是有序的,但任何传统的排序算法都无法在单机上对如此大尺度的数据进行排序。为了解决这个问题,mapper 和 Reducer 间的**排序被分成多个阶段**。
|
||||
|
||||
首先,每个 map 任务在输出时,会先将所有输出哈希后分片(一个分片对应一个 reducer),然后在每个分片内对输出进行排序。由于每个分片的数据量仍然可能很大,因此使用外排算法,类似于 [SSTable 和 LSM-Trees](https://ddia.qtmuniao.com/#/ch03?id=sstables-%e5%92%8c-lsm-trees) 一节中讨论的 append+compact 算法。
|
||||
|
||||
当某个 mapper 任务读取结束,并将输出排好了序,MapReduce 调度器就会通知所有 reducers 来该 mapper 机器上拉取各自对应的输出。最终,每个 reducer 会去所有 mapper 上**拉取**一遍其对应分片数据数据。这里有个推还是拉的设计权衡,拉的好处在于 reducuer 失败后,可以很方便地进行重试,再次拉取计算即可。
|
||||
当某个 Mapper 任务读取结束,并将输出排好了序,MapReduce 调度器就会通知所有 reducers 来该 Mapper 机器上拉取各自对应的输出。最终,每个 Reducer 会去所有 Mapper 上**拉取**一遍其对应分片数据数据。这里有个推还是拉的设计权衡,拉的好处在于 reducuer 失败后,可以很方便地进行重试,再次拉取计算即可。
|
||||
|
||||
这个**分片**(partitioning by reducer)-**排序**(sorting)-**复制**(coping)过程也被称为**数据重排**(**shuffle**,虽然英文是洗牌的意思,但该过程并没有任何随机性,都是确定的)。
|
||||
|
||||
框架会在 reducer 处将所有从 mapper 处拿来的 kv 文件进行归并排序,然后在所有数据拉取完毕后,将排好序数据送给 reducer。这样一来,不同 mapper 产生的具有相同 key 的记录就会被聚集到一块。
|
||||
框架会在 Reducer 处将所有从 Mapper 处拿来的 kv 文件进行归并排序,然后在所有数据拉取完毕后,将排好序数据送给 reducer。这样一来,不同 Mapper 产生的具有相同 key 的记录就会被聚集到一块。
|
||||
|
||||
总结来说,map 和 reduce 间的排序分为**两个阶段**:
|
||||
|
||||
1. 在每个 mapper 上对**输出**分片后各自排序。
|
||||
2. 在每个 reducer 上对**输入**(有序文件)进行归并排序。
|
||||
1. 在每个 Mapper 上对**输出**分片后各自排序。
|
||||
2. 在每个 Reducer 上对**输入**(有序文件)进行归并排序。
|
||||
|
||||
reducer 在调用时会传入一个 key 一个 Iterator(迭代器),使用该迭代器能够访问所有具有相同 key 的记录(极端情况下,内存可能放不下这些记录,因此是给一个迭代器,而非内存数组)。reducer 函数可以使用任意的逻辑对这些记录进行处理,并可以产生任意数量的输出。这些输出最终会被写到分布式文件系统中的文件里(通常该输出文件会在 reducer 机器上放一个副本,在另外一些机器上放其他副本)。
|
||||
Reducer 在调用时会传入一个 key 一个 Iterator(迭代器),使用该迭代器能够访问所有具有相同 key 的记录(极端情况下,内存可能放不下这些记录,因此是给一个迭代器,而非内存数组)。reducer 函数可以使用任意的逻辑对这些记录进行处理,并可以产生任意数量的输出。这些输出最终会被写到分布式文件系统中的文件里(通常该输出文件会在 Reducer 机器上放一个副本,在另外一些机器上放其他副本)。
|
||||
|
||||
### MapReduce 工作流
|
||||
|
||||
@ -325,21 +325,21 @@ reducer 在调用时会传入一个 key 一个 Iterator(迭代器),使用
|
||||
|
||||
### 基于排序-合并的 Join
|
||||
|
||||
让我们回顾下 mapper 的职责:从所有输入记录中提取 key 和 value。对于 10-2 中的例子来说,key 就是用户 ID:我们使用一些 mappers 从用户行为事件中提取用户 ID 和网页,使用另一些 mappers (**两组 mappers 从属于同一个 MapReduce 任务**)从用户资料信息中提取其他信息(如用户 ID 作为 key,用户生日作为 value)。如下图:
|
||||
让我们回顾下 Mapper 的职责:从所有输入记录中提取 key 和 value。对于 10-2 中的例子来说,key 就是用户 ID:我们使用一些 mappers 从用户行为事件中提取用户 ID 和网页,使用另一些 mappers (**两组 mappers 从属于同一个 MapReduce 任务**)从用户资料信息中提取其他信息(如用户 ID 作为 key,用户生日作为 value)。如下图:
|
||||
|
||||
![Untitled](img/ch10-fig03.png)
|
||||
|
||||
当 MapReduce 框架将所有 mapper 的输出按照 key(也就是用户 ID)进行排序后,所有具有同样的用户 ID 的记录就会聚集到一块,作为输入给到 reducer。MapReduce 任务甚至可以将输出进行特殊组织,以使 reducer 先看到同一个用户的资料信息,再看到其行为信息——这种技术也被称为**二级排序**(secondary sort,使用多个字段进行排序)。
|
||||
当 MapReduce 框架将所有 Mapper 的输出按照 key(也就是用户 ID)进行排序后,所有具有同样的用户 ID 的记录就会聚集到一块,作为输入给到 reducer。MapReduce 任务甚至可以将输出进行特殊组织,以使 Reducer 先看到同一个用户的资料信息,再看到其行为信息——这种技术也被称为**二级排序**(secondary sort,使用多个字段进行排序)。
|
||||
|
||||
在此基础上,reducer 可以进行轻松的进行 join:reducer 函数会在每一个用户 ID 上进行调用,由于使用了二级排序,reducer 会先看到该用户的资料信息。在实现 reducer 时,可以首先将用户**资料信息**(比如生日)保存在局部变量里,然后对其所有**行为信息**进行迭代,提取相关信息,输出 <viewed-url, viewed age in years> kv 对。之后可以再接一个 MapReduce 任务,对每个 url 访问用户的年龄分布进行统计,并按年龄段进行聚集。
|
||||
在此基础上,reducer 可以进行轻松的进行 join:reducer 函数会在每一个用户 ID 上进行调用,由于使用了二级排序,reducer 会先看到该用户的资料信息。在实现 Reducer 时,可以首先将用户**资料信息**(比如生日)保存在局部变量里,然后对其所有**行为信息**进行迭代,提取相关信息,输出 <viewed-url, viewed age in years> kv 对。之后可以再接一个 MapReduce 任务,对每个 url 访问用户的年龄分布进行统计,并按年龄段进行聚集。
|
||||
|
||||
由于 reducer 会在单个函数里处理所有同一个 user ID 的记录,因此一次只需要在内存中保存一个用户的资料信息,并且不用进行任何网络请求。这种算法也被称为**基于排序和归并的连接**(sort-merge join),由于 mapper 的输出是按 key 有序的,则 reducers 可将来自多方的同一个 key 的输入轻松的进行合并。
|
||||
由于 Reducer 会在单个函数里处理所有同一个 user ID 的记录,因此一次只需要在内存中保存一个用户的资料信息,并且不用进行任何网络请求。这种算法也被称为**基于排序和归并的连接**(sort-merge join),由于 Mapper 的输出是按 key 有序的,则 reducers 可将来自多方的同一个 key 的输入轻松的进行合并。
|
||||
|
||||
### 将相关数据聚到一块
|
||||
|
||||
在排序-归并 join 中,mappers 和排序会确保同一个用户 id 所有用于 join 必要输入会被放到一起:**即作为一个输入给到某次 reducer 中**。预先让所有相关数据聚集到一起,可以让 reducer 逻辑非常简单,并且可以仅使用单个线程,就能进行高吞吐、低耗存地执行。
|
||||
在排序-归并 join 中,mappers 和排序会确保同一个用户 id 所有用于 join 必要输入会被放到一起:**即作为一个输入给到某次 Reducer 中**。预先让所有相关数据聚集到一起,可以让 Reducer 逻辑非常简单,并且可以仅使用单个线程,就能进行高吞吐、低耗存地执行。
|
||||
|
||||
我们可以从另外一种角度来理解这种架构:mapper **发消息**给 reducer。当某个 mapper 发出一个 key-value 对时,**key 是投递地址,value 就是要投递的内容**。尽管 key 在物理上仅是一个任意的字符串(而非像网络中的 IP 和端口号那样真的网络地址),但在逻辑上充当**地址**的作用:所有具有相同 key 的 kv 对都会被投递到同一个目的地(某个 reducer 的调用处)。
|
||||
我们可以从另外一种角度来理解这种架构:mapper **发消息**给 reducer。当某个 Mapper 发出一个 key-value 对时,**key 是投递地址,value 就是要投递的内容**。尽管 key 在物理上仅是一个任意的字符串(而非像网络中的 IP 和端口号那样真的网络地址),但在逻辑上充当**地址**的作用:所有具有相同 key 的 kv 对都会被投递到同一个目的地(某个 Reducer 的调用处)。
|
||||
|
||||
MapReduce 编程模型,可以将计算的**物理拓扑**(将数据放到合适的机器上)与**应用逻辑**(当有了数据后就进行处理)**解耦**开来。这种解耦与数据库形成对比——在使用数据库的场景中,进行数据库连接(物理)通常藏在应用代码(逻辑)深处。由于 MapReduce 框架会处理所有网络通信细节,它也会让应用层代码免于关心**部分失败**(partial failure),如某些节点宕机:MapReduce 框架会透明的(应用代码无感)的对失败的子任务进行重试,而不会影响应用逻辑。
|
||||
|
||||
@ -351,7 +351,7 @@ MapReduce 编程模型,可以将计算的**物理拓扑**(将数据放到合
|
||||
- 将某个字段进行累加(在 SQL 中对应 `SUM(fieldname)` )
|
||||
- 根据排序函数排序后取前 k 个记录
|
||||
|
||||
使用 MapReduce 实现 Group By 语义,最简单的方法是在 mapper 中**抽取 key 为待分组的 key**。MapReduce 框架就会按照这些 key 将所有 mapper 的输出记录进行分区和排序,然后按 key 聚集给到 reducer。本质上,使用 MapReduce 来实现 group 和 join ,逻辑是极为相似的。
|
||||
使用 MapReduce 实现 Group By 语义,最简单的方法是在 Mapper 中**抽取 key 为待分组的 key**。MapReduce 框架就会按照这些 key 将所有 Mapper 的输出记录进行分区和排序,然后按 key 聚集给到 reducer。本质上,使用 MapReduce 来实现 group 和 join ,逻辑是极为相似的。
|
||||
|
||||
分组的另外一个使用场景是:收集某个用户会话中的所有用户活动——也称为**会话化**(sessionization)。例如,可以用来对比用户对于新老版本网站的分别购买意愿(A/B 测试)或者统计某些市场推广活动是否起作用。
|
||||
|
||||
@ -361,35 +361,35 @@ MapReduce 编程模型,可以将计算的**物理拓扑**(将数据放到合
|
||||
|
||||
如果某个 key 的数据量超级大,则“将相同 key 的数据聚集到一块” 的模型将不再适用。例如,在社交网络中,绝大多数的人都只会连接到较少的其他人,但数量较少的名人会有高达数百万的关注者。数据库中这种不成比例的记录常被称为**关键对象**(*linchpin objects* )或者**热键**(*hot keys*)。
|
||||
|
||||
在单个 reducer 中收集处理名人(celebrity)所有的活动事件(比如他们发布信息的回复),可能会造成严重的**数据倾斜**(**skew**,有时也被称为热点,hot spots)——即,一个 reducer 处理的数据量远超其他(参见[负载偏斜和热点消除](https://ddia.qtmuniao.com/#/ch06?id=%e8%b4%9f%e8%bd%bd%e5%81%8f%e6%96%9c%e5%92%8c%e7%83%ad%e7%82%b9%e6%b6%88%e9%99%a4))。由于只有其所属的所有 mappers 和 reducers 执行完时,该 MapReduce 任务才算完成,该 MapReduce 之后的任何任务都需要等待最慢的 reducer (长尾任务)完成后才能启动。
|
||||
在单个 Reducer 中收集处理名人(celebrity)所有的活动事件(比如他们发布信息的回复),可能会造成严重的**数据倾斜**(**skew**,有时也被称为热点,hot spots)——即,一个 Reducer 处理的数据量远超其他(参见[负载偏斜和热点消除](https://ddia.qtmuniao.com/#/ch06?id=%e8%b4%9f%e8%bd%bd%e5%81%8f%e6%96%9c%e5%92%8c%e7%83%ad%e7%82%b9%e6%b6%88%e9%99%a4))。由于只有其所属的所有 mappers 和 reducers 执行完时,该 MapReduce 任务才算完成,该 MapReduce 之后的任何任务都需要等待最慢的 Reducer (长尾任务)完成后才能启动。
|
||||
|
||||
如果某个 join 的输入存在热点数据,你可以借助一些算法来进行缓解。例如,Pig 中的偏斜 join(skewed join)方法会事先对所有 key 的分布进行**采样**,以探测是否有热点 key。然后,在执行真正的 Join 时,对于 Join 有热点 key 的这一测,mapper 会将含有热点 key 的记录发送到多个 reducer(每次随机挑选一个,相比之下,常规的 MapReduce 只会根据 key 的哈希确定性的选择一个 reducer);对于 Join 的另一侧输入,所有包含热点 key 的相关记录需要每个给每个具有该 key 的 reducer 都发一份。
|
||||
如果某个 join 的输入存在热点数据,你可以借助一些算法来进行缓解。例如,Pig 中的偏斜 join(skewed join)方法会事先对所有 key 的分布进行**采样**,以探测是否有热点 key。然后,在执行真正的 Join 时,对于 Join 有热点 key 的这一测,mapper 会将含有热点 key 的记录发送到多个 reducer(每次随机挑选一个,相比之下,常规的 MapReduce 只会根据 key 的哈希确定性的选择一个 reducer);对于 Join 的另一侧输入,所有包含热点 key 的相关记录需要每个给每个具有该 key 的 Reducer 都发一份。
|
||||
|
||||
该技术将处理热点 key 的工作分摊到多个 reducer 上,从而可以让其更好的并行,当然代价就是需要将 join 的非热点侧的数据冗余多份。Crunch 中的**分片连接**(shared join)也使用类似的技术,但需要**显式地指定**热点 key,而非通过采样来**自动获取**。这种技术很像我们在[负载偏斜和热点消除](https://ddia.qtmuniao.com/#/ch06?id=%e8%b4%9f%e8%bd%bd%e5%81%8f%e6%96%9c%e5%92%8c%e7%83%ad%e7%82%b9%e6%b6%88%e9%99%a4)中讨论过的相关技术,在多分片数据中,使用随机分片的方法来消除热点。
|
||||
该技术将处理热点 key 的工作分摊到多个 Reducer 上,从而可以让其更好的并行,当然代价就是需要将 join 的非热点侧的数据冗余多份。Crunch 中的**分片连接**(shared join)也使用类似的技术,但需要**显式地指定**热点 key,而非通过采样来**自动获取**。这种技术很像我们在[负载偏斜和热点消除](https://ddia.qtmuniao.com/#/ch06?id=%e8%b4%9f%e8%bd%bd%e5%81%8f%e6%96%9c%e5%92%8c%e7%83%ad%e7%82%b9%e6%b6%88%e9%99%a4)中讨论过的相关技术,在多分片数据中,使用随机分片的方法来消除热点。
|
||||
|
||||
Hive 的**偏斜连接**(skewed join)采用了另外一种方法来进行优化。Hive 要求在表的元信息中显示的指出热点 key,在收到这些 key 时会将其存到单独文件中。在对这种表进行 join 时,会使用 map 侧的 join(见下一小节)来处理热点 key。
|
||||
|
||||
当对热点 key 进行分组聚集(group)时,可以将分组过程拆成**两个阶段**,即使用两个相接的 MapReduce。第一个 MapReduce 会将记录随机得发给不同的 reducer,则每个 reducer 会对热点 key 的一个子集执行分组操作,并且产生一个更为紧凑的**聚合值**(aggregated value,如 count,sum,max 等等)。第二个 MapReduce 操作会将第一阶段中 MapReduce 产生的同一个 key 的多个聚合值进行真正的归并。总结来说,就是第一阶段进行**预分组,减小数据量**;第二阶段真正的全局分组,可以想象这种方式,要求聚合操作满足**交换律和结合律**。
|
||||
当对热点 key 进行分组聚集(group)时,可以将分组过程拆成**两个阶段**,即使用两个相接的 MapReduce。第一个 MapReduce 会将记录随机得发给不同的 reducer,则每个 Reducer 会对热点 key 的一个子集执行分组操作,并且产生一个更为紧凑的**聚合值**(aggregated value,如 count,sum,max 等等)。第二个 MapReduce 操作会将第一阶段中 MapReduce 产生的同一个 key 的多个聚合值进行真正的归并。总结来说,就是第一阶段进行**预分组,减小数据量**;第二阶段真正的全局分组,可以想象这种方式,要求聚合操作满足**交换律和结合律**。
|
||||
|
||||
## Map 侧的连接
|
||||
|
||||
上一节讲到的 join 算法是在 reduce 阶段真正执行的 join 逻辑,因此也被称为 **reduce 侧连接**(*reduce-side join*)。其中,mapper 仅扮演准备数据的角色:从每个输入记录中提取 key 和 value,并且将每个 kv 对发给合适的 reducer 分区,并将其进行排序。
|
||||
上一节讲到的 join 算法是在 reduce 阶段真正执行的 join 逻辑,因此也被称为 **reduce 侧连接**(*reduce-side join*)。其中,mapper 仅扮演准备数据的角色:从每个输入记录中提取 key 和 value,并且将每个 kv 对发给合适的 Reducer 分区,并将其进行排序。
|
||||
|
||||
reduce 侧的连接的好处在于,你不需要对输入数据有任何的假设:不管输入数据具有怎样的属性和结构,mappers 都可以进行合适的预处理后送给 reducers 进行连接。然而,缺点在于排序、复制到 reducers、将 reducer 的输入进行合并等过程代价十分高昂。根据可用内存缓存大小不同,数据在流经 MapReduce 中各阶段时可能会被写入多次(写放大)。
|
||||
reduce 侧的连接的好处在于,你不需要对输入数据有任何的假设:不管输入数据具有怎样的属性和结构,mappers 都可以进行合适的预处理后送给 reducers 进行连接。然而,缺点在于排序、复制到 reducers、将 Reducer 的输入进行合并等过程代价十分高昂。根据可用内存缓存大小不同,数据在流经 MapReduce 中各阶段时可能会被写入多次(写放大)。
|
||||
|
||||
但如果,输入数据满足某种假设,就可以利用所谓的 **map 侧连接**(map-side join)进行更快的连接。这种方式利用了一种简化过的 MapReduce 任务,去掉了 reducer,从而也去掉了对 mapper 输出的排序阶段。此时,每个 mapper 只需要从分布式文件系统中的输入文件块中读取记录、处理、并将输出写回到文件系统,即可。
|
||||
但如果,输入数据满足某种假设,就可以利用所谓的 **map 侧连接**(map-side join)进行更快的连接。这种方式利用了一种简化过的 MapReduce 任务,去掉了 reducer,从而也去掉了对 Mapper 输出的排序阶段。此时,每个 Mapper 只需要从分布式文件系统中的输入文件块中读取记录、处理、并将输出写回到文件系统,即可。
|
||||
|
||||
### 广播哈希连接
|
||||
|
||||
使用 map 侧连接的一个最常见的场景是一个大数据集和一个小数据集进行连接时。此种情况下,小数据集需要小到能全部装进 mapper 进程所在机器的内存。
|
||||
使用 map 侧连接的一个最常见的场景是一个大数据集和一个小数据集进行连接时。此种情况下,小数据集需要小到能全部装进 Mapper 进程所在机器的内存。
|
||||
|
||||
如,设想在图 10-2 对应的场景中,用户资料数据足够小,能够装入内存。在这种情况下,当 mapper 启动时,可以先将用户资料分布式文件系统中读取到内存的哈希表中。一旦加载完毕,mapper 可以扫描所有的用户活动事件,对于每一个事件,在内存哈希表中查找该事件对应用户资料信息,然后连接后,输出一条数据即可。
|
||||
如,设想在图 10-2 对应的场景中,用户资料数据足够小,能够装入内存。在这种情况下,当 Mapper 启动时,可以先将用户资料分布式文件系统中读取到内存的哈希表中。一旦加载完毕,mapper 可以扫描所有的用户活动事件,对于每一个事件,在内存哈希表中查找该事件对应用户资料信息,然后连接后,输出一条数据即可。
|
||||
|
||||
但仍然会有多个 mapper 任务:join 的**大数据量输入侧**(在 10-2 中,用户活动事件表是大输入测)每个文件块一个 mapper。其中 MapReduce 任务中的每个 mapper 都会将小输入侧的数据全部加载进内存。
|
||||
但仍然会有多个 Mapper 任务:join 的**大数据量输入侧**(在 10-2 中,用户活动事件表是大输入测)每个文件块一个 mapper。其中 MapReduce 任务中的每个 Mapper 都会将小输入侧的数据全部加载进内存。
|
||||
|
||||
这种简单高效的算法称为**广播哈希连接**(broadcast hash joins):
|
||||
|
||||
1. **广播(broadcast)**:处理大数据测每个分片的 mapper 都会将小数据测数据全部载入内存。从另外一个角度理解,就是将小数据集**广播到了**所有相关 mapper 机器上。
|
||||
1. **广播(broadcast)**:处理大数据测每个分片的 Mapper 都会将小数据测数据全部载入内存。从另外一个角度理解,就是将小数据集**广播到了**所有相关 Mapper 机器上。
|
||||
2. **哈希(hash)**:即在将小数据集在内存中组织为哈希表。
|
||||
|
||||
Pig(replicated join)、Hive(MapJoin)、Cascading 和 Crunch 都支持这种连接方法。
|
||||
@ -398,7 +398,7 @@ Pig(replicated join)、Hive(MapJoin)、Cascading 和 Crunch 都支持这
|
||||
|
||||
如果待 join 的多个输入,能够以同样的方式进行分区,则每个分区在处理时可以独立地进行 join。仍以图 10-2 为例,你可以重新组织活动事件和用户信息,都将其按用户 ID 的最后一位进行分片(则每侧输入都会有十个分片)。例如,mapper-3 首先将具有以 3 结尾的 ID 的用户资料数据加载到内存哈希表中,然后扫描所有以 3 结尾的 ID 活动事件记录,进行连接。
|
||||
|
||||
如果分区方式正确,则所有需要连接的双方都会落到同一个分区内,因此每个 mapper 只需要读取一个分区就可以获取待连接双方的所有记录。这样做的好处是,每个 mapper 所需构建哈希表的数据集要小很多(毕竟被 partition 过了)。
|
||||
如果分区方式正确,则所有需要连接的双方都会落到同一个分区内,因此每个 Mapper 只需要读取一个分区就可以获取待连接双方的所有记录。这样做的好处是,每个 Mapper 所需构建哈希表的数据集要小很多(毕竟被 partition 过了)。
|
||||
|
||||
只有当 join 的连接输入:
|
||||
|
||||
@ -412,7 +412,7 @@ Pig(replicated join)、Hive(MapJoin)、Cascading 和 Crunch 都支持这
|
||||
|
||||
### Map 侧合并连接
|
||||
|
||||
map 侧连接的另一个变种是,当 map 的输入数据集不仅以相同的方式分片过了,而且每个分片是**按该 key 有序的**。在这种情况下,是否有足够小的、能够载入内存的输入已经无关紧要,因为 mapper 可以以类似普通 reducer 的方式对输入数据进行**归并**:都以 key 递增(都递减也可以,取决于输入文件中 key 的顺序)的顺序,增量式(迭代式)的读取两个输入文件,对相同的 key 进行匹配连接(比如对每个输入使用一个指针,进行滑动匹配即可)。
|
||||
map 侧连接的另一个变种是,当 map 的输入数据集不仅以相同的方式分片过了,而且每个分片是**按该 key 有序的**。在这种情况下,是否有足够小的、能够载入内存的输入已经无关紧要,因为 Mapper 可以以类似普通 Reducer 的方式对输入数据进行**归并**:都以 key 递增(都递减也可以,取决于输入文件中 key 的顺序)的顺序,增量式(迭代式)的读取两个输入文件,对相同的 key 进行匹配连接(比如对每个输入使用一个指针,进行滑动匹配即可)。
|
||||
|
||||
如果我们可以在 map 侧进行归并连接,说明前一个 MapReduce 的输入已经将文件分好了组、排好了序。原则上,在这种情况下,join 完全可以在前一个 MapReduce 的 reduce 阶段来做。然而,使用额外的一个 MapOnly 任务来做连接也是有适用场景的,比如分区且有序的文件集还可以被其他任务复用时。
|
||||
|
||||
@ -440,8 +440,8 @@ map 侧连接的另一个变种是,当 map 的输入数据集不仅以相同
|
||||
|
||||
如果你想在一个**固定文档集合**上构建全文索引,批处理非常合适且高效:
|
||||
|
||||
1. mapper 会将文档集合按合适的方式进行分区
|
||||
2. reducer 会对每个分区构建索引
|
||||
1. Mapper 会将文档集合按合适的方式进行分区
|
||||
2. Reducer 会对每个分区构建索引
|
||||
3. 最终将索引文件写回分布式文件系统
|
||||
|
||||
构建这种**按文档分区**(document-partitioned,与 term-partitioned 相对,参见[分片和次级索引](https://ddia.qtmuniao.com/#/ch06?id=%e5%88%86%e7%89%87%e5%92%8c%e6%ac%a1%e7%ba%a7%e7%b4%a2%e5%bc%95))的索引,可以很好地并发生成。由于使用关键词进行索引查询是一种只读操作,因此,这些索引文件一旦构建完成,就是不可变的(immutable)。
|
||||
@ -458,15 +458,15 @@ map 侧连接的另一个变种是,当 map 的输入数据集不仅以相同
|
||||
|
||||
web 应用会查询这些数据库来处理用户请求,这些应用通常不会跟 Hadooop 生态部署在一块。那么,如何让批处理的输出写回数据库,以应对 web 应用的查询?
|
||||
|
||||
**最直观**的做法是,在 mapper 或者 reducer 代码逻辑中,使用相关数据库的**客户端库**,将 mapper 或者 reducer 的输出直接写入数据库服务器,每次一个记录。这种方式能够工作(须假设防火墙允许我们直接从 Hadoop 中访问**生产环境**的数据库服务器),但往往并不是一个好的做法:
|
||||
**最直观**的做法是,在 Mapper 或者 Reducer 代码逻辑中,使用相关数据库的**客户端库**,将 Mapper 或者 Reducer 的输出直接写入数据库服务器,每次一个记录。这种方式能够工作(须假设防火墙允许我们直接从 Hadoop 中访问**生产环境**的数据库服务器),但往往并不是一个好的做法:
|
||||
|
||||
- **吞吐不匹配**。正如之前 join 一节中所讨论的,通过网络一条条写入记录的吞吐要远小于一个批处理任务的吞吐。即使数据库的客户端通常支持将多个 record 写入 batch 成一个请求,性能仍然会比较差。
|
||||
- **数据库过载**。一个 MapReduce 任务通常会并行地跑很多个子任务。如果所有 mapper 和 reducer ,以批处理产生输出的速率,并发地将输出写到同一个数据库,则该数据库很容会被打爆(overwhelmed)。此时,其查询性能大概率非常差,也因此难以对外提供正常服务,从而给系统的其他组件带来运维问题。
|
||||
- **数据库过载**。一个 MapReduce 任务通常会并行地跑很多个子任务。如果所有 Mapper 和 Reducer ,以批处理产生输出的速率,并发地将输出写到同一个数据库,则该数据库很容会被打爆(overwhelmed)。此时,其查询性能大概率非常差,也因此难以对外提供正常服务,从而给系统的其他组件带来运维问题。
|
||||
- **可能产生副作用**。通常来说,MapReduce 对外提供简单的“全有或全无(all-or-nothing)”的输出保证:如果整个任务成功,即使子任务一时失败重试,但最终的输出也会看起来像运行了一次;如果整个任务失败,则没有任何输出。但直接从任务内部将输出写入外部服务,会产生外部可见的**副作用**。在这种情况下,你就必须考虑任务的**部分成功**状态可能会暴露给其他系统,并要理解 Hadoop 内部重试和推测执行的复杂机制。
|
||||
|
||||
一个更好的方案是,在批处理任务内部生成全新的数据库,并将其以文件的形式写入分布式系统的文件夹中。一旦任务成功执行,这些数据文件就会称为**不可变的**(immutable),并且可以**批量加载**(bulk loading)进只处理只读请求的服务中。很多 KV 存储都支持使用 MapReduce 任务构建数据库文件,比如 Voldemort,Terrapin, ElephantDB 和 HBase bulk loading。另外 RocksDB 支持 ingest SST 文件,也是类似的情况。
|
||||
|
||||
**直接构建数据库底层文件**,就是一个 MapReduce 应用的绝佳案例:使用 mapper 抽取 key,然后利用该 key 进行排序,已经**覆盖了**构建索引中的大部分流程。由于大部 KV 存储都是只读的(通过批处理任务一次写入后,即不可变),这些存储的底层数据结构可以设计的非常简单。例如,不需要 WAL(参见[让 B 树更可靠](https://ddia.qtmuniao.com/#/ch03?id=%e8%ae%a9-b-%e6%a0%91%e6%9b%b4%e5%8f%af%e9%9d%a0))。
|
||||
**直接构建数据库底层文件**,就是一个 MapReduce 应用的绝佳案例:使用 Mapper 抽取 key,然后利用该 key 进行排序,已经**覆盖了**构建索引中的大部分流程。由于大部 KV 存储都是只读的(通过批处理任务一次写入后,即不可变),这些存储的底层数据结构可以设计的非常简单。例如,不需要 WAL(参见[让 B 树更可靠](https://ddia.qtmuniao.com/#/ch03?id=%e8%ae%a9-b-%e6%a0%91%e6%9b%b4%e5%8f%af%e9%9d%a0))。
|
||||
|
||||
当数据加载进 Voldemort 时,服务器可以利用老文件**继续对外提供服务**,新文件会从分布式文件系统中拷贝的 Voldemort 服务本地。一旦拷贝完成,服务器可以立即将外部查询请求**原子地**切到新文件上。如果导入过程中发生了任何问题,也可以**快速地切回**,使用老文件提供服务。因为老文件是不可变的,且没有立即被删除。
|
||||
|
||||
@ -579,7 +579,7 @@ Hadoop 生态系统既包括随机访问型的 OLTP 数据库,如HBase(参
|
||||
相比 Unix 管道,MapReduce 将工作流中间结果进行物化的方式有很多缺点:
|
||||
|
||||
- **无谓等待**。一个 MapReduce 任务只能在所有前置依赖任务完成后才能启动。然而由 Unix 管道缀连起来的命令却能够并行运行,只要一个任务开始产生输出,下一个任务就可以开始消费处理。由于机器配置和负载的不同,总会在某些机器上出现一些执行时间过长**拖后腿的任务**(struggler)。而 MapReduce 的这种等待机制,会让单个任务拖垮整个工作流。
|
||||
- **Mapper 冗余**。Mapper 职责非常简单,仅是读出前置 reducer 产生的数据,并为之后 reducer 的分片和排序做准备。在很多情况下,mapper 的职责其实可以并到前序任务的 reducer 中:如果可以将 reducer 的输出按照后继 reducer 的要求准备好,则可将 reducer 直接串起来,从而省去中间夹杂的 mapper 阶段。
|
||||
- **Mapper 冗余**。Mapper 职责非常简单,仅是读出前置 Reducer 产生的数据,并为之后 Reducer 的分片和排序做准备。在很多情况下,mapper 的职责其实可以并到前序任务的 Reducer 中:如果可以将 Reducer 的输出按照后继 Reducer 的要求准备好,则可将 Reducer 直接串起来,从而省去中间夹杂的 Mapper 阶段。
|
||||
- **数据冗余**。在分布式文件系统中存储中间结果,意味着将数据在不同机器上冗余了几份。对于并不需要共享的中间结果来说,这种方式太过奢侈。
|
||||
|
||||
### 数据流引擎
|
||||
@ -597,9 +597,9 @@ Hadoop 生态系统既包括随机访问型的 OLTP 数据库,如HBase(参
|
||||
这种风格的处理引擎思想来自于 Dryad 和 Nephele 等系统,相比 MapReduce 模型,有如下优点:
|
||||
|
||||
- **按需 shuffle**:对于排序等高代价负载,只有在需要的时候才会执行,而不是总强制发生在 map 和 reduce 之间。
|
||||
- **省掉无用 mapper**:由于 map 本身并没有进行 repartition,因此可以将其合并到前一个算子中的 reduceer 阶段。
|
||||
- **省掉无用 Mapper**:由于 map 本身并没有进行 repartition,因此可以将其合并到前一个算子中的 reduceer 阶段。
|
||||
- **数据传输优化**:由于所有 join 和依赖等数据拓扑是显式声明的,调度器可以事先知道哪些数据在哪里被需要。因此可以尽可能地做**局部性优化**(locality optimization)。例如,可以尽量将消费某分区数据的任务放到生产该数据的机器上执行,从而通过共享内存而非网络来共享数据。
|
||||
- **中间结果只存一份**:通常来说,只需要将算子的中间结果,在内存中或者本地硬盘中放一份就够了,而不用写到分布式文件系统中。在 MapReduce 中 mapper 的输出其实也是用了此优化,只不过 dataflow 引擎将该思想扩展到了所有中间状态的存储中。
|
||||
- **中间结果只存一份**:通常来说,只需要将算子的中间结果,在内存中或者本地硬盘中放一份就够了,而不用写到分布式文件系统中。在 MapReduce 中 Mapper 的输出其实也是用了此优化,只不过 dataflow 引擎将该思想扩展到了所有中间状态的存储中。
|
||||
- **算子执行流水化**:大部分算子只要有输入了就可以执行,而不用等到前置任务都完成了才能够执行。
|
||||
- **进程复用**:同一个工作流中,前面算子所使用的 JVM 进程池可以为之后算子所复用,而不用像 MapReduce 一样每个任务都要开一个新的 JVM 进程。
|
||||
|
||||
@ -782,11 +782,11 @@ Spark 使用 JVM 字节码、Impala 使用 LLVM 来通过生成代码的方式
|
||||
|
||||
- **Sort-merge joins**
|
||||
|
||||
**分桶排序**。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 reducer 函数会将 join 结果进行输出。
|
||||
**分桶排序**。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 Reducer 函数会将 join 结果进行输出。
|
||||
|
||||
- **Broadcast hash joins**
|
||||
|
||||
**小表广播**。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。
|
||||
**小表广播**。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 Mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。
|
||||
|
||||
- **Partitioned hash joins**
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user