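1. Problem: the job hangs every time it reaches the mapPartitions stage
It looks like a repartition problem, but the real culprit is the mapPartitions call that runs before it.
Executor behavior:
The problematic code: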
val process_data = data.mapPartitions(
  rs => {
    val delLabelMapbc = delLabelMap.value
    // res collects every record of the partition before anything is returned
    var res = List[String]()
    for (r <- rs) {
      val line = r.split("\t")
      val checkDelKey = line(1) + '-' + line(6) + '-' + line(5)
      if (delLabelMapbc.contains(checkDelKey)) {
        val rep = r.replaceFirst("[0-9]", "-1")
        res = rep :: res
      } else {
        res = r :: res
      }
    }
    res.iterator
  }
)
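Here the res list holds the entire partition in memory before the iterator is returned, so a large partition causes an OOM.
The expert I asked said that driver OOM basically has two causes: too many stages, or a large data structure that you define yourself. That seems worth remembering.
2. Improvement
Switch to map: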
val process_data = data.map(
  r => {
    val delLabelMapbc = delLabelMap.value
    val seg = r.split("\t")
    val checkDelKey = seg(1) + '-' + seg(6) + '-' + seg(5)
    // replace the first digit with -1 when the key is present in delLabelMap
    if (delLabelMapbc.contains(checkDelKey)) r.replaceFirst("[0-9]", "-1") else r
  }
)
With this change the job runs normally.
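For reference, mapPartitions itself does not have to be avoided. What matters is not collecting the partition into a list. Below is a minimal sketch of a lazy variant that transforms the incoming iterator with Iterator.map, so records are processed one at a time. The object LazyPartition, the method relabel, and the isDeleted parameter are hypothetical names introduced only for illustration. isDeleted stands in for delLabelMapbc.contains, since the exact type of delLabelMap is not shown in the original job.

```scala
// Hypothetical helper, not part of the original job.
object LazyPartition {
  // Transforms one partition lazily: Iterator.map does not materialize the
  // records, so memory use stays at roughly one record at a time.
  // isDeleted is an assumed stand-in for delLabelMapbc.contains.
  def relabel(rs: Iterator[String], isDeleted: String => Boolean): Iterator[String] =
    rs.map { r =>
      val seg = r.split("\t")
      val checkDelKey = seg(1) + '-' + seg(6) + '-' + seg(5)
      // replace the first digit with -1 when the key is marked for deletion
      if (isDeleted(checkDelKey)) r.replaceFirst("[0-9]", "-1") else r
    }
}

// Assumed usage inside the Spark job (data and delLabelMap as in the code above):
// val process_data = data.mapPartitions { rs =>
//   val delLabelMapbc = delLabelMap.value
//   LazyPartition.relabel(rs, delLabelMapbc.contains)
// }
```

Unlike the list-based version, this also keeps the records in their original order, because nothing is prepended to a list.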