Apache Spark @Scale: A 60 TB+ production use case
===========

Facebook often uses analytics for data-driven decision making. Over the past few years, user and product growth has pushed our analytics engines to operate on data sets in the tens of terabytes for a single query. Some of our batch analytics is executed through the venerable [Hive][1] platform (contributed to Apache Hive by Facebook in 2009) and [Corona][2], our custom MapReduce implementation. Facebook has also continued to grow its Presto footprint for ANSI-SQL queries against several internal data stores, including Hive. We support other types of analytics such as graph processing and machine learning ([Apache Giraph][3]) and streaming (e.g., [Puma][4], [Swift][5], and [Stylus][6]).

While the sum of Facebook's offerings covers a broad spectrum of the analytics space, we continually interact with the open source community in order to share our experiences and also learn from others. [Apache Spark][7] was started by Matei Zaharia at UC Berkeley's AMPLab in 2009 and was later contributed to Apache in 2013. It is currently one of the fastest-growing data processing platforms, due to its ability to support streaming, batch, imperative (RDD), declarative (SQL), graph, and machine learning use cases all within the same API and underlying compute engine. Spark can efficiently leverage larger amounts of memory, optimize code across entire pipelines, and reuse JVMs across tasks for better performance. Recently, we felt Spark had matured to the point where we could compare it with Hive for a number of batch-processing use cases. In the remainder of this article, we describe our experiences and lessons learned while scaling Spark to replace one of our Hive workloads.
### Use case: Feature preparation for entity ranking

Real-time entity ranking is used in a variety of ways at Facebook. For some of these online serving platforms, raw feature values are generated offline with Hive, and the data is then loaded into its real-time affinity query system. The old Hive-based infrastructure built years ago was computationally resource intensive and challenging to maintain because the pipeline was sharded into hundreds of smaller Hive jobs. In order to enable fresher feature data and improve manageability, we took one of the existing pipelines and tried to migrate it to Spark.
### Previous Hive implementation

The Hive-based pipeline was composed of three logical stages where each stage corresponded to hundreds of smaller Hive jobs sharded by entity_id, since running large Hive jobs for each stage was less reliable and limited by the maximum number of tasks per job.

![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xaf1/t39.2365-6/14050196_257613611304247_245043082_n.jpg)
The three logical steps can be summarized as follows:

1. Filter out non-production features and noise.
2. Aggregate on each (entity_id, target_id) pair.
3. Shard the table into N shards and pipe each shard through a custom binary to generate a custom index file for online querying.

The Hive-based pipeline building the index took roughly three days to complete. It was also challenging to manage, because the pipeline contained hundreds of sharded jobs that made monitoring difficult. There was no easy way to gauge the overall progress of the pipeline or calculate an ETA. When considering the aforementioned limitations of the existing Hive pipeline, we decided to attempt to build a faster and more manageable pipeline with Spark.
### Spark implementation

Debugging at full scale can be slow, challenging, and resource intensive. We started off by converting the most resource intensive part of the Hive-based pipeline: stage two. We started with a sample of 50 GB of compressed input, then gradually scaled up to 300 GB, 1 TB, and then 20 TB. At each size increment, we resolved performance and stability issues, but experimenting with 20 TB is where we found our largest opportunity for improvement.

While running on 20 TB of input, we discovered that we were generating too many output files (each sized around 100 MB) due to the large number of tasks. Three out of 10 hours of job runtime were spent moving files from the staging directory to the final directory in HDFS. Initially, we considered two options: Either improve batch renaming in HDFS to support our use case, or configure Spark to generate fewer output files (difficult due to the large number of tasks — 70,000 — in this stage). We stepped back from the problem and considered a third alternative. Since the tmp_table2 table we generate in step two of the pipeline is temporary and used only to store the pipeline's intermediate output, we were essentially compressing, serializing, and replicating three copies for a single read workload with terabytes of data. Instead, we went a step further: Remove the two temporary tables and combine all three Hive stages into a single Spark job that reads 60 TB of compressed data and performs a 90 TB shuffle and sort. The final Spark job is as follows:

![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xfp1/t39.2365-6/14146896_1073876729364007_1912864323_n.jpg)
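
To make the shape of that single job more concrete, here is a minimal Scala/RDD sketch of the three logical stages folded into one Spark application. The record layout, paths, shard count, and `build_index` binary below are hypothetical placeholders for illustration only, not Facebook's actual code.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Hypothetical record layout; the real schema and filtering rules are not public.
case class Feature(entityId: Long, targetId: Long, value: Double, isProduction: Boolean)

object EntityRankingFeaturePrep {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("entity-ranking-feature-prep"))
    val numShards = 1000  // the "N" in the text; placeholder value

    // Stage 1: filter out non-production features and noise.
    val features = sc.textFile("hdfs:///path/to/raw_features")  // placeholder input path
      .map(parse)
      .filter(f => f.isProduction && f.value != 0.0)

    // Stage 2: aggregate on each (entity_id, target_id) pair.
    // In the production job this is the 90 TB shuffle and sort.
    val aggregated = features
      .map(f => ((f.entityId, f.targetId), f.value))
      .reduceByKey(_ + _)

    // Stage 3: re-key by entity_id, shard into N partitions, and pipe each shard
    // through a custom binary that builds the index files used for online querying.
    aggregated
      .map { case ((entityId, targetId), value) => (entityId, s"$entityId\t$targetId\t$value") }
      .partitionBy(new HashPartitioner(numShards))
      .values
      .pipe("/usr/local/bin/build_index")           // placeholder indexing binary
      .saveAsTextFile("hdfs:///path/to/index_files")

    sc.stop()
  }

  // Minimal parser for the hypothetical tab-separated input format.
  private def parse(line: String): Feature = {
    val Array(e, t, v, prod) = line.split('\t')
    Feature(e.toLong, t.toLong, v.toDouble, prod.toBoolean)
  }
}
```

The point of the sketch is only structural: one driver program replaces the hundreds of sharded Hive jobs, with the shuffle in stage two carrying the bulk of the data movement.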
### How did we scale Spark for this job?

Of course, running a single Spark job for such a large pipeline didn't work on the first try, or even on the 10th try. As far as we know, this is the largest real-world Spark job attempted in terms of shuffle data size (Databricks' petabyte sort was on synthetic data). It took numerous improvements and optimizations to the core Spark infrastructure and our application to get this job to run. The upside of this effort is that many of these improvements are applicable to other large-scale workloads for Spark, and we were able to contribute all our work back into the open source Apache Spark project — see the JIRAs for additional details. Below, we highlight the major improvements that enabled one of the entity ranking pipelines to be deployed into production.
### Reliability fixes

#### Dealing with frequent node reboots

In order to reliably execute long-running jobs, we want the system to be fault-tolerant and recover from failures (mainly due to machine reboots that can occur due to normal maintenance or software errors). Although Spark is designed to tolerate machine reboots, we found various bugs/issues that needed to be addressed before it was robust enough to handle common failures.

- Make PipedRDD robust to fetch failure (SPARK-13793): The previous implementation of PipedRDD was not robust to fetch failures that occur due to node reboots, and the job would fail whenever there was a fetch failure. We made a change in PipedRDD to handle fetch failures gracefully so the job can recover from these types of fetch failures.

- Configurable max number of fetch failures (SPARK-13369): With long-running jobs such as this one, the probability of a fetch failure due to a machine reboot increases significantly. The maximum allowed fetch failures per stage was hard-coded in Spark, and, as a result, the job used to fail when the max number was reached. We made a change to make it configurable and increased it from four to 20 for this use case, which made the job more robust against fetch failures.

- Less disruptive cluster restart: Long-running jobs should be able to survive a cluster restart so we don't waste all the processing completed so far. Spark's restartable shuffle service feature lets us preserve the shuffle files after node restart. On top of that, we implemented a feature in the Spark driver to pause scheduling of tasks so that jobs don't fail due to excessive task failures caused by a cluster restart.
#### Other reliability fixes

- Unresponsive driver (SPARK-13279): The Spark driver was stuck due to O(N^2) operations while adding tasks, resulting in the job being stuck and eventually killed. We fixed the issue by removing the unnecessary O(N^2) operations.

- Excessive driver speculation: We discovered that the Spark driver was spending a lot of time in speculation when managing a large number of tasks. In the short term, we disabled speculation for this job. We are currently working on a change in the Spark driver to reduce speculation time in the long term.

- TimSort issue due to integer overflow for large buffer (SPARK-13850): We found that Spark's unsafe memory operation had a bug that leads to memory corruption in TimSort. Thanks to the Databricks folks for fixing this issue, which enabled us to operate on large in-memory buffers.

- Tune the shuffle service to handle a large number of connections: During the shuffle phase, we saw many executors timing out while trying to connect to the shuffle service. Increasing the number of Netty server threads (spark.shuffle.io.serverThreads) and backlog (spark.shuffle.io.backLog) resolved the issue; a hedged configuration sketch follows this list.

- Fix Spark executor OOM (SPARK-13958) (deal maker): It was challenging to pack more than four reduce tasks per host at first. Spark executors were running out of memory because there was a bug in the sorter that caused a pointer array to grow indefinitely. We fixed the issue by forcing the data to be spilled to disk when there is no more memory available for the pointer array to grow. As a result, we can now run 24 tasks/host without running out of memory.
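
As a rough illustration of the shuffle service tuning above, both settings can be raised through Spark configuration. The values here are placeholders, not the ones we used, and depending on your deployment they may need to be set on the external shuffle service side (e.g., in its own configuration file) rather than in the application:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: raise the shuffle service's Netty server threads and accept backlog.
// Both keys are standard Spark settings; the numbers are illustrative only.
val conf = new SparkConf()
  .setAppName("entity-ranking-feature-prep")
  .set("spark.shuffle.io.serverThreads", "128")  // more threads to serve concurrent fetch connections
  .set("spark.shuffle.io.backLog", "8192")       // deeper TCP accept queue for connection bursts
// Pass `conf` to the SparkContext as usual.
```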
### Performance improvements

After implementing the reliability improvements above, we were able to reliably run the Spark job. At this point, we shifted our efforts to performance-related projects to get the most out of Spark. We used Spark's metrics and several profilers to find some of the performance bottlenecks.

#### Tools we used to find performance bottlenecks

- Spark UI Metrics: The Spark UI provides great insight into where time is being spent in a particular phase. Each task's execution time is split into sub-phases that make it easier to find the bottleneck in the job.

- Jstack: The Spark UI also provides an on-demand jstack function on an executor process that can be used to find hotspots in the code.

- Spark Linux Perf/Flame Graph support: Although the two tools above are very handy, they do not provide an aggregated view of CPU profiling for the job running across hundreds of machines at the same time. On a per-job basis, we added support for enabling Perf profiling (via libperfagent for Java symbols) and can customize the duration/frequency of sampling. The profiling samples are aggregated and displayed as a Flame Graph across the executors using our internal metrics collection framework.
### Performance optimizations

- Fix memory leak in the sorter (SPARK-14363) (30 percent speed-up): We found an issue where tasks were releasing all memory pages but the pointer array was not being released. As a result, large chunks of memory were unused and caused frequent spilling and executor OOMs. Our change now releases memory properly and enabled large sorts to run efficiently. We noticed a 30 percent CPU improvement after this change.

- Snappy optimization (SPARK-14277) (10 percent speed-up): A JNI method — Snappy.ArrayCopy — was being called for each row being read/written. We raised this issue, and the Snappy behavior was changed to use the non-JNI based System.arraycopy instead. This change alone provided around 10 percent CPU improvement.

- Reduce shuffle write latency (SPARK-5581) (up to 50 percent speed-up): On the map side, when writing shuffle data to disk, the map task was opening and closing the same file for each partition. We made a fix to avoid unnecessary open/close and observed a CPU improvement of up to 50 percent for jobs writing a very high number of shuffle partitions.

- Fix duplicate task run issue due to fetch failure (SPARK-14649): The Spark driver was resubmitting already running tasks when a fetch failure occurred, which led to poor performance. We fixed the issue by avoiding rerunning the running tasks, and we saw that the job was more stable when fetch failures occurred.

- Configurable buffer size for PipedRDD (SPARK-14542) (10 percent speed-up): While using a PipedRDD, we found that the default buffer size for transferring the data from the sorter to the piped process was too small and our job was spending more than 10 percent of its time copying the data. We made the buffer size configurable to avoid this bottleneck.

- Cache index files for shuffle fetch speed-up (SPARK-15074): We observed that the shuffle service often becomes the bottleneck, and the reducers spend 10 percent to 15 percent of their time waiting to fetch map data. Digging deeper into the issue, we found that the shuffle service was opening/closing the shuffle index file for each shuffle fetch. We made a change to cache the index information so that we can avoid file open/close and reuse the index information for subsequent fetches. This change reduced the total shuffle fetch time by 50 percent.

- Reduce update frequency of shuffle bytes written metrics (SPARK-15569) (up to 20 percent speed-up): Using the Spark Linux Perf integration, we found that around 20 percent of the CPU time was being spent probing and updating the shuffle bytes written metrics.

- Configurable initial buffer size for Sorter (SPARK-15958) (up to 5 percent speed-up): The default initial buffer size for the Sorter (4 KB) is far too small for large workloads, and as a result we wasted a significant amount of time expanding the buffer and copying the contents. We made a change to make the buffer size configurable, and with a large buffer size of 64 MB we could avoid significant data copying, making the job around 5 percent faster.

- Configuring number of tasks: Since our input size is 60 TB and each HDFS block size is 256 MB, we were spawning more than 250,000 tasks for the job. Although we were able to run the Spark job with such a high number of tasks, we found that there is significant performance degradation when the number of tasks is too high. We introduced a configuration parameter to make the map input size configurable, so we can reduce that number by 8x by setting the input split size to 2 GB (see the sketch after this list).
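
The split-size parameter mentioned in the last item was an internal addition. As a hedged approximation in stock Spark, a similar reduction in the map task count can usually be obtained by raising the Hadoop input split size for splittable inputs (or spark.sql.files.maxPartitionBytes for DataFrame file sources); the key, value, and path below are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("fewer-map-tasks"))

// Ask the input format for ~2 GB splits instead of one split per 256 MB block,
// cutting the number of map tasks by roughly 8x. The exact key can depend on
// the input format in use.
val twoGB = (2L * 1024 * 1024 * 1024).toString
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", twoGB)

val input = sc.textFile("hdfs:///path/to/input")  // placeholder path
println(s"map tasks: ${input.getNumPartitions}")
```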
After all these reliability and performance improvements, we are pleased to report that we built and deployed a faster and more manageable pipeline for one of our entity ranking systems, and we provided the ability for other similar jobs to run in Spark.

### Spark pipeline vs. Hive pipeline performance comparison

We used the following performance metrics to compare the Spark pipeline against the Hive pipeline. Please note that these numbers aren't a direct comparison of Spark to Hive at the query or job level, but rather a comparison of building an optimized pipeline with a flexible compute engine (e.g., Spark) instead of a compute engine that operates only at the query/job level (e.g., Hive).
CPU time: This is the CPU usage from the perspective of the OS. For example, if you have a job that is running only one process on a 32-core machine using 50 percent of all CPU for 10 seconds, then your CPU time would be 32 * 0.5 * 10 = 160 CPU seconds.

![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xpt1/t39.2365-6/14146892_595234533986285_2004398348_n.jpg)

CPU reservation time: This is the CPU reservation from the perspective of the resource management framework. For example, if we reserve a 32-core machine for 10 seconds to run the job, the CPU reservation time is 32 * 10 = 320 CPU seconds. The ratio of CPU time to CPU reservation time reflects how well we are utilizing the reserved CPU resources on the cluster. When accurate, the reservation time provides a better comparison between execution engines running the same workloads than CPU time does. For example, if a process requires 1 CPU second to run but must reserve 100 CPU seconds, it is less efficient by this metric than a process that requires 10 CPU seconds but reserves only 10 CPU seconds to do the same amount of work. We also compute the memory reservation time but do not include it here, since the numbers were similar to the CPU reservation time because we ran the experiments on the same hardware, and in both the Spark and Hive cases we do not cache data in memory. Spark has the ability to cache data in memory, but due to our cluster memory limitations we decided to work out-of-core, similar to Hive.
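
For concreteness, the two example calculations above and the resulting utilization ratio, written out as plain arithmetic:

```scala
// CPU time: cores * average utilization * wall-clock seconds.
val cpuTime = 32 * 0.5 * 10                        // 160.0 CPU seconds (example above)

// CPU reservation time: reserved cores * seconds the reservation is held.
val cpuReservationTime = 32 * 10                   // 320 CPU seconds (example above)

// Utilization of the reservation; closer to 1.0 means less reserved CPU is wasted.
val utilization = cpuTime / cpuReservationTime     // 0.5
```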
![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xfa1/t39.2365-6/14129680_325754934432503_513809233_n.jpg)

Latency: End-to-end elapsed time of the job.

![](https://fbcdn-dragon-a.akamaihd.net/hphotos-ak-xap1/t39.2365-6/14129681_178723715883876_1030939470_n.jpg)
### Conclusion and future work

Facebook uses performant and scalable analytics to assist in product development. Apache Spark offers the unique ability to unify various analytics use cases into a single API and efficient compute engine. We challenged Spark to replace a pipeline that decomposed into hundreds of Hive jobs with a single Spark job. Through a series of performance and reliability improvements, we were able to scale Spark to handle one of our entity ranking data processing use cases in production. In this particular use case, we showed that Spark could reliably shuffle and sort 90 TB+ of intermediate data and run 250,000 tasks in a single job. The Spark-based pipeline produced significant performance improvements (4.5-6x CPU, 3-4x resource reservation, and ~5x latency) compared with the old Hive-based pipeline, and it has been running in production for several months.

While this post details our most challenging use case for Spark, a growing number of customer teams have deployed Spark workloads into production. Performance, maintainability, and flexibility are the strengths that continue to drive more use cases to Spark. Facebook is excited to be a part of the Spark open source community and will work together to develop Spark toward its full potential.
--------------------------------------------------------------------------------

via: https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/?utm_source=dbweekly&utm_medium=email

Author: [Sital Kedia][a], [王硕杰][b], [Avery Ching][c]

Translator: [译者ID](https://github.com/译者ID)

Proofreader: [校对者ID](https://github.com/校对者ID)

This article was translated by the [LCTT](https://github.com/LCTT/TranslateProject) project and is proudly presented by [Linux中国](https://linux.cn/).

[a]: https://www.facebook.com/sitalkedia
[b]: https://www.facebook.com/shuojiew
[c]: https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/?utm_source=dbweekly&utm_medium=email#
[1]: https://code.facebook.com/posts/370832626374903/even-faster-data-at-the-speed-of-presto-orc/
[2]: https://www.facebook.com/notes/facebook-engineering/under-the-hood-scheduling-mapreduce-jobs-more-efficiently-with-corona/10151142560538920/
[3]: https://code.facebook.com/posts/509727595776839/scaling-apache-giraph-to-a-trillion-edges/
[4]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[5]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[6]: https://research.facebook.com/publications/realtime-data-processing-at-facebook/
[7]: http://spark.apache.org/