17 KiB
Apache Spark 一个60TB+规模的产品使用案例
Facebook 经常使用数据驱动的分析方法来做决策。在过去的几年,用户和产品的增长已经推动了我们的分析工程师在数十兆兆字节数据集上执行单独的查询。我们的一些批量分析执行在可敬的 Hive 平台(在2009年 Facebook 贡献了 Apache Hive)和 Corona,我们的客户 MapReduce 实现。Facebook还针对几个内部数据存储(包括Hive)继续增加其对Presto ANSI-SQL 语句的封装。我们支持其他分析类型,比如图片处理和机器学习(Apache Giraph)和流(例如,Puma,Swift 和 Stylus)。
同时 Facebook 的所有产品涵盖了广泛的分析领域,为了共享我们的经验也为了从其他人学习,我们与开源社区继续相互影响。Apache Spark 于2009年在加州大学伯克利分校的 AMPLab 由Matei Zaharia 发起,后来在2013年贡献给 Apache。它是目前增长最快的数据处理平台之一,由于它能支持流,批,命令式(RDD),声明式(SQL),图形和机器学习用例,所有这些内置在相同的API和底层计算引擎。Spark 可以有效地利用更大量的内存,优化整个流程中的代码,并跨任务重用 JVM 以获得更好的性能。最近我们感觉 Spark 已经成熟,我们可以把他与 Hive 比较,做一些批量处理用例。文章其余的部分,我们讲述了扩展 Spark 来替代我们一个 Hive 工作量时的经验和学习到的教训。
用例:实体排名功能准备
在 Facebook 以多种方式使用实时实体排名。对于一些在线服务平台原始特性数值由 Hive 线下生成,然后将数据加载到实时关联查询系统。几年前建立的基于 Hive 的老式基础设施在计算上是资源密集型的并且很难维护,因为流程被划分成数百个较小的 Hive 工作。为了可以使用更加新的功能数据和提供可管理性,我们拿一个现有的流水线然后试着将其迁移至 Spark。
以前的 Hive 实现
基于 Hive 的管道由三个逻辑阶段组成,每个阶段对应由 entity_id 划分的数百个较小的 Hive 作业,因为每个阶段运行大型 Hive 作业不太可靠,并受到每个作业的最大任务数量的限制。
这三个逻辑阶段可以被总结如下:
- 过滤出非产品功能和噪点。
- 在每个(entity_id,target_id)对上聚合。
- 将表格分割成N个分片,并通过自定义二进制文件管理每个分片,以生成用于在线查询的自定义索引文件。
基于 Hive 的流程建立索引大概要三天完成。它也难于管理,因为流程包含上百个分片的工作,使监控也变得困难。没有好的方法估算流程进度或计算剩余时间。当考虑 Hive 流程的上述限制,我们决定建立一个更快更易于管理的 Spark 流程
Spark 实现
全面的调试会很慢,有挑战,资源密集。我们转换基于 Hive 流程资源最密集的部分开始:第二步。我们以一个50GB的压缩输入例子开始,然后逐渐扩展到300GB,1TB,然后到20TB。在每次规模增长,我们解决了性能和稳定性问题,但是实验到20TB,我们发现最大的改善机会。
运行在20TB的输入时,我们发现,由于大量的任务我们生成了太多输出文件(每个大小在100MB左右)。在10小时的作业运行时中,有三分之一是将文件从阶段目录移动到HDFS中的最终目录。起初,我们考虑两个选项:要么改善 HDFS 中的批量重命名来支持我们的用例,或者配置 Spark 生成更少的输出文件(很难,由于在这一步有大量的任务 — 70,000 )。我们退出这个问题,考虑第三种方案。由于我们在流程的第二步中生成的tmp_table2表是临时的,仅用于存储管道的中间输出,所以我们基本上压缩,序列化和复制三个副本,以便将单个读取TB级数据的工作负载。相反,我们更进一步:移除两个临时表并整合 Hive 过程的所有三部分到一个单独的 Spark 工作,读取60TB的压缩数据然后处理90TB的重新分配和排序。最终 Spark 工作如下:
为此工作我们如何规划 Spark?
当然,为如此大的流程运行一个单独的 Spark 任务,第一次尝试没有成功,甚至是第十次尝试。据我们所知,最大的 Spark 工作真实是在重新分配数据大小上(Databrick 的 Petabyte 排序是在合成数据上)。核心 Spark 基础架构和我们的应用程序进行了许多改进和优化使这个工作运行。这种努力的优势在于,许多这些改进适用于 Spark 的其他大型工作量,我们将所有工作捐献给开源 Apache Spark 项目 - 有关其他详细信息,请参阅JIRA。下面,我们强调的是实体排行流程之一部署到生产环境的重大改进。
可靠性修复
处理频繁的节点重启
为了可靠地执行长时间运行的作业,我们希望系统容错并从故障中恢复(主要是由于机器重启,可能是平时的维护或软件错误的原因)。虽然 Spark 设计为容忍机器重启,我们发现在处理常见故障之前有各种错误/问题需要定位。
- 使 PipedRDD 稳健的获取失败(SPARK-13793):PipedRDD 以前的实现不够强大,无法获取由于节点重启而导致的故障,并且只要出现提取失败,该作业就会失败。我们在 PipedRDD 中进行了更改,优雅的处理提取失败,因此该作业可以从这些提取到的失败中恢复。
- 可配置的最大获取失败次数(SPARK-13369):使用这种长时间运行的作业,由于机器重启引起的提取失败概率显着增加。在Spark中每个阶段的最大允许获取失败是硬编码的,因此,当达到最大数量时该作业将失败。我们做了一个改变,使它可配置并且在这个用例将其从4增长到20,通过获取失败使作业更稳健。
- 减少集群重启混乱:长耗时作业应该可以在集群重启时生存,所以我们不用浪费所有完成的处理。Spark 的可重新启动的分配服务功能使我们可以在节点重新启动后保留分配文件。最重要的是,我们在Spark驱动程序中实现了一项功能,可以暂停执行任务调度,所以由于过重的任务失败导致的集群重启,作业不会失败。
其他的可靠性修复
- 响应迟钝的驱动程序(SPARK-13279):在添加任务时,由于O(N ^ 2)操作,Spark驱动程序被卡住,导致该作业最终被卡住和死亡。 我们通过删除不必要的O(N ^ 2)操作来修复问题。
- 过多的驱动推测:我们发现,Spark 驱动程序在管理大量任务时花费了大量的时间推测。 在短期内,我们禁止这个作业的推测。在长期,我们正在努力改变Spark驱动程序,以减少推测时间。
- 由于大型缓冲区的整数溢出导致的 TimSort 问题(SPARK-13850):我们发现 Spark 的不安全内存操作有一个漏洞,导致 TimSort 中的内存损坏。 感谢 Databricks 的人解决了这个问题,这使我们能够在大内存缓冲区中运行。
- 调整分配服务来处理大量连接:在分配阶段,我们看到许多执行程序在尝试连接分配服务时超时。 增加Netty服务器线程(spark.重排.io.serverThreads)和积压(spark.重排.io.backLog)的数量解决了这个问题。
- 修复 Spark 执行程序 OOM(SPARK-13958)(交易制造商):首先在每个主机上打包超过四个聚合任务是很困难的。Spark 执行程序内存溢出,因为排序程序中存在导致无限期增长的指针数组的漏洞。当指针数组不再有可用的内存增长时,我们通过强制将数据溢出到磁盘来修复问题。因此,现在我们可以运行24个任务/主机,而不会内存溢出。
性能改进
在实施上述可靠性改进后,我们能够可靠地运行 Spark 工作。基于这一点,我们将精力转向与性能相关的项目,以充分发挥 Spark 的作用。我们使用 Spark 的指标和几个分析器来查找一些性能瓶颈。
我们用来查找性能平静的工具
- Spark UI Metrics:Spark UI 可以很好地了解在特定阶段花费的时间。每个任务的执行时间被分为子阶段,以便更容易地找到作业中的瓶颈。
- Jstack:Spark UI还在执行程序进程上提供了一个被需要的jstack函数,可用于中查找热点代码。
- Spark Linux Perf / Flame Graph 支持:尽管上述两个工具非常方便,但它们并不提供同时在数百台机器上运行的作业的 CPU 分析的聚合视图。在每个作业的基础上,我们添加了支持 Perf 分析(通过libperfagent的Java符号),并可以自定义采样的持续时间/频率。使用我们的内部指标收集框架,将概要分析样本聚合并显示为使用Flame Graph的执行程序。
性能优化
- 修复分拣机中的内存泄漏(SPARK-14363)(加速30%):我们发现了一个问题,当任务释放所有内存页时指针数组却未被释放。 因此,大量的内存未被使用,并导致频繁的溢出和程序 OOM。 我们现在进行了改变,正确地释放内存,并能够大量分类使运行更有效。 我们注意到,这一变化后 CPU 改善了30%。
- Snappy优化(SPARK-14277)(10%加速):JNI方法 -(Snappy.ArrayCopy)- 在每一行被读取/写入时都会被调用。 我们提出了这个问题,Snappy 的行为被改为使用非 JNI 的 System.ArrayCopy 代替。 这一改变节约了大约10%的CPU。
- 减少重新分配写入延迟(SPARK-5581)(高达50%的速度提升):在映射方面,将重新分配数据写入磁盘时,映射任务为每个分区打开并关闭相同的文件。 我们做了一个修复,以避免不必要的打开/关闭,并观察到高达50%的CPU改进写作大量的重新分配分区的工作。
- 解决由于获取失败导致的重复任务运行问题(SPARK-14649):当获取失败发生时,Spark驱动程序重新提交已运行的任务,导致性能下降。 我们通过避免重新运行运行的任务来解决问题,我们看到当提取失败发生时工作更加稳定。
- PipedRDD的可配置缓冲区大小(SPARK-14542)(10%速度提升):在使用PipedRDD时,我们发现将数据从分类器传输到管道过程的默认缓冲区大小太小,我们的作业要花费超过10%的时间复制数据。 我们使缓冲区大小可配置,以避免这个瓶颈。
- 用于重排提取加速的缓存索引文件(SPARK-15074):我们观察到重排服务经常成为瓶颈,减少器花费10%至15%的时间等待获取地图数据。深入了解问题,我们发现,随机播放服务是为每个重排提取打开/关闭随机索引文件。我们进行了更改以缓存索引信息,以便我们可以避免文件打开/关闭,并重新使用索引信息以便后续提取。这个变化将总重排时间减少了50%。
- 降低重排字节写入指标的更新频率(SPARK-15569)(高达20%的加速):使用 Spark Linux Perf 集成,我们发现大约20%的CPU时间正在花费探测和更新写入的重排字节指标。
- 分拣机可配置的初始缓冲区大小(SPARK-15958)(高达5%的加速):分拣机的默认初始缓冲区大小太小(4 KB),我们发现它对于大型工作负载非常小 - 所以我们浪费了大量的时间消耗缓冲区并复制内容。我们做了一个更改,使缓冲区大小可配置,并且缓冲区大小为64 MB,我们可以避免重复的数据复制,使作业的速度提高约5%。
- 配置任务数量:由于我们的输入大小为60T,每个 HDFS 块大小为256M,因此我们为该作业产生了超过250,000个任务。尽管我们能够以如此多的任务来运行Spark工作,但是我们发现,当任务数量过高时,性能会下降。我们引入了一个配置参数,使地图输入大小可配置,因此我们可以通过将输入分割大小设置为2 GB来将该数量减少8倍。
在所有这些可靠性和性能改进之后,我们很高兴地报告,我们为我们的实体排名系统之一构建和部署了一个更快更易于管理的流程,并且我们提供了在Spark中运行其他类似作业的能力。
Spark 流程与 Hive 流程性能对比
我们使用以下性能指标来比较Spark流程与Hive流程。请注意,这些数字并不是Spark与Hive在查询或作业级别的直接比较,而是将优化流程与灵活的计算引擎(例如Spark)进行比较,而不是仅在查询/作业级别(如Hive)。
CPU 时间:这是从系统角度看 CPU 使用。例如,你在一个32核机器上使用50%的 CPU 10秒运行一个单进程任务,然后你的 CPU 时间应该是32 * 0.5 * 10 = 160 CPU 秒。
CPU预留时间:这是从资源管理框架的角度来看CPU预留。例如,如果我们保留32位机器10秒钟来运行作业,则CPU预留时间为32 * 10 = 320 CPU秒。CPU时间与CPU预留时间的比率反映了我们如何在集群上利用预留的CPU资源。当准确时,与CPU时间相比,预留时间在运行相同工作负载时,可以更好地比较执行引擎。例如,如果一个进程需要1个CPU的时间才能运行,但是必须保留100个CPU秒,则该指标的效率要低于需要10个CPU秒的进程,但是只能执行10个CPU秒钟来执行相同的工作量。我们还计算内存预留时间,但不包括在这里,因为数字类似于CPU预留时间,因为在同一硬件上运行实验,而在 Spark 和 Hive 的情况下,我们不会将数据缓存在内存中。Spark有能力在内存中缓存数据,但是由于我们的集群内存限制,我们决定核心工作与Hive类似。
等待时间:端到端的工作流失时间。
结论和未来工作
Facebook的性能和可扩展的分析在产品开发给予了协助。Apache Spark 提供了将各种分析用例统一为单一API和高效计算引擎的独特功能。我们挑战了Spark,来将一个分解成数百个Hive作业的流程替换成一个Spark工作。通过一系列的性能和可靠性改进之后,我们可以将Spark扩大到处理我们在生产中的实体排名数据处理用例之一。 在这个特殊用例中,我们展示了Spark可以可靠地重排和排序90 TB +中间数据,并在一个工作中运行了25万个任务。 与旧的基于Hive的流程相比,基于Spark的流程产生了显着的性能改进(4.5-6倍CPU,3-4倍资源预留和〜5倍的延迟),并且已经投入使用了几个月。
虽然本文详细介绍了我们 Spark 最具挑战性的用例,但越来越多的客户团队已将Spark工作负载部署到生产中。 性能,可维护性和灵活性是继续推动更多用例到Spark的优势。 Facebook很高兴成为Spark开源社区的一部分,并将共同开发Spark充分发挥潜力。
作者:Sital Kedia 王硕杰 Avery Ching 译者:wyangsun 校对:校对者ID