当先锋百科网

首页 1 2 3 4 5 6 7

   我们在写sql的时候经常发现读取数据不多,但是代码运行时间异常长的情况,这通常是发生了数据倾斜现象。数据倾斜现象本质上是因为数据中的key分布不均匀,大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,从而拉慢了整个计算过程速度。

本文将介绍如何通过日志分析,判断数据中的哪个key分布不均,从而导致了数据倾斜问题。

任务是否发生了倾斜

hive判断

hive运行日志

当我们在hive作业运行日志中,发现reduce任务长时间卡在99%时,即可判断任务发生了数据倾斜。

其原理是这样的:

分布式处理逻辑

分布式处理实际上是按数据中的key将数据分摊到多个机器上运行,假如出现了数据倾斜问题,如上图。可以想象,当1min过去后,我们的任务完成率只有67%,并且在接下来的9min时间内,任务完成率将持续卡在67%上。因此,当我们发现任务完成率长时间卡在99%时,即判断发生了数据倾斜。

spark判断

spark UI界面

我们进入spark UI界面,发现第2个job的运行时间长达1.8h,而其他job运行时间不超过2min,判断该job有可能发生数据倾斜。

进一步分析job,可以看到该job只存在一个stage(9)

stage界面

进一步分析stage,发现不管是duration还是shuffle的数据量,max和median都有明显的差距,可以肯定是job(5)的stage(9)发生倾斜。

hive输出也可以帮助排查

hive数据倾斜表象:Table 0 has 10000 rows for join key [0,0]
有hive任务发生数据倾斜,reduce端一直99%,有一个reduce任务卡主了。
打开这个reduce任务的log日志,发现如下日志:

[INFO] org.apache.hadoop.hive.ql.exec.JoinOperator: Table 0 has 10000 rows for join key [0,0]


打开hive源码定为输入日志行:

if (sz == nextSz) {
    LOG.info("Table {} has {} rows for join key {}", alias, sz, keyObject);
    nextSz = getNextSize(nextSz);
}


输出的类是org.apache.hadoop.hive.ql.exec.JoinOperator,是hive中join运算符的实现类,具体运行机制尚不清楚。
查询资料得知,当一个key关联了超过1000行时,会输出一条该警告日志,此后每1000会输出一条。所以这条日志的目的在于警告可能存在的Join数据倾斜的风险。
 

寻找倾斜key

当我们发现任务倾斜了,自然而然就希望找到倾斜的key,从而修复数据倾斜的现象。当然,这部分我也会分为hive和spark两个部分进行介绍。

hive识别

step1:确认是哪个Job出现了严重的倾斜问题

hive运行日志

通过搜索tracking的方式,我们发现第3个job的reduce任务一直卡在99%上,判断其发生了倾斜问题。

step2:进入相应的Tracking URL,查看SUCCESSFUL REDUCE

很明显,其他的taske都在2min之内完成,只有000000_1需要耗费1个多小时的时间完成。

另外注意,这里面需要排除一种特殊情况。有时候,某个task执行的节点可能有问题,导致任务跑的特别慢。这个时候,mapreduce的推测执行,会重启一个任务。如果新的任务在很短时间内能完成,通常则是由于task执行节点问题导致的个别task慢。如果推测执行后的task执行任务也特别慢,那更能说明该task可能会有倾斜问题。

step3:进入log日志,查看syslog

hive的syslog日志

可以从log日志中看到,该job仅仅运行了file和group操作后,就将数据写入至hive表中。那么,我们可以确认的是,该job运行的是最后一个group by操作。

step4:对照运行sql

运行sql

我们可以看到,在group by阶段,count(distinct)的出现造成了数据倾斜。

spark识别

step1:找到该任务运行的stage

spark UI界面

我们看到该运行任务,可以发现第2个job运行时间长达1.8h,远大于其他job,可以判定倾斜发生在job(5)。

step2:点击SQL,查看Details for Query

Details for Query

可以从sort time total/peak memory total/spill size total看出来,左表的package_name分布不均匀,此时可以通过查看scan parquet了解具体是哪张表。

step3:对照运行sql

运行sql代码

查询package_name的分布情况

select package_name,count(1) as cnt from test1 where date=20220619 group by package_name order by cnt desc limit 10;

package_name的分布验证了我们的猜想,test1.package_name造成了数据倾斜

过滤掉倾斜数据

当少量key重复次数特别多,如果这种key不是业务需要的key,可以直接过滤掉。

比如一张埋点日志表ods.page_event_log,

需要和订单表dw.order_info_fact做join关联。

在执行Hive的过程中发现任务卡在map 100%、reduce 99%,最后的1%一直运行不完。考虑应该是在join的过程中出现了数据倾斜,下面进行排查。

对于ods.page_event_log表查看出现次数最多的key:

select cookieid,
           count(*) as num
    from ods.page_event_log
   where data_date = "20190101"
group by cookieid
distribute by cookieid
sort by num desc limit 10

同样的,对另一张join表也做对应的排查

select cookieid,
    count(*) as num
from dw.order_info_fact
group by cookieid
distribute by cookieid
sort by num desc limit 10

从sql统计的结果可以看出,日志表和订单表通过cookieid进行join,当cookieid为0的时候,join操作将会产生142286×142286条数据,数量如此庞大的节点系统无法处理过来。同样当cookieid为NULL值和空值时也会出现这种情况,而且cookieid为这3个值时并没有实际的业务意义。因此在对两个表做关联时,排除掉这3个值以后,就可以很快计算出结果了,所以做好前期的数据清洗对一个大数据平台是至关重要的,生产无小事。

引入随机数

当我们用sql对数据group by时,MR会将相同的key拉取到同一个节点上进行聚合,如果某组的数据量很大时,会出现当前节点任务负载过重,从而导致数据倾斜。这时候可以考虑引入随机数,将原来的一个key值拆分成多组进行聚合。

比如现在需要统计用户的订单量,sql如下:

select t1.user_id,
       t2.order_num
    from (select user_id
          from dim.user_info_fact   # 用户维度表
       where data_date = "20190101"            
          and user_status_id=1
       ) t1 
  join ( select user_id,
            count(*) as order_num 
       from dw.dw_order_fact      # 订单表
       where site_id in (600, 900)
          and order_status_id in(1,2,3)
     group by user_id

        ) t2 
    on t1.user_id = t2.user_id

其中,用户维度表有2000w条数据,订单表有10亿条数据,任务在未优化前跑了一小时还没跑完,怀疑出现了数据倾斜。这里可以把key值加上一定的前缀转换成多个key,这样原本一个task上处理的key就会分发到其他多个task,然后去掉前缀再进行一次聚合得到最终结果。

优化后的sql如下: 这里把原来可能1个task执行的任务并行成了1000个随机数task做聚合,再把聚合的结果通过user_id做sum,在集群的整体性能不受影响的情况下,可以有效提高整体的计算速度。

select t1.user_id,
       t2.order_num
    from (select user_id
       from dim_user_info_fact
       where data_date = "20190101"                           
       ) t1 
  join (   select t.user_id,
            sum(t.order_num) as order_num
       from (select user_id,
                round(rand()*1000) as rnd,
                count(1) as order_num 
           from dw.order_info_fact
           where pay_status in (1,3)
           group by user_id,round(rand()*1000)
             ) t 
         group by t.user_id
    ) t2 
  on t1.user_id = t2.user_id

还有一种可能 

可能仅仅是因为你给的资源太少了 ,适当增加map和reduce的内存和个数,以及小文件合并之类的