6.6 倾斜连接

并行计算中,我们总希望分配的每一个任务(task)都能以相似的粒度来切分,且完成时间相差不大。但是由于集群中的硬件和应用的类型不同、切分的数据大小不一,总会导致部分任务极大地拖慢了整个任务的完成时间。硬件不同暂且不论,下面举例说明不同应用类型的情况,如Page Rank或者Data Mining中的一些计算,它的每条记录消耗的成本不太一样,这里只讨论关于关系型运算的Join连接的数据倾斜状况。

数据倾斜原因如下。

1)业务数据本身的特性。

2)Key分布不均匀。

3)建表时考虑不周。

4)某些SQL语句本身就有数据倾斜。

数据倾斜表现如下。

任务进度长时间维持,查看任务监控页面,由于其处理的数据量与其他任务差异过大,会发现只有少量(1个或几个)任务未完成。

1.实例描述

输入:


  1. A(数据倾斜),表B

输出:


  1. CAB连接后的表)

2.设计思路

数据倾斜有很多解决方案,本例简要介绍一种实现方式。假设表A和表B连接,表A数据倾斜,只有一个Key倾斜。首先对A进行采样,统计出最倾斜的Key。将A表分隔为A1只有倾斜Key,A2不包含倾斜Key,然后分别与B连接。

3.代码示例

倾斜连接的代码示例如下:


  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. */
  4. object SkewJoin {
  5. def mainargs Array[String]) {
  6. val skewedTable = left.execute()
  7. val spark = new SparkContext"local" "TopK"
  8. System.getenv"SPARK_HOME"), SparkContext.jarOfClassthis.getClass))
  9. /*存在数据倾斜的数据表*/
  10. val skewTable = spark.textFile"skewTable"
  11. /*与skewTable连接的表*/
  12. val Table = spark.textFile"Table"
  13. /*对数据倾斜的表进行采样,假设只有一个key倾斜最严重,获取倾斜最大的key*/
  14. val sample = skewTable.samplefalse 0.3 9).groupByKey().collect()
  15. val maxrowKey = sample.maprows => rows._2.size rows._1)).maxByrows =>
  16. rows._1)._2
  17. /*将倾斜的表拆分为两个RDD,一个为只含有倾斜key的表,一个为不含有倾斜key的表*/
  18. /*分别与原表连接*/
  19. val maxKeySkewedTable = skewTable.filterrow => {
  20. buildSideKeyGeneratorrow == maxrowKey
  21. })
  22. val mainSkewedTable = skewTable.filterrow => {
  23. !(buildSideKeyGeneratorrow == maxrowKey
  24. })
  25. /*分别与原表连接*/
  26. val maxKeyJoinedRdd = maxKeySkewedTable.joinTable
  27. val mainJoinedRdd = mainSkewedTable.joinTable
  28. /*将结果合并*/
  29. sc.unionmaxKeyJoinedRdd mainJoinedRdd
  30. }
  31. }

4.应用场景

在大数据分析平台中,经常遇到数据倾斜问题,读者可以参照相应的思路处理数据倾斜的处理。SQL on Hadoop系统中也需要处理数据倾斜问题。