TranslateProject/translated/tech/20160831 Apache Spark Scale - A 60 TB production use case.md

121 lines
17 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Apache Spark 一个60TB+规模的产品使用案例
===========
Facebook 经常使用数据驱动的分析方法来做决策。在过去的几年,用户和产品的增长已经推动了我们的分析工程师在数十兆兆字节数据集上执行单独的查询。我们的一些批量分析执行在可敬的 [Hive][1] 平台在2009年 Facebook 贡献了 Apache Hive和 [Corona][2],我们的客户 MapReduce 实现。Facebook还针对几个内部数据存储包括Hive继续增加其对Presto ANSI-SQL 语句的封装。我们支持其他分析类型,比如图片处理和机器学习([Apache Giraph][3])和流(例如,[Puma][4][Swift][5] 和 [Stylus][6])。
同时 Facebook 的所有产品涵盖了广泛的分析领域,为了共享我们的经验也为了从其他人学习,我们与开源社区继续相互影响。[Apache Spark][7] 于2009年在加州大学伯克利分校的 AMPLab 由Matei Zaharia 发起后来在2013年贡献给 Apache。它是目前增长最快的数据处理平台之一由于它能支持流命令式RDD声明式SQL图形和机器学习用例所有这些内置在相同的API和底层计算引擎。Spark 可以有效地利用更大量的内存,优化整个流程中的代码,并跨任务重用 JVM 以获得更好的性能。最近我们感觉 Spark 已经成熟,我们可以把他与 Hive 比较,做一些批量处理用例。文章其余的部分,我们讲述了扩展 Spark 来替代我们一个 Hive 工作量时的经验和学习到的教训。
### 用例:实体排名功能准备
在 Facebook 以多种方式使用实时实体排名。对于一些在线服务平台原始特性数值由 Hive 线下生成,然后将数据加载到实时关联查询系统。几年前建立的基于 Hive 的老式基础设施在计算上是资源密集型的并且很难维护,因为流程被划分成数百个较小的 Hive 工作。为了可以使用更加新的功能数据和提供可管理性,我们拿一个现有的流水线然后试着将其迁移至 Spark。
### 以前的 Hive 实现
基于 Hive 的管道由三个逻辑阶段组成,每个阶段对应由 entity_id 划分的数百个较小的 Hive 作业,因为每个阶段运行大型 Hive 作业不太可靠,并受到每个作业的最大任务数量的限制。
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xaf1/t39.2365-6/14050196_257613611304247_245043082_n.jpg)
这三个逻辑阶段可以被总结如下:
1. 过滤出非产品功能和噪点。
2. 在每个entity_idtarget_id对上聚合。
3. 将表格分割成N个分片并通过自定义二进制文件管理每个分片以生成用于在线查询的自定义索引文件。
基于 Hive 的流程建立索引大概要三天完成。它也难于管理,因为流程包含上百个分片的工作,使监控也变得困难。没有好的方法估算流程进度或计算剩余时间。当考虑 Hive 流程的上述限制,我们决定建立一个更快更易于管理的 Spark 流程
### Spark 实现
全面的调试会很慢,有挑战,资源密集。我们转换基于 Hive 流程资源最密集的部分开始第二步。我们以一个50GB的压缩输入例子开始然后逐渐扩展到300GB1TB然后到20TB。在每次规模增长我们解决了性能和稳定性问题但是实验到20TB我们发现最大的改善机会。
运行在20TB的输入时我们发现由于大量的任务我们生成了太多输出文件每个大小在100MB左右。在10小时的作业运行时中有三分之一是将文件从阶段目录移动到HDFS中的最终目录。起初我们考虑两个选项要么改善 HDFS 中的批量重命名来支持我们的用例,或者配置 Spark 生成更少的输出文件(很难,由于在这一步有大量的任务 — 70,000 。我们退出这个问题考虑第三种方案。由于我们在流程的第二步中生成的tmp_table2表是临时的仅用于存储管道的中间输出所以我们基本上压缩序列化和复制三个副本以便将单个读取TB级数据的工作负载。相反我们更进一步移除两个临时表并整合 Hive 过程的所有三部分到一个单独的 Spark 工作读取60TB的压缩数据然后处理90TB的重新分配和排序。最终 Spark 工作如下:
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xfp1/t39.2365-6/14146896_1073876729364007_1912864323_n.jpg)
### 为此工作我们如何规划 Spark
当然,为如此大的流程运行一个单独的 Spark 任务,第一次尝试没有成功,甚至是第十次尝试。据我们所知,最大的 Spark 工作真实是在重新分配数据大小上Databrick 的 Petabyte 排序是在合成数据上)。核心 Spark 基础架构和我们的应用程序进行了许多改进和优化使这个工作运行。这种努力的优势在于,许多这些改进适用于 Spark 的其他大型工作量,我们将所有工作捐献给开源 Apache Spark 项目 - 有关其他详细信息请参阅JIRA。下面我们强调的是实体排行流程之一部署到生产环境的重大改进。
### 可靠性修复
#### 处理频繁的节点重启
为了可靠地执行长时间运行的作业,我们希望系统容错并从故障中恢复(主要是由于机器重启,可能是平时的维护或软件错误的原因)。虽然 Spark 设计为容忍机器重启,我们发现在处理常见故障之前有各种错误/问题需要定位。
- 使 PipedRDD 稳健的获取失败SPARK-13793PipedRDD 以前的实现不够强大,无法获取由于节点重启而导致的故障,并且只要出现提取失败,该作业就会失败。我们在 PipedRDD 中进行了更改,优雅的处理提取失败,因此该作业可以从这些提取到的失败中恢复。
- 可配置的最大获取失败次数SPARK-13369使用这种长时间运行的作业由于机器重启引起的提取失败概率显着增加。在Spark中每个阶段的最大允许获取失败是硬编码的因此当达到最大数量时该作业将失败。我们做了一个改变使它可配置并且在这个用例将其从4增长到20通过获取失败使作业更稳健。
- 减少集群重启混乱长耗时作业应该可以在集群重启时生存所以我们不用浪费所有完成的处理。Spark 的可重新启动的分配服务功能使我们可以在节点重新启动后保留分配文件。最重要的是我们在Spark驱动程序中实现了一项功能可以暂停执行任务调度所以由于过重的任务失败导致的集群重启作业不会失败。
#### 其他的可靠性修复
- 响应迟钝的驱动程序SPARK-13279在添加任务时由于ON ^ 2操作Spark驱动程序被卡住导致该作业最终被卡住和死亡。 我们通过删除不必要的ON ^ 2操作来修复问题。
- 过多的驱动推测我们发现Spark 驱动程序在管理大量任务时花费了大量的时间推测。 在短期内我们禁止这个作业的推测。在长期我们正在努力改变Spark驱动程序以减少推测时间。
- 由于大型缓冲区的整数溢出导致的 TimSort 问题SPARK-13850我们发现 Spark 的不安全内存操作有一个漏洞,导致 TimSort 中的内存损坏。 感谢 Databricks 的人解决了这个问题,这使我们能够在大内存缓冲区中运行。
- 调整分配服务来处理大量连接:在分配阶段,我们看到许多执行程序在尝试连接分配服务时超时。 增加Netty服务器线程spark.重排.io.serverThreads和积压spark.重排.io.backLog的数量解决了这个问题。
- 修复 Spark 执行程序 OOMSPARK-13958交易制造商首先在每个主机上打包超过四个聚合任务是很困难的。Spark 执行程序内存溢出因为排序程序中存在导致无限期增长的指针数组的漏洞。当指针数组不再有可用的内存增长时我们通过强制将数据溢出到磁盘来修复问题。因此现在我们可以运行24个任务/主机,而不会内存溢出。
### 性能改进
在实施上述可靠性改进后,我们能够可靠地运行 Spark 工作。基于这一点,我们将精力转向与性能相关的项目,以充分发挥 Spark 的作用。我们使用 Spark 的指标和几个分析器来查找一些性能瓶颈。
#### 我们用来查找性能平静的工具
- Spark UI MetricsSpark UI 可以很好地了解在特定阶段花费的时间。每个任务的执行时间被分为子阶段,以便更容易地找到作业中的瓶颈。
- JstackSpark UI还在执行程序进程上提供了一个被需要的jstack函数可用于中查找热点代码。
- Spark Linux Perf / Flame Graph 支持:尽管上述两个工具非常方便,但它们并不提供同时在数百台机器上运行的作业的 CPU 分析的聚合视图。在每个作业的基础上,我们添加了支持 Perf 分析通过libperfagent的Java符号并可以自定义采样的持续时间/频率。使用我们的内部指标收集框架将概要分析样本聚合并显示为使用Flame Graph的执行程序。
### 性能优化
- 修复分拣机中的内存泄漏SPARK-14363加速30我们发现了一个问题当任务释放所有内存页时指针数组却未被释放。 因此,大量的内存未被使用,并导致频繁的溢出和程序 OOM。 我们现在进行了改变,正确地释放内存,并能够大量分类使运行更有效。 我们注意到,这一变化后 CPU 改善了30
- Snappy优化SPARK-1427710加速JNI方法 -Snappy.ArrayCopy- 在每一行被读取/写入时都会被调用。 我们提出了这个问题Snappy 的行为被改为使用非 JNI 的 System.ArrayCopy 代替。 这一改变节约了大约10的CPU。
- 减少重新分配写入延迟SPARK-5581高达50的速度提升在映射方面将重新分配数据写入磁盘时映射任务为每个分区打开并关闭相同的文件。 我们做了一个修复,以避免不必要的打开/关闭并观察到高达50的CPU改进写作大量的重新分配分区的工作。
- 解决由于获取失败导致的重复任务运行问题SPARK-14649当获取失败发生时Spark驱动程序重新提交已运行的任务导致性能下降。 我们通过避免重新运行运行的任务来解决问题,我们看到当提取失败发生时工作更加稳定。
- PipedRDD的可配置缓冲区大小SPARK-1454210速度提升在使用PipedRDD时我们发现将数据从分类器传输到管道过程的默认缓冲区大小太小我们的作业要花费超过10的时间复制数据。 我们使缓冲区大小可配置,以避免这个瓶颈。
- 用于重排提取加速的缓存索引文件SPARK-15074我们观察到重排服务经常成为瓶颈减少器花费10至15的时间等待获取地图数据。深入了解问题我们发现随机播放服务是为每个重排提取打开/关闭随机索引文件。我们进行了更改以缓存索引信息,以便我们可以避免文件打开/关闭并重新使用索引信息以便后续提取。这个变化将总重排时间减少了50
- 降低重排字节写入指标的更新频率SPARK-15569高达20的加速使用 Spark Linux Perf 集成我们发现大约20的CPU时间正在花费探测和更新写入的重排字节指标。
- 分拣机可配置的初始缓冲区大小SPARK-15958高达5的加速分拣机的默认初始缓冲区大小太小4 KB我们发现它对于大型工作负载非常小 - 所以我们浪费了大量的时间消耗缓冲区并复制内容。我们做了一个更改使缓冲区大小可配置并且缓冲区大小为64 MB我们可以避免重复的数据复制使作业的速度提高约5
- 配置任务数量由于我们的输入大小为60T每个 HDFS 块大小为256M因此我们为该作业产生了超过250,000个任务。尽管我们能够以如此多的任务来运行Spark工作但是我们发现当任务数量过高时性能会下降。我们引入了一个配置参数使地图输入大小可配置因此我们可以通过将输入分割大小设置为2 GB来将该数量减少8倍。
在所有这些可靠性和性能改进之后我们很高兴地报告我们为我们的实体排名系统之一构建和部署了一个更快更易于管理的流程并且我们提供了在Spark中运行其他类似作业的能力。
### Spark 流程与 Hive 流程性能对比
我们使用以下性能指标来比较Spark流程与Hive流程。请注意这些数字并不是Spark与Hive在查询或作业级别的直接比较而是将优化流程与灵活的计算引擎例如Spark进行比较而不是仅在查询/作业级别如Hive
CPU 时间:这是从系统角度看 CPU 使用。例如你在一个32核机器上使用50%的 CPU 10秒运行一个单进程任务然后你的 CPU 时间应该是32 * 0.5 * 10 = 160 CPU 秒。
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xpt1/t39.2365-6/14146892_595234533986285_2004398348_n.jpg)
CPU预留时间这是从资源管理框架的角度来看CPU预留。例如如果我们保留32位机器10秒钟来运行作业则CPU预留时间为32 * 10 = 320 CPU秒。CPU时间与CPU预留时间的比率反映了我们如何在集群上利用预留的CPU资源。当准确时与CPU时间相比预留时间在运行相同工作负载时可以更好地比较执行引擎。例如如果一个进程需要1个CPU的时间才能运行但是必须保留100个CPU秒则该指标的效率要低于需要10个CPU秒的进程但是只能执行10个CPU秒钟来执行相同的工作量。我们还计算内存预留时间但不包括在这里因为数字类似于CPU预留时间因为在同一硬件上运行实验而在 Spark 和 Hive 的情况下我们不会将数据缓存在内存中。Spark有能力在内存中缓存数据但是由于我们的集群内存限制我们决定核心工作与Hive类似。
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xfa1/t39.2365-6/14129680_325754934432503_513809233_n.jpg)
等待时间:端到端的工作流失时间。
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xap1/t39.2365-6/14129681_178723715883876_1030939470_n.jpg)
### 结论和未来工作
Facebook的性能和可扩展的分析在产品开发给予了协助。Apache Spark 提供了将各种分析用例统一为单一API和高效计算引擎的独特功能。我们挑战了Spark来将一个分解成数百个Hive作业的流程替换成一个Spark工作。通过一系列的性能和可靠性改进之后我们可以将Spark扩大到处理我们在生产中的实体排名数据处理用例之一。 在这个特殊用例中我们展示了Spark可以可靠地重排和排序90 TB +中间数据并在一个工作中运行了25万个任务。 与旧的基于Hive的流程相比基于Spark的流程产生了显着的性能改进4.5-6倍CPU3-4倍资源预留和〜5倍的延迟并且已经投入使用了几个月。
虽然本文详细介绍了我们 Spark 最具挑战性的用例但越来越多的客户团队已将Spark工作负载部署到生产中。 性能可维护性和灵活性是继续推动更多用例到Spark的优势。 Facebook很高兴成为Spark开源社区的一部分并将共同开发Spark充分发挥潜力。
--------------------------------------------------------------------------------
via: https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/?utm_source=dbweekly&utm_medium=email
作者:[Sital Kedia][a] [王硕杰][b] [Avery Ching][c]
译者:[wyangsun](https://github.com/wyangsun)
校对:[校对者ID](https://github.com/校对者ID)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 组织编译,[Linux中国](https://linux.cn/) 荣誉推出
[a]: https://www.facebook.com/sitalkedia
[b]: https://www.facebook.com/shuojiew
[c]: https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/?utm_source=dbweekly&utm_medium=email#
[1]: https://code.facebook.com/posts/370832626374903/even-faster-data-at-the-speed-of-presto-orc/
[2]: https://www.facebook.com/notes/facebook-engineering/under-the-hood-scheduling-mapreduce-jobs-more-efficiently-with-corona/10151142560538920/
[3]: https://code.facebook.com/posts/509727595776839/scaling-apache-giraph-to-a-trillion-edges/
[4]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[5]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[6]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[7]: http://spark.apache.org/