The Serial Execution Pitfall in Flink


Posted by Sun.Zhu on May 23, 2020

Problem Background

While building a topic-copy job with Flink SQL, I ran into a problem. The job reads one Kafka topic and writes the data out to several topics, replicating one stream of data into multiple copies. During load testing, however, TPS would not go up.
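
For context, the shape of such a topic-copy job looks roughly like the sketch below. All table names, topics, and connector options here are placeholders, and the DDL uses the newer 'connector' = 'kafka' option style rather than whatever the original job used. A statement set submits all the INSERTs as a single job, so the planner reuses one Kafka scan and fans it out to every sink:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TopicCopyJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // One Kafka source table; names and options are illustrative only.
        tEnv.executeSql(
                "CREATE TABLE source_topic (msg STRING) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'source_topic',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'properties.group.id' = 'topic-copy',"
                        + " 'scan.startup.mode' = 'latest-offset',"
                        + " 'format' = 'json')");

        // Three Kafka sink tables with the same schema.
        for (String sink : new String[]{"sink_a", "sink_b", "sink_c"}) {
            tEnv.executeSql(
                    "CREATE TABLE " + sink + " (msg STRING) WITH ("
                            + " 'connector' = 'kafka',"
                            + " 'topic' = '" + sink + "',"
                            + " 'properties.bootstrap.servers' = 'localhost:9092',"
                            + " 'format' = 'json')");
        }

        // A statement set submits all three INSERTs as one job, so the
        // planner shares a single Kafka scan across the three sinks.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink_a SELECT * FROM source_topic");
        set.addInsertSql("INSERT INTO sink_b SELECT * FROM source_topic");
        set.addInsertSql("INSERT INTO sink_c SELECT * FROM source_topic");
        set.execute();
    }
}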

Problem Analysis

Because Flink SQL currently does not support setting the parallelism of individual operators, the whole job runs with a single global parallelism. Flink's operator chaining mechanism then fuses the source, the three maps, and the three sinks into one chain, producing a fully serial structure. The verification code is as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

public class MyTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One global parallelism, mirroring the Flink SQL job.
        env.setParallelism(1);
        DataStreamSource<String> src = env.addSource(new RichSourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Because the whole pipeline is chained, each collect() call
                // executes the downstream operators synchronously in this thread.
                while (running) {
                    ctx.collect("ttttt-123");
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Branch 1: log the thread name and timestamp, then sleep 1s to
        // simulate a slow operator.
        src.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                System.out.println(Thread.currentThread().getName() + ",flatmap1," + System.currentTimeMillis());
                out.collect(value);
                Thread.sleep(1000);
            }
        }).print();

        // Branch 2: split the record, then sleep 3s.
        src.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
                System.out.println(Thread.currentThread().getName() + ",flatmap2," + System.currentTimeMillis());
                out.collect(Tuple2.of(value.split("-")[0], value.split("-")[1]));
                Thread.sleep(3000);
            }
        }).print();

        System.out.println(env.getExecutionPlan());
        env.execute();
    }
}

The execution output is as follows:

Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227296256
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227297258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227300258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227301258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227304258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227305258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227308258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227309258
(ttttt,123)

The thread name shows that a single thread executes the entire topology, and the timestamps confirm it: flatmap2 runs 1s after flatmap1 (once flatmap1's sleep finishes), and the next flatmap1 runs 3s after that (once flatmap2's sleep finishes). Within a chain, ctx.collect() invokes the downstream operators synchronously, so the source cannot emit the next record until both branches, including their sleeps, have completed. The verification code above confirms our suspicion.

Solutions

1. Consume the topic with several consumers, giving each branch its own source. This, however, defeats the original purpose of the fan-out: it does not reduce the read pressure on the source Kafka cluster.
2. Give the source and the maps different parallelism so that they cannot be chained together (see the sketch after this list). But Flink SQL jobs do not yet support fine-grained, per-operator parallelism, so this approach only works with the DataStream API.
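
A minimal DataStream sketch of approach 2 (the class name and record contents are illustrative): giving a branch a parallelism different from the source's, or calling disableChaining() on it, prevents Flink from fusing it into the source's task, so each branch runs in its own thread.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ChainBreakDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> src = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect("ttttt-123");
                    Thread.sleep(100); // throttle the source so the output stays readable
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // disableChaining() forces this branch into its own task: the source
        // thread only hands records over and is no longer blocked by the
        // branch's processing time.
        src.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return "branch1," + value;
            }
        }).disableChaining().print();

        // A parallelism different from the source's (2 vs. 1) also prevents
        // chaining, at the cost of a serialization/network boundary.
        src.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return "branch2," + value;
            }
        }).setParallelism(2).print();

        env.execute();
    }
}

Alternatively, env.disableOperatorChaining() turns chaining off for the whole job, but that also gives up the overhead savings of chaining in places where it is harmless.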