1、持久化错误使用
usersRDD.cache()
usersRDD.count()
usersRDD.take()
正确使用
usersRDD = usersRDD.cache()
val cachedUsersRDD = usersRDD.cache()
注意:因为spark的动态内存管理机制,在内存中存储的数据可能会丢失
2、程序中有时候会报shuffle file not found
原因:
executor的JVM进程,可能内存不是很够用了。那么此时可能就会执行GC。minor GC or full GC。总之一旦发生了JVM之后,就会导致executor内,所有的工作线程全部停止,比如BlockManager,基于netty的网络通信。
下一个stage的executor,可能是还没有停止掉的,task想要去上一个stage的task所在的exeuctor,去拉取属于自己的数据,结果由于对方正在gc,就导致拉取了半天没有拉取到。就很可能会报出,shuffle file not found。但是,可能下一个stage又重新提交了stage或task以后,再执行就没有问题了,因为可能第二次就没有碰到JVM在gc了。
解决问题:
增加shuffle reader的重试次数和等待时长
spark.shuffle.io.maxRetries 60
spark.shuffle.io.retryWait 60s
3、yarn-client 调试情况下本地机器网卡流量激增
原因:由于咱们的driver是启动在本地机器的,而且driver是全权负责所有的任务的调度的,也就是说要跟yarn集群上运行的多个executor进行频繁的通信(中间有task的启动消息、task的执行统计消息、task的运行状态、shuffle的输出结果)。
咱们来想象一下。比如你的executor有100个,stage有10个,task有1000个。每个stage运行的时候,都有1000个task提交到executor上面去运行,平均每个executor有10个task。接下来问题来了,driver要频繁地跟executor上运行的1000个task进行通信。通信消息特别多,通信的频率特别高。运行完一个stage,接着运行下一个stage,又是频繁的通信。
在整个spark运行的生命周期内,都会频繁的去进行通信和调度。所有这一切通信和调度都是从你的本地机器上发出去的,和接收到的。这是最要人命的地方。你的本地机器,很可能在30分钟内(spark作业运行的周期内),进行频繁大量的网络通信。那么此时,你的本地机器的网络通信负载是非常非常高的。会导致你的本地机器的网卡流量会激增!!!
4、执行spark sql 报错PermGen Out of Memory error log
既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。咱们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。
spark-submit脚本中,加入以下配置即可:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
5、spark sql 报错jvm stack overflow
sql,有大量的or语句。比如where keywords='' or keywords='' or keywords=''
当达到or语句,有成百上千的时候,此时可能就会出现一个driver端的jvm stack overflow,JVM栈内存溢出的问题
解决办法:
优化sql语句,就只有100个or子句以内;一条一条SQL语句来执行。根据生产环境经验的测试,一条sql语句,100个or子句以内,是还可以的。通常情况下,不会报那个栈内存溢出
6、yarn资源队列不足,导致提交作业莫名其妙的失败
- YARN,发现资源不足时,你的spark作业,并没有hang在那里,等待资源的分配,而是直接打印一行fail的log,直接就fail掉了。
- YARN,发现资源不足,你的spark作业,就hang在那里。一直等待之前的spark作业执行完,等待有资源分
配给自己来执行。
7、Serializable、Serialize 序列化异常
算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的
如果是connect socket连接,则不支持序列化,需要在executor建立socket连接
class HbaseUtil(getConnect:()=> Connection) extends Serializable {
lazy val connect = getConnect()
}
val connection = ssc.sparkContext.broadcast(HbaseUtil())
object HbaseUtil {
def apply(): HbaseUtil = {
val f = ()=>{
UserGroupInformation.setConfiguration(conf)
val romoteUser = UserGroupInformation.createRemoteUser("hbase")
UserGroupInformation.setLoginUser(romoteUser)
val connection = ConnectionFactory.createConnection(conf)
//释放资源 在executor的JVM关闭之前,千万不要忘记
sys.addShutdownHook {
connection.close()
}
connection
}
new HbaseUtil(f)
}
}
8、在算子函数中,返回null
在返回的时候,返回一些特殊的值,不要返回null,比如“none“,然后再filter之后再coalesce防止数据倾斜
9、reduce端缓存不足引起OOM
spark.reducer.maxSizeInFlight = 48m
默认
spark.maxRemoteBlockSizeFetchToMem=Int.MaxValue - 512
2G, 这个参数,在发生数据倾斜的情况下,单个partition文件大于2g,也会oom,最好和spark.executor.memory=1g
配合使用,不要大于executor的内存,设置合理值200m
map端的task是不断的输出数据的,数据量可能是很大的。但是,其实reduce端的task,并不是等到map端task将属于自己的那份数据全部写入磁盘文件之后,再去拉取的。map端写一点数据,reduce端task就会拉取一小部分数据,立即进行后面的聚合、算子函数的应用。每次reduece能够拉取多少数据,就由buffer来决定。因为拉取过来的数据,都是先放在executor内存中val results = new LinkedBlockingQueue[SuccessFetchResult],SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, remainingBlocks.isEmpty)
中的,而且拉取数据过程中会判断本地和远程数据块,然后拉去,如果拉去的block_id 块大小大于2g,则启用磁盘。去进行后续的聚合、函数的执行。
解决:
增大到96m,可以减少网络拉取次数,减少了reduce聚合次数,一定程度上可以提升性能
- Executor 端的任务并发度,多个同时运行的 Task 会共享 Executor 端的内存,使得单个 Task 可使用的内存减少。
- 无论是在 Map 还是在 Reduce 端,插入数据到内存,排序,归并都是比较都是比较占用内存的。因为有 Spill,理论上不会因为数据倾斜造成 OOM。 但是,由于对堆内对象的分配和释放是由 JVM 管理的,而 Spark 是通过采样获取已经使用的内存情况,有可能因为采样不准确而不能及时 Spill,导致OOM。
- 在 Reduce 获取数据时,由于数据倾斜,有可能造成单个 Block 的数据非常的大,默认情况下是需要有足够的内存来保存单个 Block 的数据。因此,此时极有可能因为数据倾斜造成 OOM。 可以设置 spark.maxRemoteBlockSizeFetchToMem = Int.MaxValue - 512(2g) 参数,设置这个参数以后,超过一定的阈值,会获取数据流写入磁盘,此时便可以避免因为数据倾斜造成 OOM 的情况。在我们的生产环境中也验证了这点,在设置这个参数到合理的阈值后(200m),生产环境任务 OOM 的情况大大减少了。
- 在 Reduce 获取数据后,默认情况会对数据流进行解压校验(参数 spark.shuffle.detectCorrupt)。正如在代码注释中提到,由于这部分没有 Spill 到磁盘操作,也有很大的可性能会导致 OOM。在我们的生产环境中也有碰到因为检验导致 OOM 的情况。
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
10、Spark抛出Too large frame异常,现象:增大spark作业的并行度,反而会报错,减少作业并行度却能执行成功,但是还是存在数据倾斜
是因为Spark对每个partition所能包含的数据大小有写死的限制(约为2G),当某个partition包含超过此限制的数据时,就会抛出Too large frame Exception。
- 异常分析
public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
ByteBuf in = (ByteBuf) data;
buffers.add(in);
totalSize += in.readableBytes();
while (!buffers.isEmpty()) {
// First, feed the interceptor, and if it's still, active, try again.
if (interceptor != null) {
ByteBuf first = buffers.getFirst();
int available = first.readableBytes();
if (feedInterceptor(first)) {
assert !first.isReadable() : "Interceptor still active but buffer has data.";
}
int read = available - first.readableBytes();
if (read == available) {
buffers.removeFirst().release();
}
totalSize -= read;
} else {
// Interceptor is not active, so try to decode one frame.
ByteBuf frame = decodeNext();
if (frame == null) {
break;
}
ctx.fireChannelRead(frame);
}
}
}
private ByteBuf decodeNext() {
long frameSize = decodeFrameSize();
if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
return null;
}
// Reset size for next frame.
nextFrameSize = UNKNOWN_FRAME_SIZE;
Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
// If the first buffer holds the entire frame, return it.
int remaining = (int) frameSize;
if (buffers.getFirst().readableBytes() >= remaining) {
return nextBufferForFrame(remaining);
}
// Otherwise, create a composite buffer.
CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
while (remaining > 0) {
ByteBuf next = nextBufferForFrame(remaining);
remaining -= next.readableBytes();
frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
}
assert remaining == 0;
return frame;
}
private long decodeFrameSize() {
if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < LENGTH_SIZE) {
return nextFrameSize;
}
// We know there's enough data. If the first buffer contains all the data, great. Otherwise,
// hold the bytes for the frame length in a composite buffer until we have enough data to read
// the frame size. Normally, it should be rare to need more than one buffer to read the frame
// size.
/**消息格式
* ----------------------------------------------------------
frame size | type | header | body
----------------------------------------------------------
8 byte | 1 byte | 变长 | 变长
----------------------------------------------------------
frame size 表示后面的数据长度,至少包括 type 和 header,有些消息还会包含 body。
header 根据消息种类的不同,格式也不一样。一般 header 数据都比较小。
body 根据消息种类的不同,格式也不一样。有的消息的 body 比较大,而有些消息甚至没有 body 。
比如 RpcRequest 消息, 它的 frame size 表示 type,header 和 body 三部分的总长度。StreamResponse 消息,
它的 frame size 只表示 type 和 header 两部分的长度。而RpcFailure 消息,只有 header 部分。
Netty 处理管道
*/
ByteBuf first = buffers.getFirst();
if (first.readableBytes() >= LENGTH_SIZE) {
nextFrameSize = first.readLong() - LENGTH_SIZE;
totalSize -= LENGTH_SIZE;
if (!first.isReadable()) {
buffers.removeFirst().release();
}
return nextFrameSize;
}
while (frameLenBuf.readableBytes() < LENGTH_SIZE) {
ByteBuf next = buffers.getFirst();
int toRead = Math.min(next.readableBytes(), LENGTH_SIZE - frameLenBuf.readableBytes());
frameLenBuf.writeBytes(next, toRead);
if (!next.isReadable()) {
buffers.removeFirst().release();
}
}
nextFrameSize = frameLenBuf.readLong() - LENGTH_SIZE;
totalSize -= LENGTH_SIZE;
frameLenBuf.clear();
return nextFrameSize;
}
}
微信图片编辑_20191107150433.jpg
- 关键点:
frame size 表示后面的数据长度,至少包括 type 和 header,有些消息还会包含 body。
frameSize < MAX_FRAME_SIZE=Integer.MAX_VALUE
;, "Too large frame: %s", frameSize
也就是,通过一次netty网络传输数据量大于最高限制2g,某个partition数据太大了超过2g,数据清洗, - 建议:
1、repartition(n)或者spark.sql.shuffle.partitions =500
进行重新分区
2、最好是从业务的角度对spark sql进行优化,比如:left join 想办法拆开转为union all 这种方式
网友评论