重新编译Flink SQL CDC支持JDK11
数据同步中,Flink SQL CDC 是低代码开发的一个很好的选择,无需后端程序员参与,就可以高性能完成从MySQL/PostgreSQL数据同步到kafka/ElaticSearch/其它第三方数据库,数据仓库的工作。
Flink SQL CDC的官方文档如下:
https://github.com/ververica/flink-cdc-connectors
按照官方文档,Flink SQL CDC 是支持JDK11的,但是按照此官方文档,下载
- flink-sql-connector-mysql-cdc-1.1.0.jar
- flink-sql-connector-postgres-cdc-1.1.0.jar
- flink-format-changelog-json-1.1.0.jar
https://github.com/ververica/flink-cdc-connectors/wiki/Downloads
,然后上传到
flink-1.12.1/lib
目录下,重启Flink后,我们按照官方文档创建同步的表
CREATE TABLE mysql_binlog_order (
order_id STRING NOT NULL,
cust_user_id STRING,
cust_cellphone STRING,
cust_name STRING,
order_source STRING,
order_date STRING,
pickup_store_code STRING,
pickup_express_number STRING,
pay_money DECIMAL(10,2),
order_status STRING,
pay_status STRING,
pay_info STRING,
tenantno STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '实际IP',
'port' = '3306',
'username' = '实际数据库用户名',
'password' = '实际数据库密码',
'database-name' = 'hlh',
'table-name' = 't_order'
);
然后执行 select * from mysql_binlog_order
我们立即就会得到Unsafe.monitorEnter not supported 的错误,就是JDK 11把
sun.misc.Unsafe.monitorEnter()方法移除了,查看Flink SQL CDC的源代码,发现是
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer 在101行和106行使用了Unsafe.monitorEnter()方法,入下图所示:
也就是说Flink SQL CDC实际是不支持JDK11的,要么你把JDK版本回退到JDK1.8,要么你怎么想办法去掉Unsafe.monitorEnter()方法,好在有人也给Flink SQL CDC小组提了这个Bug
https://github.com/ververica/flink-cdc-connectors/issues/10
然后一个叫balint133的小哥哥也fixed 了这个Bug,如下图所示:
不知道为什么,该代码至今未合并到Flink SQL CDC 的Master仓库中,所以你下载Flink SQL CDC的master分支源代码,在JDK 11下编译也是错误的,我们要做的就是用balint133小哥哥的源代码替换Fink SQL CDC的master 分支的DebeziumChangeConsumer源代码,其次,别忘记
把pom.xml文件中的CheckStyle插件去掉
org.apache.maven.plugins
maven-checkstyle-plugin
2.17
com.puppycrawl.tools
checkstyle
8.14
validate
validate
check
/tools/maven/suppressions.xml
true
/tools/maven/checkstyle.xml
true
true
该插件在Maven打包时,检查代码风格,代码风格有问题时,就会编译失败,也没仔细观察Flink SQL CDC缺省为啥代码检查时编译失败了,把这个插件从Flink SQL CDC的pom.xml文件中去掉,再次打包编译成功。然后再把编译成功的这三个文件拷贝到/home/flink-1.12.1/lib 目录下,重新启动Flink,这时再执行:
select * from mysql_binlog_order
这时SQL执行成功。
然后往该订单表插入一个订单,Flink SQL CDC会实时捕获到该表的数据变化,实时插入到mysql_binlog_order表里,此时利用Flink SQL CDC低代码数据同步成功。