6.6 倾斜连接
并行计算中,我们总希望分配的每一个任务(task)都能以相似的粒度来切分,且完成时间相差不大。但是由于集群中的硬件和应用的类型不同、切分的数据大小不一,总会导致部分任务极大地拖慢了整个任务的完成时间。硬件不同暂且不论,下面举例说明不同应用类型的情况,如Page Rank或者Data Mining中的一些计算,它的每条记录消耗的成本不太一样,这里只讨论关于关系型运算的Join连接的数据倾斜状况。
数据倾斜原因如下。
1)业务数据本身的特性。
2)Key分布不均匀。
3)建表时考虑不周。
4)某些SQL语句本身就有数据倾斜。
数据倾斜表现如下。
任务进度长时间维持,查看任务监控页面,由于其处理的数据量与其他任务差异过大,会发现只有少量(1个或几个)任务未完成。
1.实例描述
输入:
- 表A(数据倾斜),表B
输出:
- 表C(A,B连接后的表)
2.设计思路
数据倾斜有很多解决方案,本例简要介绍一种实现方式。假设表A和表B连接,表A数据倾斜,只有一个Key倾斜。首先对A进行采样,统计出最倾斜的Key。将A表分隔为A1只有倾斜Key,A2不包含倾斜Key,然后分别与B连接。
3.代码示例
倾斜连接的代码示例如下:
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- */
- object SkewJoin {
- def main(args : Array[String]) {
- val skewedTable = left.execute()
- val spark = new SparkContext("local", "TopK",
- System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
- /*存在数据倾斜的数据表*/
- val skewTable = spark.textFile("skewTable")
- /*与skewTable连接的表*/
- val Table = spark.textFile("Table")
- /*对数据倾斜的表进行采样,假设只有一个key倾斜最严重,获取倾斜最大的key*/
- val sample = skewTable.sample(false, 0.3, 9).groupByKey().collect()
- val maxrowKey = sample.map(rows => (rows._2.size, rows._1)).maxBy(rows =>
- rows._1)._2)
- /*将倾斜的表拆分为两个RDD,一个为只含有倾斜key的表,一个为不含有倾斜key的表*/
- /*分别与原表连接*/
- val maxKeySkewedTable = skewTable.filter(row => {
- buildSideKeyGenerator(row) == maxrowKey
- })
- val mainSkewedTable = skewTable.filter(row => {
- !(buildSideKeyGenerator(row) == maxrowKey)
- })
- /*分别与原表连接*/
- val maxKeyJoinedRdd = maxKeySkewedTable.join(Table)
- val mainJoinedRdd = mainSkewedTable.join(Table)
- /*将结果合并*/
- sc.union(maxKeyJoinedRdd, mainJoinedRdd)
- }
- }
4.应用场景
在大数据分析平台中,经常遇到数据倾斜问题,读者可以参照相应的思路处理数据倾斜的处理。SQL on Hadoop系统中也需要处理数据倾斜问题。