背景
最近工作中需要自定义开发一些flink sql的connector,因为官方提供的connector毕竟有限,在我们工作中可能会用到各种各样的中间件。所以官方没有提供的就需要我们自定义开发。 就是如:
CREATE TABLE XXX(
A STRING,
B BIGINT)
WITH(
'connect.type' = 'kafka',
...
)
所以开发一个自己的connector需要做哪些,本文就来总结一下开发的主要步骤,以及我遇到的问题怎么解决的。
开发
-
自定义Factory,根据需要实现StreamTableSourceFactory和StreamTableSinkFactory
-
根据需要继承ConnectorDescriptorValidator,定义自己的connector参数(with 后面跟的那些)
-
Factory中的requiredContext、supportedProperties都比较重要,框架中对Factory的过滤和检查需要他们
-
需要自定义个TableSink,根据你需要连接的中间件选择是AppendStreamTableSink、Upsert、Retract,并重写consumeDataStream方法
-
自定义一个SinkFunction,在invoke方法中实现将数据写入到外部中间件。
以上5步基本上可以写一个简单的sql-connector了
问题
-
异常信息:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector.address=localhost:9091 connector.job=testJob connector.metrics=testMetrics connector.type=xxxx schema.0.data-type=ROW<`val` DOUBLE> schema.0.name=value
有两个问题都导致上面的报错
1) discoverFactory时找不到我自定义的Factory
解决方法:
添加如下目录及文件
2) 根据properties过滤时过滤掉了我的Factory,代码在TableFactoryService#filterBySupportedProperties
解决方法:在自定义Factory#supportedProperties方法中添加schema的配置
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
2.序列化问题
异常信息:
xxx类不能被序列化
原因:开始在我的sinkFunction中有个TableSchema属性,该属性不能被序列化,TableSchema我是用来获取字段信息的,后来直接改成了fieldName数组,从TableSchema.getFieldNames()获取。
改完后又报了我使用的Util类不能序列化,我把util实现了Serializable接口解决
3.sink类型不匹配问题
异常如下:
org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at PrometheusConnectorTest.test(PrometheusConnectorTest.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
但是我xxxTableSink实现的是AppendStreamTableSink,AppendStreamTableSink继承了StreamTableSink
解决方法:
实现一个emitDataStream空方法,重写consumeDataStream即可
注意:flink 1.11版本emitDataStream方法已被移除
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream
.addSink(new xxxSinkFunction())
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
暂时还没涉及到TableFormat的自定义,如果后面涉及到这方面的开发再来分享
关于调试
代码开发完后,整个编译一下flink的代码(如果之前编译过,那么只需要编译你的新工程即可)。 有两种方法进行测试
- 整个编译完后会在flink-dist下生成flink的运行包,并把你的connector包放到lib目录下,通过bin/start-cluster.sh启动单机环境,再通过sql client来测试。如果代码改动后只需要把你的工程重新打包即可。
- idea里面进行调试(比较推荐这种方式),可以一行行的debug。
如果对你帮助,帮忙点个star。