Flink + Hudi 流批一体作业稳定性优化
侧边栏壁纸
  • 累计撰写 307 篇文章
  • 累计阅读 104.3万

Flink + Hudi 流批一体作业稳定性优化

TOTC
2023-12-30 / 1,850 阅读 / 正在检测是否收录...

目前,任务分为两种类型:当业务逻辑较为简单时,使用 Flink SQL 进行处理,例如将原始日志或业务库同步至 Hudi 的 ODS 层、进行多表关联和聚合等操作;当业务逻辑比较复杂或需要特殊处理时,例如部分数据需要通过 API 获取,则使用 Flink DataStream API 消费 Hudi 数据,并经过一系列处理后,写入 Hudi 的 DWD、DWS 或 ADS 层。

针对内存方面的问题,对Flink写入Hudi任务多个Task的Heap使用率进行了监控。注意到使用率存在较大的波动,有时甚至可能超出分配的阈值,这种情况可能导致任务被强制终止。

Snipaste_2023-08-29_14-45-33.png

Yarn Application报错信息:
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
Association with remote system [akka.tcp://flink] has failed, address is now
Association with remote system [akka.tcp://flink] has failed,address is now gated for [50] ms

NodeManager Local logs报错信息:
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=13427,containerID=container_e632_1670877056012_20807318_01_000001] is running beyond physical memory limits. Current usage: 2.0 GB of 2 GB physical memory used; 4.5 GB of 4.2 GB virtual memory used. Killing container.

可以通过在yarn-site.xml中将yarn.nodemanager.vmem-check-enabled设为false,或者为jobmanager和taskmanager分配足够的内存来提高任务的稳定性。

<property>  
  <name>yarn.nodemanager.vmem-check-enabled</name>  
  <value>false</value>  
</property>
注:是否启用一个线程检查每个任务正在使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true

另一个问题是,Managed Memory的使用率较低,而Task Heap的使用率却相当高。在增加TaskManager的内存分配(通过-ytm参数)时,会根据taskmanager.memory.managed.fraction的设置将新增的内存分配给Managed Memory。这可能会造成资源浪费。适当减小该配置的值有助于提高整体内存利用率。

tttt.jpg

taskmanager.memory.managed.fraction: 0.2

使用Hudi写入可能导致节点过载,从而导致上游算子背压。在执行SQL时,可以考虑提高写入并行度。如果是DateStream API Hudi到Hudi的数据传输,由于数据分布可能不均,建议在读取后直接使用rebalance()算子进行数据均衡处理。

yyyy.jpg

对于当前分区文件数量问题,写入Hudi任务的并行度会直接影响文件数量,随着并行度的增加,文件数量也会相应增加。此外,对于COPY_ON_WRITE模式,以下三个参数会直接影响文件数量。任务在运行一段时间后,当合并操作和触发操作达到平衡时,文件数量会在很小范围内波动。此外,可以通过减小clustering.delta_commits来减少当前分区文件数量。

clustering.delta_commits  触发合并文件所需的提交次数
clean.retain_commits  保留的提交数,不进行清理
cleaner.policy(KEEP_LATEST_COMMITS)

相对于在线clustering而言,离线clustering更加稳定并且可以实现资源隔离。经过大量任务长时间运行后来看,在使用离线clustering时需要注意以下问题。

常用离线clustering命令如下:

nohup flink run \
-c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
lib/${bundle_jar_name}.jar \
--service \
--path ${table_path} > clustering.log & \

以上离线clustering运行时,--plan-partition-filter-mode默认为NONE,所有符合条件的分区都会加入到任务当中,根据使用情况来看,随着历史分区的增加,资源使用逐渐处于较高的水平,该模式比较适用于临时处理大量滞留文件。同时,当数据量较大时还存在一个问题,当Clustering异常时,重启任务,偶尔会报错提示历史分区某parquet不存在(社区群有同学反馈过该问题)。

微信图片_20231205182116.jpg

通过配置参数--plan-partition-filter-mode RECENT_DAYS --target-partitions 2可以解决该问题。

Snipaste_2024-02-04_15-37-14.png

还可以使用--plan-partition-filter-mode的SELECTED_PARTITIONS模式,划定历史分区的范围,处理积压问题。

Flink离线Clustering使用建议,分区过滤模式RECENT_DAYS和NONE的组合使用相对较为稳定,适用于线上环境。当历史分区中存在大量待Clustering的文件时,可以临时使用NONE模式压缩几次。等到压缩速度跟得上时,再切换回RECENT_DAYS模式进行进一步的处理,最终的离线Clustering命令:

常规:
nohup flink run \
-c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
lib/hudi-flink1.14-bundle-0.12.3.jar \
--service \
--clustering-delta-commits 8 \
--small-file-limit 90 \
--target-file-max-bytes 125829120 \
--plan-partition-filter-mode RECENT_DAYS \
--target-partitions 2 \
--path hdfs://...... > clustering.log &

处理积压:
flink run -c \
org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
lib/hudi-flink1.14-bundle-0.12.3.jar \
--service \
--min-clustering-interval-seconds 3 \
--max-num-groups 1000 \
--clustering-delta-commits 1 \
--small-file-limit 90 \
--target-file-max-bytes 125829120 \
--path ${target_path}

在MERGE_ON_READ模式下,当前分区的文件数量与compaction.delta_commits、clean.retain_commits有关。但是当clean.retain_commits大于10时,可能会出现parquet和log文件无法被清理干净的情况,导致部分小文件滞留在历史分区中。

HoodieFlinkClusteringJob类可以接受多个参数,可以在源码org.apache.hudi.sink.clustering.FlinkClusteringConfig中或使用如下命令查看:

flink run \
-c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
../flink-1.14.2/lib/hudi-flink1.14-bundle-0.12.3.jar \
-h

需要注意的是Boolean类型的参数,默认都为false,如果要开启,直接使用--param_name即可,而不是--param_name true,Hudi中解析参数使用的JCommander,如下图所示,如果Boolean参数后面跟true、false、0、1,都会在遍历参数时错位,导致任务提交失败,报错提示和这种配置方式并不是很友好,很容易让使用者误认为该参数是不可配置的!

code.png

e1.png

e2.png

通过适当增加以下两个超时参数,可以有效避免因网络波动等原因导致的超时问题,从而降低任务失败的概率。

set execution.checkpointing.timeout=600000
properties.request.timeout.ms=120000(connector为kafka时)

以下参数设置为true时,如果TaskManager发生了Akka错误,例如无法与JobManager通信或其他网络问题,TaskManager将会立即退出;设置为false时,TaskManager将不会主动退出,而是继续运行,这可能会导致系统处于不稳定状态,并且需要人工干预来解决问题。

taskmanager.exit-on-fatal-akka-error: true

改动Hudi源码或者适配低版本大数据组件,参考了如下文章,可顺利完成编译打包:

https://blog.csdn.net/weixin_45417821/article/details/127407461

mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3  -Dflink1.14 -Dscala-2.12
mvn clean install -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive3
15

评论

博主关闭了所有页面的评论