Sun.Zhu's Blog

Thinking will not overcome fear but action will.

Flink源码阅读之Window执行过程

Flink源码阅读之Window执行过程

前面文章介绍了Flink的任务执行流程,每一个operator都会有对应的Task去执行,如果程序中使用了window的话,当程序执行到window的task时就会调用WindowOperator中的实现。 public void processElement(StreamRecord<IN> element) throws Exception { //根据元素划分窗口 ...

FlinkSql之upsert模式入ES找不到类

FlinkSql之upsert模式入ES找不到类

问题现象: 利用FlinkSql用upsert模式往ES中写数据时报错,append模式却没问题。 报错如下: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/typeutils/TypeCheckUtils at org.apache.flink.streaming.co...

两种方式实现Flink异步IO查询Mysql

Flink异步IO查询mysql

如官网所描述的Flink支持两种方式实现异步IO查询外部系统 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html 1.数据库(或key/value存储)提供支持异步请求的client。 参考代码: https://github.com/zhu...

Flink源码阅读之基于Flink1.10的任务提交流程

Flink源码阅读之基于Flink1.10的任务提交流程

Flink在1.10版本对整个作业提交流程有了较大改动,详情请见FLIP-73。本文基于1.10对作业提交的关键流程进行分析,不深究。 入口: 依旧是main函数最后env.execute(); public JobExecutionResult execute(String jobName) throws Exception { Preconditions.checkNotNull(...

Flink源码阅读之基于Flink1.10的任务执行流程

Flink源码阅读之基于Flink1.10的任务执行流程

在基于Flink1.10的任务提交流程中分析了任务的提交流程。本文基于前文基础上进行job执行流程的源码分析。 前文这里已经已经说明,执行流程就从resetAndStartScheduler()开始 private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception { //.........

记一次flink不做checkpoint的问题

记一次flink不做checkpoint的问题

问题现象:Flink UI界面查看checkpoint的metrics发现一直没有做checkpoint,仔细排查发现有部分subtask的状态是finished。 下图是测试环境复现问题 问题原因:仔细排查代码后发现source是消费kafka的数据,配置的并行度大于kafka的partition数,导致有部分subtask空闲,然后状态变为finished。后来查看了checkpoin...

Flink CheckPoint详细过程

Flink CheckPoint详细过程

Checkpoint由JM的Checkpoint Coordinator发起 第一步,Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint;。 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的...

Flink源码阅读之周期性watermark生成逻辑分析

Flink源码阅读之周期性watermark生成逻辑分析

在我们的main函数中会设置时间特性和生成水印的时间间隔 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.getConfig().setAutoWatermarkInterval(200L); 不设置的话默认为 public void setStreamTimeCharacteristic(...

Kafka Producer序列化问题

Kafka Producer序列化问题

报错如下 Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String 关键代码: Properties properties = new Properties(); properties.put("metadata.broker.list", "***"); properties.put(...

allowedlateness和watermark及窗口触发的条件

allowedlateness和watermark及窗口触发的条件

allowedlateness 时间( P)=窗口的endtime+allowedlateness ,作为窗口被释放时间。globle window的默认allowedlateness 为Long.MAX_VALUE,其他窗口默认都是0,所以如果不配置allowedlateness 的话在水印触发了窗口计算后窗口被销毁 假设窗口大小10s,水印允许延迟3s,allowedlateness ...