常見的數據傾斜是怎么造成的?
Shuffle的時候,將各個節點上相同的key拉取到某個節點的一個task進行處理,比如按照key進行聚合或join等操作,如果某個key對應的數據量特別大的話,就會發生數據傾斜現象。數據傾斜就成為了整個task運行時間的短板。
觸發shuffle的常見算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解決數據傾斜的問題,首先要定位數據傾斜發生在什么地方。
首先是哪個stage,直接在Web UI上看就可以,一般出現傾斜都是耗時特別長的Stage,然后查看運行耗時的task,一般是其中的某幾個Task一直拖著,其他的Task早已經完成了,根據這個task,根據stage劃分原理,推算出數據傾斜發生在哪個shuffle類算子上。
如何查看發生傾斜的RDD呢?
如果是Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來各個key出現的次數,collect、take到客戶端打印一下,就可以看到key的分布情況。
以下方法可以大概看出哪個key出現了傾斜:
JavaPairRDDhssData = getHssData(fs, sc, hssPath);
JavaPairRDDsample = hssData.sample(false, 0.1);
MapcountByKey = sample.countByKey();
出現傾斜的key有兩種情況:
1、某個可以出現傾斜
2、多個key出現傾斜
某個Key出現傾斜解決辦法:
通過上述方法可以知道是哪個Key出現了傾斜,所以可以先通過filter方法過濾掉傾斜的Key,把傾斜的Key和沒有傾斜的Key分開處理,由于Spark運行機制,所以單獨處理傾斜Key的時候就不會再出現傾斜現象。
上述方法只能處理特定的數據傾斜,對于實際的生產環境可能并不怎么適用,這事是解決傾斜的其中一個方法。
多個Key出現傾斜的解決辦法:
原理:在傾斜Shuffle之前給每一個Key都加上一個隨機前綴,然后再給加了前綴的Key進行一個Shuffle操作,在Shuffle操作后再把Key的前綴去掉。在這個過程中由于前綴的加入,會把傾斜的Key隨機的分配到不同的Task。然后去掉前綴從而解決數據傾斜的問題。
private static JavaPairRDDrepar(
JavaPairRDD。Cdr) {
JavaPairRDDmapToPair;
try {
mapToPair = 。Cdr
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
//產生隨機前綴,隨機數大小看情況決定
long i = (long) (Math.random() * 150);
//添加隨機數前綴
return new Tuple2(i + _ + t._1, t._2);
}
}).sortByKey()//進行一個Shuffle操作打亂Key
//去掉隨機數前綴
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
String str = t.1.split()[0];
return new Tuple2(str, t._2);
}
});
} catch (Exception e) {
return null;
}
return mapToPair;
}
以上是解決RDD數據傾斜簡單方法。
-
RDD
+關注
關注
0文章
7瀏覽量
7989 -
大數據
+關注
關注
64文章
8908瀏覽量
137656
發布評論請先 登錄
相關推薦
評論