美文网首页
spark troubleshooting 常见错误整理

spark troubleshooting 常见错误整理

作者: 邵红晓 | 来源:发表于2019-10-17 11:02 被阅读0次

    1、持久化错误使用

    usersRDD.cache()
    usersRDD.count()
    usersRDD.take()
    

    正确使用

    usersRDD = usersRDD.cache()
    val cachedUsersRDD = usersRDD.cache()
    

    注意:因为spark的动态内存管理机制,在内存中存储的数据可能会丢失
    2、程序中有时候会报shuffle file not found
    原因:
    executor的JVM进程,可能内存不是很够用了。那么此时可能就会执行GC。minor GC or full GC。总之一旦发生了JVM之后,就会导致executor内,所有的工作线程全部停止,比如BlockManager,基于netty的网络通信。
    下一个stage的executor,可能是还没有停止掉的,task想要去上一个stage的task所在的exeuctor,去拉取属于自己的数据,结果由于对方正在gc,就导致拉取了半天没有拉取到。就很可能会报出,shuffle file not found。但是,可能下一个stage又重新提交了stage或task以后,再执行就没有问题了,因为可能第二次就没有碰到JVM在gc了。
    解决问题:
    增加shuffle reader的重试次数和等待时长
    spark.shuffle.io.maxRetries 60
    spark.shuffle.io.retryWait 60s
    3、yarn-client 调试情况下本地机器网卡流量激增
    原因:由于咱们的driver是启动在本地机器的,而且driver是全权负责所有的任务的调度的,也就是说要跟yarn集群上运行的多个executor进行频繁的通信(中间有task的启动消息、task的执行统计消息、task的运行状态、shuffle的输出结果)。
    咱们来想象一下。比如你的executor有100个,stage有10个,task有1000个。每个stage运行的时候,都有1000个task提交到executor上面去运行,平均每个executor有10个task。接下来问题来了,driver要频繁地跟executor上运行的1000个task进行通信。通信消息特别多,通信的频率特别高。运行完一个stage,接着运行下一个stage,又是频繁的通信。
    在整个spark运行的生命周期内,都会频繁的去进行通信和调度。所有这一切通信和调度都是从你的本地机器上发出去的,和接收到的。这是最要人命的地方。你的本地机器,很可能在30分钟内(spark作业运行的周期内),进行频繁大量的网络通信。那么此时,你的本地机器的网络通信负载是非常非常高的。会导致你的本地机器的网卡流量会激增!!!
    4、执行spark sql 报错PermGen Out of Memory error log
    既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。咱们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。
    spark-submit脚本中,加入以下配置即可:
    --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
    5、spark sql 报错jvm stack overflow
    sql,有大量的or语句。比如where keywords='' or keywords='' or keywords=''
    当达到or语句,有成百上千的时候,此时可能就会出现一个driver端的jvm stack overflow,JVM栈内存溢出的问题
    解决办法:
    优化sql语句,就只有100个or子句以内;一条一条SQL语句来执行。根据生产环境经验的测试,一条sql语句,100个or子句以内,是还可以的。通常情况下,不会报那个栈内存溢出

    6、yarn资源队列不足,导致提交作业莫名其妙的失败

    • YARN,发现资源不足时,你的spark作业,并没有hang在那里,等待资源的分配,而是直接打印一行fail的log,直接就fail掉了。
    • YARN,发现资源不足,你的spark作业,就hang在那里。一直等待之前的spark作业执行完,等待有资源分
      配给自己来执行。

    7、Serializable、Serialize 序列化异常
    算子函数里面,如果使用到了外部的自定义类型的变量,那么此时,就要求你的自定义类型,必须是可序列化的
    如果是connect socket连接,则不支持序列化,需要在executor建立socket连接

    class HbaseUtil(getConnect:()=> Connection) extends Serializable {
      lazy val  connect = getConnect()
    }
    val connection = ssc.sparkContext.broadcast(HbaseUtil())
    object HbaseUtil {
     def apply(): HbaseUtil = {
        val f = ()=>{
          UserGroupInformation.setConfiguration(conf)
          val romoteUser = UserGroupInformation.createRemoteUser("hbase")
          UserGroupInformation.setLoginUser(romoteUser)
          val connection = ConnectionFactory.createConnection(conf)
          //释放资源 在executor的JVM关闭之前,千万不要忘记
          sys.addShutdownHook {
            connection.close()
          }
          connection
        }
        new HbaseUtil(f)
      }
    }
    

    8、在算子函数中,返回null
    在返回的时候,返回一些特殊的值,不要返回null,比如“none“,然后再filter之后再coalesce防止数据倾斜
    9、reduce端缓存不足引起OOM
    spark.reducer.maxSizeInFlight = 48m默认
    spark.maxRemoteBlockSizeFetchToMem=Int.MaxValue - 512 2G, 这个参数,在发生数据倾斜的情况下,单个partition文件大于2g,也会oom,最好和spark.executor.memory=1g配合使用,不要大于executor的内存,设置合理值200m
    map端的task是不断的输出数据的,数据量可能是很大的。但是,其实reduce端的task,并不是等到map端task将属于自己的那份数据全部写入磁盘文件之后,再去拉取的。map端写一点数据,reduce端task就会拉取一小部分数据,立即进行后面的聚合、算子函数的应用。每次reduece能够拉取多少数据,就由buffer来决定。因为拉取过来的数据,都是先放在executor内存中val results = new LinkedBlockingQueue[SuccessFetchResult],SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, remainingBlocks.isEmpty)中的,而且拉取数据过程中会判断本地和远程数据块,然后拉去,如果拉去的block_id 块大小大于2g,则启用磁盘。去进行后续的聚合、函数的执行。
    解决:
    增大到96m,可以减少网络拉取次数,减少了reduce聚合次数,一定程度上可以提升性能

    • Executor 端的任务并发度,多个同时运行的 Task 会共享 Executor 端的内存,使得单个 Task 可使用的内存减少。
    • 无论是在 Map 还是在 Reduce 端,插入数据到内存,排序,归并都是比较都是比较占用内存的。因为有 Spill,理论上不会因为数据倾斜造成 OOM。 但是,由于对堆内对象的分配和释放是由 JVM 管理的,而 Spark 是通过采样获取已经使用的内存情况,有可能因为采样不准确而不能及时 Spill,导致OOM。
    • 在 Reduce 获取数据时,由于数据倾斜,有可能造成单个 Block 的数据非常的大,默认情况下是需要有足够的内存来保存单个 Block 的数据。因此,此时极有可能因为数据倾斜造成 OOM。 可以设置 spark.maxRemoteBlockSizeFetchToMem = Int.MaxValue - 512(2g) 参数,设置这个参数以后,超过一定的阈值,会获取数据流写入磁盘,此时便可以避免因为数据倾斜造成 OOM 的情况。在我们的生产环境中也验证了这点,在设置这个参数到合理的阈值后(200m),生产环境任务 OOM 的情况大大减少了。
    • 在 Reduce 获取数据后,默认情况会对数据流进行解压校验(参数 spark.shuffle.detectCorrupt)。正如在代码注释中提到,由于这部分没有 Spill 到磁盘操作,也有很大的可性能会导致 OOM。在我们的生产环境中也有碰到因为检验导致 OOM 的情况。
    val wrappedStreams = new ShuffleBlockFetcherIterator(
          context,
          blockManager.shuffleClient,
          blockManager,
          mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
          serializerManager.wrapStream,
          // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
          SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
          SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
          SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
          SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
          SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
    

    10、Spark抛出Too large frame异常,现象:增大spark作业的并行度,反而会报错,减少作业并行度却能执行成功,但是还是存在数据倾斜
    是因为Spark对每个partition所能包含的数据大小有写死的限制(约为2G),当某个partition包含超过此限制的数据时,就会抛出Too large frame Exception。

    • 异常分析
    public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
        ByteBuf in = (ByteBuf) data;
        buffers.add(in);
        totalSize += in.readableBytes();
    
        while (!buffers.isEmpty()) {
          // First, feed the interceptor, and if it's still, active, try again.
          if (interceptor != null) {
            ByteBuf first = buffers.getFirst();
            int available = first.readableBytes();
            if (feedInterceptor(first)) {
              assert !first.isReadable() : "Interceptor still active but buffer has data.";
            }
    
            int read = available - first.readableBytes();
            if (read == available) {
              buffers.removeFirst().release();
            }
            totalSize -= read;
          } else {
            // Interceptor is not active, so try to decode one frame.
            ByteBuf frame = decodeNext();
            if (frame == null) {
              break;
            }
            ctx.fireChannelRead(frame);
          }
        }
      }
    private ByteBuf decodeNext() {
        long frameSize = decodeFrameSize();
        if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
          return null;
        }
    
        // Reset size for next frame.
        nextFrameSize = UNKNOWN_FRAME_SIZE;
    
        Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
        Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
    
        // If the first buffer holds the entire frame, return it.
        int remaining = (int) frameSize;
        if (buffers.getFirst().readableBytes() >= remaining) {
          return nextBufferForFrame(remaining);
        }
    
        // Otherwise, create a composite buffer.
        CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
        while (remaining > 0) {
          ByteBuf next = nextBufferForFrame(remaining);
          remaining -= next.readableBytes();
          frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
        }
        assert remaining == 0;
        return frame;
      }
    private long decodeFrameSize() {
        if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < LENGTH_SIZE) {
          return nextFrameSize;
        }
    
        // We know there's enough data. If the first buffer contains all the data, great. Otherwise,
        // hold the bytes for the frame length in a composite buffer until we have enough data to read
        // the frame size. Normally, it should be rare to need more than one buffer to read the frame
        // size.
        /**消息格式
         * ----------------------------------------------------------
         frame size |   type   |  header  |     body
         ----------------------------------------------------------
         8 byte     |  1 byte  |   变长   |      变长
         ----------------------------------------------------------
         frame size 表示后面的数据长度,至少包括 type 和 header,有些消息还会包含 body。
         header 根据消息种类的不同,格式也不一样。一般 header 数据都比较小。
         body 根据消息种类的不同,格式也不一样。有的消息的 body 比较大,而有些消息甚至没有 body 。
         比如 RpcRequest 消息, 它的 frame size 表示 type,header 和 body 三部分的总长度。StreamResponse 消息,
         它的 frame size 只表示 type 和 header 两部分的长度。而RpcFailure 消息,只有 header 部分。
         Netty 处理管道
         */
        ByteBuf first = buffers.getFirst();
        if (first.readableBytes() >= LENGTH_SIZE) {
          nextFrameSize = first.readLong() - LENGTH_SIZE;
          totalSize -= LENGTH_SIZE;
          if (!first.isReadable()) {
            buffers.removeFirst().release();
          }
          return nextFrameSize;
        }
    
        while (frameLenBuf.readableBytes() < LENGTH_SIZE) {
          ByteBuf next = buffers.getFirst();
          int toRead = Math.min(next.readableBytes(), LENGTH_SIZE - frameLenBuf.readableBytes());
          frameLenBuf.writeBytes(next, toRead);
          if (!next.isReadable()) {
            buffers.removeFirst().release();
          }
        }
    
        nextFrameSize = frameLenBuf.readLong() - LENGTH_SIZE;
        totalSize -= LENGTH_SIZE;
        frameLenBuf.clear();
        return nextFrameSize;
      }
    }
    
    微信图片编辑_20191107150433.jpg
    • 关键点:
      frame size 表示后面的数据长度,至少包括 type 和 header,有些消息还会包含 body。
      frameSize < MAX_FRAME_SIZE=Integer.MAX_VALUE;, "Too large frame: %s", frameSize
      也就是,通过一次netty网络传输数据量大于最高限制2g,某个partition数据太大了超过2g,数据清洗,
    • 建议:
      1、repartition(n)或者 spark.sql.shuffle.partitions =500 进行重新分区
      2、最好是从业务的角度对spark sql进行优化,比如:left join 想办法拆开转为union all 这种方式

    https://www.iteblog.com/archives/2527.html

    相关文章

      网友评论

          本文标题:spark troubleshooting 常见错误整理

          本文链接:https://www.haomeiwen.com/subject/aanumctx.html