问题背景
在使用FlinkSql做topic复制程序时,遇到一个问题: 一份kafka topic数据往多个topic中发,实现一份数据复制多份的功能,但是在做性能压测的时候发现tps上不来。
问题分析
由于flinksql目前不支持针对具体的算子设置并行度,所以整个job只设置了一个整体并行度。由于flink内部的operation chain机制,会将source和3个map、3个sink都chain到一起,形成一个串行的结构 验证代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
public class MyTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> src = env.addSource(new RichSourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
ctx.collect("ttttt-123");
}
}
@Override
public void cancel() {
}
});
src.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
System.out.println(Thread.currentThread().getName()+",flatmap1,"+ System.currentTimeMillis());
out.collect(value);
Thread.sleep(1000);
}
}).print();
src.flatMap(new FlatMapFunction<String, Tuple2<String,String>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {
System.out.println(Thread.currentThread().getName()+",flatmap2,"+ System.currentTimeMillis());
out.collect(Tuple2.of(value.split("-")[0],value.split("-")[1]));
Thread.sleep(3000);
}
}).print();
System.out.println(env.getExecutionPlan());
env.execute();
}
}
执行结果如下:
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227296256
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227297258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227300258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227301258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227304258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227305258
(ttttt,123)
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap1,1590227308258
ttttt-123
Legacy Source Thread - Source: Custom Source -> (Flat Map -> Sink: Print to Std. Out, Flat Map -> Sink: Print to Std. Out) (1/1),flatmap2,1590227309258
(ttttt,123)
从线程名称可以看出是一个线程在执行整个拓扑,时间也反映了执行了flatmap1后1s才执行flatmap2,3s后在执行flatmap1. 上面验证代码证实了我们的猜测。
解决方法
1.多用几个消费者消费topic,不同的链路用不同的source。 但是此方法违背了我们分流的初衷,并没有达到降低source kafka的压力。 2.source和map配置不同的并行度,使他们不能chain到一起。 但是目前FlinkSql的job还不支持细粒度的并行度设置。此方法只对api有效。