美文网首页
Flink中串行的陷阱

Flink中串行的陷阱

作者: 〇白衣卿相〇 | 来源:发表于2020-05-23 17:58 被阅读0次

问题背景

在使用FlinkSql做topic复制程序时,遇到一个问题:
一份kafka topic数据往多个topic中发,实现一份数据复制多份的功能,但是在做性能压测的时候发现tps上不来。


image.png

问题分析

由于flinksql目前不支持针对具体的算子设置并行度,所以整个job只设置了一个整体并行度。由于flink内部的operation chain机制,会将source和3个map、3个sink都chain到一起,形成一个串行的结构
验证代码如下:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

public class MyTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> src = env.addSource(new RichSourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect("ttttt-123");
                }
            }

            @Override
            public void cancel() {

            }
        });
        src.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(Thread.currentThread().getName()+",flatmap1,"+ System.currentTimeMillis());
                out.collect(value);
                Thread.sleep(1000);
            }
        }).print();

        src.flatMap(new FlatMapFunction<String, Tuple2<String,String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                System.out.println(Thread.currentThread().getName()+",flatmap2,"+ System.currentTimeMillis());
                out.collect(Tuple2.of(value.split("-")[0],value.split("-")[1]));
                Thread.sleep(3000);
            }
        }).print();
        System.out.println(env.getExecutionPlan());
        env.execute();


    }
}

执行结果如下:

Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227296256
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227297258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227300258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227301258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227304258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227305258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227308258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227309258
(ttttt,123)

从线程名称可以看出是一个线程在执行整个拓扑,时间也反映了执行了flatmap1后1s才执行flatmap2,3s后在执行flatmap1.
上面验证代码证实了我们的猜测。

解决方法

1.多用几个消费者消费topic,不同的链路用不同的source。
但是此方法违背了我们分流的初衷,并没有达到降低source kafka的压力。
2.source和map配置不同的并行度,使他们不能chain到一起。
但是目前FlinkSql的job还不支持细粒度的并行度设置。此方法只对api有效。

相关文章

  • Flink中串行的陷阱

    问题背景 在使用FlinkSql做topic复制程序时,遇到一个问题:一份kafka topic数据往多个topi...

  • Flink中的Time及Windows操作

    Flink中的Time类型 Flink中的Windows编程 Flink的watermarks Event Tim...

  • Flink之状态管理

    最近看了看Flink中state方面的知识,Flink中的state是啥?state的作用是啥?为什么Flink中...

  • GCD中的串行队列

    //串行队列 - (void)gcdDemo { // dispatch_queue_t q = dispatch...

  • 八、Kafka Connector

    Flink-kafka-connector 读写kafkaKafka中的partition机制和Flink的并行度...

  • Flink中的事件语义和watermark机制

    Flink中的事件语义和watermark机制 【[白话解析] Flink的Watermark机制】:https:...

  • GCD四大队列

    一.队列 原则:队列先进先出原则 1.串行队列 串行队列:放到该队列上的任务,在子线程中串行执行 dispatch...

  • GCD

    1、同步串行队列 2、同步并行队列 3、异步串行队列 4、异步并行队列 5、死锁 主线程中创建同步串行队列 主线程...

  • iOS GCD (2)

    在iOS GCD中,几个概念, 提交的 task block 遵循 FIFO 原则。 串行 vs 并行 串行:按添...

  • flink 学习笔记 — 时间定义及窗口机制

    flink 回顾 通过之前的了解,我们知道,flink是一个高吞吐、低延时的流式处理框架。flink 中具有严格的...

网友评论

      本文标题:Flink中串行的陷阱

      本文链接:https://www.haomeiwen.com/subject/zxslahtx.html