streaming job功能描述:
该streaming job 输入是kafka 输出也是kafka,输入topic 是40个分区,输出topic 是一个分区,foreachRdd 里面会执行四个job ,其中两个比较重要,一个是写入hbase,一个是写入kafka,其他两个是计数操作,这四个job是顺序执行的
写入kafka的参数如下
props.put("request.required.acks", "1")
props.put("producer.type", "async")
props.put("batch.num.messages", "200")
props.put("queue.buffering.max.ms", "1000")
props.put("topic", topic)
props.put("serializer.class", "kafka.serializer.StringEncoder")
image.png
问题如上图所示:
这是一个batch的处理时间图,四个job顺序执行,总共时间是2s(Job Duratiion 0.6s + 0.1s + 0.3 s + 1.0s )左右,但是整个batch的处理时间 是16s。由于batch 是10s,数据量5000左右.每个batch是10s
开始漫长的调试过程:
尝试1 :首先怀疑是输出的kafka分区数的问题,改成10个分区 ,没有任何效果还是16s左右
尝试2:怀疑是kafka的参数,check了一遍,发现异步async,批量的大小感觉都没问题,算了改大一下batch大小,改为500,重新提交,发现问题依旧。又改为 5000,发现问题依旧
尝试3:看来不是kafka的问题,那会不是是instance的问题,分区数是40,那么分配35个instance的话,DirectStream的方式是每个kafka的分区对应一个rdd的分区,会有一个executor去消费,所以是不是executor数太少,导致没有资源去执行输出。所以干脆直接修改instance数量为50 ,结果问题依旧。
尝试4:经过翻看kafka的send的源代码发现:异步发送的时候如果队列不满或者超时时间不到你设定的事件则会一直循环,直到shutdown 也就是close producer 才会发送队列里最后的一批数据。也就是说 这个时候你的任务会一直卡在这
// drain the queue until you get a shutdown command
Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
currentQueueItem =>
val elapsed = (SystemTime.milliseconds - lastSend)
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
events += currentQueueItem
}
// check if the batch size is reached
full = events.size >= batchSize
if(full || expired) {
if(expired)
debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full)
debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
tryToHandle(events)
if(queue.size > 0)
throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
.format(queue.size))
}
所以修改 参数
props.put("queue.buffering.max.ms", "300")
还是没啥效果
尝试4 : props.put("queue.buffering.max.ms", "300"),instance 数量是50
也不行
尝试5:将deploy-mode 从client 改为cluster
好了,原因就是 driver 资源收集 job执行以及新启job的时候资源不足,导致执行时间较长
以上结论:
1、输出kafka的分区数对于提升job性能影响不大
2、增加实例数并不一定能解决每个batch处理的速度
3、kafka发送的逻辑超时时间一定不要设置太长,否则严重影响处理时间
网友评论