Flink实时计算问题记录与解决方案
侧边栏壁纸
  • 累计撰写 307 篇文章
  • 累计阅读 104.3万

Flink实时计算问题记录与解决方案

TOTC
2021-11-20 / 1,494 阅读 / 正在检测是否收录...

当Flink任务的并行度大于Kafka分区数时,可能会导致部分并行度空闲,进而影响水位线(watermark)的生成。为了解决这个问题,可以通过设置withIdleness来进行调整:

WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withIdleness(Duration.ofSeconds(60))

对于withIdleness参数,应避免将下游任务设置得太小。原因在于,如果上游任务因故障停止,而其恢复所需时间超过了下游任务设置的withIdleness值,那么下游任务会将超时的分区标记为不再消费,导致数据丢失。为避免此问题,建议将分区数设置为不小于任务的并行度,并不设置withIdleness参数,这样可以有效防止潜在的数据丢失情况。

kafkaSource指定时间戳消费时,必须为毫秒时间戳,Flink 1.14官网文档为秒,是错误的,指定后不会生效。

setStartingOffsets(OffsetsInitializer.timestamp(1654703973000L))

要实现Flink与Kafka的端到端一致性,需要确保Kafka的版本不低于2.5。要注意的是,Flink 1.14.2中flink-connector所包含的kafka-clients版本是2.4.X。

because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.

Flink-Kafka端到端一致性需要设置TRANSACTIONAL_ID_CONFIG = "transactional.id",如果不设置,从checkpoint重启会报错:

OutOfOrderSequenceException: The broker received an out of order sequence number。

Flink CDC同步mysql时,需要把binlog配置成ROW模式,查看命令和配置方法如下:

show variables like 'binlog_format%'; 

vi /etc/my.cnf
binlog_format=row
systemctl restart mariadb.service

非ROW模式时会报以下错误:

Caused by: org.apache.flink.table.api.ValidationException: The MySQL server is configured with binlog_format MIXED rather than ROW, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.

758511-20220301173534024-601175990.png

Flink 1.14.2版本使用CDC 2.2,需要编译CDC源码进行版本适配:

1.pom文件中修改flink版本为1.14.2、scala版本为2.12.7

2.修改flink-table-planner-blink为flink-table-planner;flink-table-runtime-blink为flink-table-runtime

3.flink-shaded-guava版本由30.1.1-jre-14.0修改为18.0-13.0

修改完成后,会出现部分import报错的情况。需要根据新依赖版本中的路径和类进行相应的修改,例如,将创建TimestampFormat的代码修改为:

TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions)。

在编译过程中,首先需要使用install命令对父module进行安装。这样可以确保本地Maven仓库中包含各个子module的JAR包。

对子module进行打包时,如Flink MySQL CDC,可以在子module的POM文件中修改打包方式,将所有依赖项都打包到一个JAR文件中,这样在工程中只需引入一个<dependency>即可。否则,会因为缺少某些依赖报错,如:

Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
10

评论

博主关闭了所有页面的评论