ARTS - 2019 Week 6-4
20190623~20190629
Algorithm
Review
Cosco: An Efficient Facebook-Scale Shuffle Service
Cosco is an efficient shuffle-as-a-service that powers Spark (and Hive) jobs at Facebook warehouse scale. It is implemented as a scalable, reliable and maintainable distributed system. Cosco is based on the idea of partial in-memory aggregation across a shared pool of distributed memory. This provides vastly improved efficiency in disk usage compared to Spark’s built-in shuffle. Long term, we believe the Cosco architecture will be key to efficiently supporting jobs at ever larger scale. In this talk we’ll take a deep dive into the Cosco architecture and describe how it’s deployed at Facebook. We will then describe how it’s integrated to run shuffle for Spark, and contrast it with Spark’s built-in sort-based shuffle mechanism and SOS (presented at Spark+AI Summit 2018).
【PDF】Cosco: An Efficient Facebook-Scale Shuffle Service
Review
Background
Disaggregating storage from compute; benefits: hardware, optimization, management, configuration
- Compute: CPU, GPU, RAM
- Storage: disk, persistent and temporary storage
Problems: disk service time and throughput; write amplification
Cosco overview
- For each reducer partition, mappers share a write-ahead buffer
- Reducers read the written data sequentially
- I/O throughput: sequential reads with larger I/O sizes
- Write amplification: reduced by avoiding spills
Problems
- Shuffle exchange is handled on disk
- A single shuffle exchange: PB-scale data, 100K mappers, 10K reducers
- Write amplification: 3x
- I/O throughput: 200K
- I/O concurrency: readers fan out as M*R (see the back-of-the-envelope sketch below)
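Why the M*R fan-out hurts: with sort-based shuffle every reducer fetches one block from every mapper, so the block count grows multiplicatively. A back-of-the-envelope sketch, using the mapper/reducer counts above and an assumed 1 PiB exchange size (the exchange size is my assumption, not a figure from the talk):

```scala
// Rough arithmetic for the sort-based shuffle fan-out.
object ShuffleFanout {
  def main(args: Array[String]): Unit = {
    val mappers       = 100000L   // 100K mappers
    val reducers      = 10000L    // 10K reducers
    val exchangeBytes = 1L << 50  // assumed ~1 PiB shuffle exchange

    // Every reducer fetches one block from every mapper.
    val blocks      = mappers * reducers            // 1,000,000,000 mapper->reducer blocks
    val avgBlockKiB = exchangeBytes / blocks / 1024 // ~1099 KiB, i.e. roughly 1 MiB per block

    println(s"blocks = $blocks, avg block = $avgBlockKiB KiB")
    // A billion small blocks scattered across files -> small, mostly random disk reads.
  }
}
```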
Solution
- Write-ahead buffers: for each reducer partition, mappers share a write-ahead buffer; sorting is optional (a minimal sketch follows this list)
- Delivery: data acks and failover (exactly once / at least once)
- Replication: file-level failover
- Metadata: mappers commit files, reducers request files
- Scheduler: shuffle compute scheduling and fault tolerance
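A minimal single-process sketch of the shared write-ahead buffer idea described above; the class name, flush threshold, and one-file-per-partition layout are illustrative assumptions, not Cosco's actual implementation:

```scala
import java.io.{BufferedOutputStream, FileOutputStream}
import scala.collection.mutable

// Many map tasks append records for the same reducer partition into one buffer;
// the buffer is flushed to an append-only file in large sequential chunks.
class PartitionBuffer(partition: Int, flushBytes: Int = 4 << 20) {
  private val pending  = mutable.ArrayBuffer.empty[Array[Byte]]
  private var buffered = 0
  private val out = new BufferedOutputStream(
    new FileOutputStream(s"partition-$partition.data", true)) // one file per reducer partition

  // Called by mappers from many tasks; records accumulate in memory first.
  def append(record: Array[Byte]): Unit = synchronized {
    pending += record
    buffered += record.length
    if (buffered >= flushBytes) flush() // one big sequential write instead of many small ones
  }

  def flush(): Unit = synchronized {
    pending.foreach(r => out.write(r)) // reducers later scan this file front to back
    out.flush()
    pending.clear()
    buffered = 0
  }

  def close(): Unit = synchronized { flush(); out.close() }
}

object Demo extends App {
  val buf = new PartitionBuffer(partition = 7)
  // Records from different map tasks land in the same per-partition buffer.
  buf.append("record-from-mapper-1".getBytes("UTF-8"))
  buf.append("record-from-mapper-2".getBytes("UTF-8"))
  buf.close()
}
```

Because many map tasks share one buffer per partition, data reaches disk in large sequential chunks and each reducer reads a single file sequentially; per the notes above, Cosco additionally provides file-level replication for failover and can optionally sort before flushing.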
Limitations
- Data capacity
- Memory capacity
Tip
Commonly Used Spark Parameters
- `spark.executor.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"`: executor JVM options
- `spark.executor.cores=2`: number of cores per executor
- `spark.executor.memory=4g`: executor heap size
- `spark.yarn.executor.memoryOverhead=1g`: executor off-heap (overhead) memory size
- `spark.driver.extraJavaOptions="-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"`: driver JVM options
- `spark.driver.cores=2`: number of driver cores
- `spark.driver.memory=4g`: driver heap size
- `spark.yarn.driver.memoryOverhead=1g`: driver off-heap (overhead) memory size
- `spark.speculation=true`: enable speculative execution
- `spark.speculation.quantile=0.75`: minimum fraction of tasks that must finish before speculation starts
- `spark.speculation.multiplier=1.5`: once the quantile is met, a task is re-launched speculatively when its running time exceeds this multiple of the median completed-task time
- `spark.memory.fraction=0.6`: fraction of the heap used for execution and storage; raising it can cause OOM. 300 MB is reserved, and the remainder, (1.0 - spark.memory.fraction) * (SYSTEM_MEMORY - RESERVED_MEMORY), is user memory (worked numbers in the sketch after this list)
- `spark.memory.storageFraction=0.5`: fraction of storage memory immune to eviction by execution; eviction is LRU, and when execution memory runs short, data may spill to disk
- `spark.memory.offHeap.enabled=true`: use off-heap memory to reduce GC pressure
- `spark.memory.offHeap.size=2147483648`: off-heap memory size in bytes
- `spark.shuffle.file.buffer=32k`: in-memory buffer size for shuffle output, reducing disk I/O pressure
- `spark.unsafe.sorter.spill.reader.buffer.size=2097152`: buffer size for reading spill files
- `spark.sql.autoBroadcastJoinThreshold=10m`: maximum size of a table that will be broadcast for a join
- `spark.sql.broadcastTimeout=300`: broadcast timeout in seconds
- `spark.dynamicAllocation.enabled=true`: enable dynamic resource allocation
- `spark.dynamicAllocation.minExecutors=20`: minimum number of executors
- `spark.dynamicAllocation.maxExecutors=200`: maximum number of executors
- `spark.shuffle.service.enabled=true`: enable the external shuffle service
- `spark.shuffle.service.port=7337`: shuffle service port
- `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2`: use the FileOutputCommitter V2 algorithm to shorten commitJob; V1 provides stronger consistency guarantees
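A minimal sketch of applying a few of these settings programmatically via SparkConf; the values are the illustrative ones from the list above (not tuning recommendations), and the object/app names are placeholders. The unified-memory arithmetic for the 4g executor is worked out in the comments:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TunedJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.executor.cores", "2")
      .set("spark.executor.memory", "4g")
      .set("spark.yarn.executor.memoryOverhead", "1g")
      .set("spark.speculation", "true")
      .set("spark.speculation.quantile", "0.75")
      .set("spark.speculation.multiplier", "1.5")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "20")
      .set("spark.dynamicAllocation.maxExecutors", "200")

    val spark = SparkSession.builder().config(conf).appName("tuned-job").getOrCreate()

    // Unified memory split for spark.executor.memory=4g, fraction=0.6, storageFraction=0.5:
    //   usable       = 4096 MB - 300 MB (reserved)  = 3796 MB
    //   spark pool   = 3796 MB * 0.6                ~= 2278 MB (execution + storage)
    //   storage part = 2278 MB * 0.5                ~= 1139 MB (immune to eviction)
    //   user memory  = 3796 MB * (1 - 0.6)          ~= 1518 MB
    spark.range(0, 1000000).selectExpr("sum(id)").show()

    spark.stop()
  }
}
```

The same keys can also be passed at submit time with `spark-submit --conf key=value`.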