首页
推荐
在线工具
今日运势
Search
1
数据同步工具DataX、Sqoop和Canal
981 阅读
2
Hadoop各版本汇总
953 阅读
3
Spark学习笔记
946 阅读
4
计算机网络笔记
856 阅读
5
“锁”相关总结
842 阅读
BigData
Flink
AI
Backend
Java
Note
OPS
游客
Search
标签搜索
大数据
Flink
离线
实时
JVM
Redis
OpenJDK
Java
笔记
Elasticsearch
Hadoop
Hudi
Flink CDC
K8S
数据湖
AI
WD1016
累计撰写
56
篇文章
累计阅读
12.4万
次
首页
栏目
BigData
Flink
AI
Backend
Java
Note
OPS
页面
推荐
在线工具
今日运势
搜索到
31
篇与
WD1016
的结果
返回首页
2026-04-13
基于 Dify 的 Spark SQL DDL 调优与 EXPLAIN 执行计划分析工作流实践
在最近的大数据任务优化工作中,业务分析同学手里有上千个复杂的 Spark SQL,如果全靠人工逐个调优,不仅耗时耗力,还容易遗漏大量低级问题,如分区设计不合理、存储格式落后、缺少 Bucket、压缩策略不当等。为了解决这个痛点,先对底层 Hive 表进行了系统梳理,维护了一套表特征库。特征库里记录了每张表的结构化信息,包括但不限于:1.存储格式:算法团队使用的表统一为 TEXT 格式,SQL 分析型表统一为 ORC 格式(或 Parquet)。2.数据规模:表总行数、占用存储大小(GB/TB 级别)。3.分区信息:是否分区、分区字段(日期/业务维度)、分区粒度、分区数量、是否存在数据倾斜。4.其他关键特征:Bucket 数量与排序字段、压缩方式(ZLIB/SNAPPY)、表类型(外部表/内部表)、最近更新时间、血缘依赖关系等。工作流核心优势1.自动化闭环:从 SQL 解析 → 特征库查询 → Hive 元数据拉取 → 获取explain执行计划 → LLM 智能分析 → 优化报告,全程无人值守。2.知识驱动:不再是“裸 LLM 瞎猜”,而是把公司沉淀的表特征库 + 真实元数据作为 Prompt 上下文,让 AI 给出精准、可落地的优化建议。3.批量处理:一次上传包含上百张表的 SQL 文件也能秒级处理。工作流详细拆解整个流程采用 Dify Workflow 可视化拖拽搭建1.起始节点:上传 SQL 文件支持直接拖拽 .sql 文件,兼容多表 Spark SQL(SELECT / CREATE TABLE / INSERT OVERWRITE 等)。2.提取 SQL 文件内容将文件转为纯文本,供后续解析使用。3.解析 SQL:提取表名自动识别所有涉及的 Hive 表名。import re from typing import Dict, List, Set, Tuple XXX_DB_PREFIX = "xxx_data_" def _strip_comments(sql: str) -> str: # 去掉 /* ... */ 块注释 sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S) # 去掉 -- 行注释 sql = re.sub(r"--.*?$", " ", sql, flags=re.M) return sql def _normalize_space(sql: str) -> str: return re.sub(r"\s+", " ", sql).strip() def _find_default_db(sql: str) -> str: m = re.search(r"(?i)\buse\s+([a-zA-Z0-9_]+)\b", sql) return m.group(1) if m else "" def _extract_targets(sql: str, default_db: str) -> Set[str]: targets = set() # insert overwrite table xxx / insert into table xxx for m in re.finditer(r"(?i)\binsert\s+(?:overwrite|into)\s+table\s+([a-zA-Z0-9_\.]+)", sql): t = m.group(1) targets.add(t) # create table xxx as select ... for m in re.finditer(r"(?i)\bcreate\s+table\s+([a-zA-Z0-9_\.]+)\s+as\b", sql): targets.add(m.group(1)) # 规范化:补 default_db norm = set() for t in targets: if "." not in t and default_db: norm.add(f"{default_db}.{t}") else: norm.add(t) return norm def _is_valid_table_token(tok: str) -> bool: # 过滤掉 from (select ...) 之类 if tok.startswith("("): return False low = tok.lower() if low in ("select", "values"): return False return True def _extract_sources(sql: str, default_db: str) -> Set[str]: sources = set() # from/join 后面的第一个 token 通常是表名(忽略 lateral/view 等复杂结构,先覆盖你主要场景) for m in re.finditer(r"(?i)\b(from|join)\s+([a-zA-Z0-9_\.]+)", sql): tok = m.group(2) if not _is_valid_table_token(tok): continue # 去掉可能的反引号 tok = tok.replace("`", "") # 补 default_db if "." not in tok and default_db: tok = f"{default_db}.{tok}" sources.add(tok) return sources def _filter_xxx_tables(tables: Set[str]) -> Set[str]: out = set() for t in tables: t = t.replace("`", "") if "." not in t: continue db, tb = t.split(".", 1) if db.startswith(XXX_DB_PREFIX) and tb: out.add(f"{db}.{tb}") return out def _extract_column_tokens(sql: str) -> Set[str]: # 粗粒度抓 where/on 里出现过的字段 token,用于判断分区是否有被限制 cols = set() for m in re.finditer(r"(?i)\b(where|on)\b(.*?)(?=\b(group|order|having|limit|union|join|where|insert|create)\b|$)", sql): segment = m.group(2) # 抓类似 a.b、b、`b` 这种 token for tok in re.findall(r"`?[a-zA-Z_][a-zA-Z0-9_]*`?(?:\.`?[a-zA-Z_][a-zA-Z0-9_]*`?)?", segment): tok = tok.replace("`", "") # 只保留列名部分(a.b -> b) if "." in tok: tok = tok.split(".", 1)[1] cols.add(tok) return cols def main(sql_raw: str) -> dict: sql_raw = sql_raw or "" sql1 = _strip_comments(sql_raw) sql2 = _normalize_space(sql1) default_db = _find_default_db(sql2) targets = _extract_targets(sql2, default_db) sources = _extract_sources(sql2, default_db) # 只保留 xxx_data_ 开头库 targets = _filter_xxx_tables(targets) sources = _filter_xxx_tables(sources) # 源表排除目标表 source_tables = sorted(list(sources - targets)) target_tables = sorted(list(targets)) # 粗粒度过滤字段:全局 where/on 出现的列 tokens global_cols = _extract_column_tokens(sql2) partition_filters: Dict[str, List[str]] = {} for t in source_tables: # 先把全局列 tokens 记上,后面 DDL 分区列会跟它对比 partition_filters[t] = sorted(list(global_cols)) return { "default_db": default_db, "source_tables": source_tables, "target_tables": target_tables, "partition_filters": partition_filters, }4.迭代提取真实 Hive 元数据 && 查询表的特征库(核心节点)根据提取出的表名,批量查询我们预先维护的表特征库(支持失败自动重试 3 次,保证稳定性),获取每张表的完整画像(存储格式、数据量、分区策略、Bucket 信息、使用场景等),这步相当于给 AI 提前“喂”了公司级最佳实践知识。工作流输出结果Prompt核心逻辑包括, 分区优化建议(粒度、字段、动态分区), 存储格式升级(TEXT → ORC/Parquet + 合适压缩), Bucket / Sort / Bloom Filter 推荐, 字段类型规范、注释补全、ACID 化建议, 结合表数据量和使用场景给出个性化优化, 最终输出结构化优化报告 + 重写后的完整 Spark SQL DDL。
2026年04月13日
53 阅读
6 点赞
2025-11-28
Apache Hudi 源码优化:引入Flink聚簇策略,降低资源消耗,提升稳定性
在 Hudi 的设计中,为了保持数据文件大小合理、提升查询性能,引入了 clustering 后台表服务。Clustering 的核心目的,是将多个小文件(small files)合并或重写为更大、更均衡的文件,从而减少读取和查询过程中因小文件过多带来的性能开销。然而,在实际生产环境中,会出现一个较为微妙但对稳定性和资源利用影响显著的问题 —— “尾部小文件(tail small file)”。具体来说,一次 clustering 执行后可能会残留一个体积非常小的文件,而在下一轮 clustering 周期中,这个小文件会再次被选中重写。如此反复,不仅浪费计算资源,还会导致 commit 和 metadata 的额外开销,甚至可能因为文件频繁更替,造成下游查询表现不稳定。准备一个分区,其数据量大约 720 MB,并配置如下参数:'clustering.plan.strategy.target.file.max.bytes' = '650000000'在执行聚簇之前,该分区(例如 p_date=20251029)大约包含 10 个 Parquet 文件,每个文件大小从 8 MB 到 120 MB 不等。第一次运行 clustering 后,会生成一个大文件(大约 680 MB)和一个小文件(大约 50 MB)。这符合设定的目标文件大小,按理说不应该再有额外合并。然而,当 clustering 周期性地持续运行时,这个 50 MB 的小文件却持续被反复重写 —— 每次 clustering 执行都会生成这个小文件的新版本。由于启用了 “clean-retain-commits” 配置,多个该小文件的历史版本被保留下来,导致不必要的文件膨胀和冗余提交。在生产环境中,各分区的数据量往往差异较大。即使对文件的最小/最大大小做了精细调优,也难以保证所有分区都能均匀地合并成大文件,因此产生“小尾巴文件”几乎是不可避免的。为此,我们引入了一种全新的聚类策略。该策略会综合考虑是否需要在聚类过程中进行排序等因素,并仅在 clustering 的 plan 构建阶段 添加 early-exit(提前退出) 条件,而不会改变 commit 或 execution 的整体流程。用户可通过参数 clustering.plan.strategy.class 启用该策略,完全兼容旧逻辑。该策略已测试并在生产环境稳定运行,可靠性经过验证。目前,这一策略已被社区正式采纳合并入主分支。对于 Hudi + Flink 场景下启用了 clustering 的用户——尤其是分区数据分布不均 的场景,该策略能够显著降低不必要的 clustering 开销,从而提升作业的稳定性与整体效率。如果你在生产环境中使用 Hudi + Flink 进行 clustering,强烈建议开启此新策略。对于绝大多数场景来说,这是一个几乎“零成本”、但收益十分明显的优化方案。
2025年11月28日
3,650 阅读
107 点赞
2025-08-21
阿里云大数据计算: MaxCompute 资源调度与复杂查询加速实践
云平台的性能与功能通常是用户最核心的关注点——性能直接影响计算任务的执行效率,而功能则决定了数据处理链路的流畅性与灵活性。近期在使用云平台大数据计算服务时,重点测试了其数据处理、SQL查询及并发执行等性能表现,同时评估了任务调度、权限管理和数据导入导出等关键功能的实用性。任务优化 MaxCompute常用参数set odps.sql.allow.fullscan=true; 不指定分局查询 set odps.sql.groupby.skewindata=true; 解决agg数据倾斜问题 set odps.service.mode=off; 如果打开,超过十分钟时,会被kill,增加执行时长 set odps.sql.hive.compatible=true; 开启Hive SQL兼容模式,降低任务迁移成本 set odps.task.wlm.quota=os_datagouptest3; 指定计算资源Quota 自动MapJoin的阈值,用于决定是否将小表数据广播set odps.optimizer.auto.mapjoin.threshold=4096000000; set odps.optimizer.enable.online.conditional.mapjoin=true; set odps.sql.split.dop={"xxx.table1":120, "xxx.table2": 10}; 读取并行度设置 set odps.optimizer.hbo.enable.new.signature=true; 历史执行信息进行查询优化的增强功能压力测试 MaxCompute并发跑任务,默认FIFO(先进先出),可配置成FAIR,资源抢占和分配情况如下FIFO 确保了公平的顺序,但可能导致头部阻塞,即早期任务(任务 1)垄断资源,延迟后续任务(任务 2 和 3),直到早期任务让步,从任务 2 和 3 的初始低值中显而易见。查询加速 MaxQA引擎,默认FAIR(公平调度),不可改:MaxQA引擎概述链接 任务并发情况如下FAIR调度支持抢占,如果一个任务超过其份额,系统可能暂停或回收其资源,重新分配给欠份额的任务,可见三个任务在各自的峰值(最高点)时期趋于均衡。DataWorks数据开发 定时任务可以是单个节点、也可以是工作流,创建时选择任务类型数据集成 官网链接Serverless资源组 官网链接Spark作业创建任务运行信息MaxCompute功能特性 开放存储(Storage API):数据不可直接访问,可通过Storage API访问(按量付费):官网链接Quota资源管理:分层管理、资源隔离、弹性伸缩:官网链接物化视图:根据历史作业和性能分析自动创建物化视图(AutoMV):官网链接数据安全:支持按项目、表(列级别、行级别)、资源、函数或实例维度的访问控制:官网链接近实时批流一体数仓:支持基于Flink等流计算的分钟级数据写入与秒级查询加速(MCQA2.0):官网链接任务调优:内置作业诊断与优化建议,包括了SQL调优、数据倾斜调优等:官网链接成本优化推荐:保障作业按时完成的前提下,生成更优的资源配置方案,降低成本:官网链接账单明细:支持历史作业分析、资源分账,可以详细统计任务的执行时长、资源使用、所属人等:官网链接
2025年08月21日
297 阅读
16 点赞
2025-08-02
湖仓一体流批协同实践:从Spark批量加速到Flink实时更新
在湖仓一体架构下,无论是业务库数据同步还是宽表构建,Flink虽然支持"先全量,再增量"的处理模式,但当历史数据规模达到亿级时,全量数据导入阶段存在瓶颈,如果调高并发会抢占实时集群资源,低并发会导致处理时间过长。因此,我们采用Spark+Flink的协同计算架构,首先基于Spark的离线计算优势,在离线集群完成大规模历史数据的批量导入,再启动Flink任务进行增量更新。本文将分享该方案落地过程中遇到的问题和解决方案。Spark加速全量数据入湖 配置写入方式为bulk insert,减少数据序列化以及合并操作,该数据写入方式会跳过数据去重,所以可以在hive中通过SparkSQL预处理历史数据。set hoodie.spark.sql.insert.into.operation=bulk_insert;如果历史数据在Hive中,表格式尽可能是orc或parquet格式,否则处理效率会显著下降,SparkSQL任务配置如下:executor-memory 16g num-executors 150 executor-cores 4 spark.default.parallelism 600上述配置在数据量较大的情况下容易OOM,如果存量数据过多,需要分多个批次入湖,这样耗时更长,作业更不稳定。因为数据入湖时数据分桶、写入缓冲都属于内存密集型操作,所以适当调大spark.memory.fraction,调小spark.memory.storageFraction(减小存储内存,增加执行内存),有利于加速入湖。spark.memory.fraction 0.8 spark.memory.storageFraction 0.3同时,在资源不变的情况下,适当增大并行度,低并行度时每个Task处理的数据量较大,排序操作的内存压力剧增,当单个Task的数据量超过可用内存时,Spark会触发磁盘溢写,并行度增大一倍后,每个Task处理的数据量减半,排序完全在内存中完成,避免Spill和GC,效率显著提升。spark.default.parallelism 1200最终配置如下,实现2分钟内15亿全量数据入湖:spark-sql --master yarn --queue ... --deploy-mode client --name ... \ --driver-memory 16g --driver-cores 8 --executor-memory 16g --num-executors 150 \ --conf spark.executor.heartbeatInterval=120000s \ --conf spark.network.timeout=130000s \ --conf spark.memory.fraction=0.8 --conf spark.memory.storageFraction=0.3 \ --executor-cores 4 --conf spark.default.parallelism=1200 \ --hiveconf hive.cli.print.header=true \ --jars hudi-spark3.2-bundle_2.12-0.14.1.jar \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' -S全量数据入湖后小文件合并 增大并行度会生成很多小文件,在全量数据导入完成后,Flink任务启动前,通过Spark提交Clustering任务进行一次小文件合并。spark-submit --master yarn --deploy-mode cluster --queue ... \ --name ... \ --driver-memory 16g --executor-memory 16g --num-executors 150 \ --executor-cores 8 --conf spark.default.parallelism=1200 \ --class org.apache.hudi.utilities.HoodieClusteringJob hudi-utilities-bundle_2.12-0.14.1.jar \ --mode scheduleAndExecute \ --base-path hdfs://... \ --table-name ... \ --retry-last-failed-clustering-job \ --job-max-processing-time-ms 1800000 \ --hoodie-conf hoodie.clean.async=true \ --hoodie-conf hoodie.clean.automatic=true \ --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \ --hoodie-conf hoodie.cleaner.fileversions.retained=1 \ --hoodie-conf hoodie.cleaner.parallelism=100 \ --hoodie-conf hoodie.clustering.async.enabled=true \ --hoodie-conf hoodie.clustering.async.max.commits=1 \ --hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \ --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=20000 \ --hoodie-conf hoodie.metadata.enable=false \ --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=...使用KEEP_LATEST_FILE_VERSIONS清理策略,保留最后1个文件版本,避免使用成KEEP_LATEST_COMMITS,因为压缩前的文件仍关联原始提交时间,会导致合并前的小文件和合并后的大文件共存的问题。hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS hoodie.cleaner.fileversions.retained=1清理策略原理可参考:https://developer.aliyun.com/article/1457637 Flink index bootstrap加速 第一次启动实时任务需要打开索引引导函数,同时把开始消费时间回调到离线数据的截止时间,配置如下:'index.bootstrap.enabled'='true' 'scan.startup.mode'='timestamp' 'scan.startup.timestamp-millis' = ''数据量较大时,可以提高并发,同时调整下面三个配置,以确保上下游算子的并行度保持一致。否则,会导致大量不必要的网络IO,进而引发 checkpoint 长时间阻塞,最终可能导致任务启动失败。'write.tasks' = '100', 'write.bucket_assign.tasks' = '100', 'write.index_bootstrap.tasks'='100'初始化索引完成后,执行savepoint,减小任务的并行度,关闭index bootstrap,从savepoint重新启动作业。'write.tasks' = '12' 'write.bucket_assign.tasks' = '20' 'index.bootstrap.enabled'='false'由于savepoint保存了index bootstrap算子信息,关闭index bootstrap后,会导致作业无法恢复,需要通过参数配置,允许跳过无法还原的保存点状态。作业为Per-Job模式时,启动任务配置--allowNonRestoredState参数即可。flink run -d -t yarn-per-job -Dyarn.application.queue=... -Dparallelism.default=6 \ -Dtaskmanager.numberOfTaskSlots=3 -Djobmanager.memory.process.size=2048m \ -Dexecution.checkpointing.snapshot-compression=true \ -Dexecution.checkpointing.local-backup.enabled=true \ -Dexecution.state-recovery.from-local=true \ -s hdfs://.../savepoint/savepoint-f36c2e-ada1c3edc7f2 --allowNonRestoredState \ -Dtaskmanager.memory.process.size=2048m -Dtaskmanager.memory.managed.fraction=0.3 \ -Dstate.backend.rocksdb.log.size=100m -Dyarn.application.name=... -c ... ....jar \ ... -stateBackendType 2 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -enableIncremental true作业是Flink SQL或者Application模式时,并不支持在任务启动命令上配置--allowNonRestoredState参数,设置下面参数后并不生效,作业仍然会启动失败。execution.savepoint.ignore-unclaimed-state=true如果作业需要以Application模式运行,目前最佳的方式是先通过Per-Job模式恢复作业,再次执行savepoint,把作业从该savepoint以Application模式的方式拉起即可。
2025年08月02日
800 阅读
35 点赞
2024-07-28
数据服务化:Ribbon负载均衡策略升级,弹性自愈与全链路容错
分布式系统架构中,负载均衡是确保系统高可用性和性能的关键环节。以线上服务为例,某个服务原本部署了4个实例来应对大量的请求流量。然而,意外情况发生,其中一个实例所在的机房出现故障,导致其响应速度变得极为缓慢,但是仍然和Nacos注册中心保持着心跳。而当时所采用的负载均衡策略是轮询策略 RoundRobinRule,这一策略在正常情况下能够较为均匀地分配请求,但在面对这种异常情况时,却暴露出了明显的局限性。由于轮询策略的特性,它不会根据实例的实际响应情况进行动态调整,这就使得故障实例上仍然会有大量请求持续堆积。随着时间的推移,发现该实例所在机器的 close_wait 连接数急剧增加,导致整个机器负载加重。为了解决这一问题,调研了一些传统的应对策略:其一,配置超时失败重试机制 ... httpclient: response-timeout: 30s。故障实例响应慢时,自动失败路由到其他实例进行重试,从而使上游的请求最终能够成功。但故障服务实例的流量并没有得到有效的控制和调整。这意味着故障实例和所在机器仍然在承受着巨大的压力。其二,采用熔断策略 Sentinel、Resilience4J、Hystrix。在响应时间/出错百分比/线程数等达到阈值时进行降级、熔断,以保护其他服务实例不受影响。然而,在该场景中,由于还有 3/4 的实例处于正常可用状态,直接进行熔断操作显得过于激进。其三,考虑使用权重轮询策略 WeightedResponseTimeRule。根据服务实例的性能表现动态地分配权重,性能好的实例会被分配更多的请求,而性能差的实例则会逐渐减少请求分配。但该场景下,故障机器的响应时间与正常服务相比已经不在一个数量级,其 QPS 却依然很高。这就导致在权重轮询策略下,故障机器的服务权重会迅速降低,几乎不再接收请求。而且由于我们的配置是在网关层面,当故障机器恢复后,系统无法自动重新计算权重,使得分配到故障机器的流量很少,其权重也很难再次提升上去。基于以上困境,决定对权重轮询策略进行二次开发,使其更加智能,以最大限度地减小请求端的影响。首先增加过滤器RibbonResponseFilter。这个过滤器的主要作用是计算每个服务实例的响应时间,并将其记录到 ServerStats 中。同时,它还会记录请求的返回状态,如果返回状态不是 200,就将其转化为请求超时,并相应地减小该服务的权重。@Component @Slf4j public class RibbonResponseFilter implements GlobalFilter, Ordered { @Autowired protected final SpringClientFactory springClientFactory; public static final String RQUEST_START_TIME = "RequestStartTime"; public static final double TIME_WEIGHT = 30000; public RibbonResponseFilter(SpringClientFactory springClientFactory) { this.springClientFactory = springClientFactory; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(RQUEST_START_TIME, System.currentTimeMillis()); return chain.filter(exchange).then(Mono.fromRunnable(() -> { URI requestUrl = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); LoadBalancerContext loadBalancerContext = this.springClientFactory.getLoadBalancerContext(route.getUri().getHost()); ServerStats stats = loadBalancerContext.getServerStats(new Server(requestUrl.getHost(), requestUrl.getPort())); long orgStartTime = exchange.getAttribute(RQUEST_START_TIME); long time = System.currentTimeMillis() - orgStartTime; // 响应时间超过 5s 或者服务异常时,减小权重 if (exchange.getResponse().getStatusCode().value()!= 200 || time > 5000) { log.info("The abnormal response will lead to a decrease in weight : {} ", requestUrl.getHost()); stats.noteResponseTime(TIME_WEIGHT); } })); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }增加这个过滤器的原因在于,无论是使用自定义的负载均衡策略,还是内置的 WeightedResponseTimeRule,都无法自动获取到每个服务实例的总请求次数、异常请求次数以及响应时间等关键参数。通过这个过滤器,能够有效地收集这些信息,为后续的权重计算和调整提供有力的数据支持。在注册权重更新 Timer(默认 30s)的同时,同时注册了一个权重重置 Timer(5m)。这样一来,当故障服务实例恢复后,在 5 分钟内,它就能够重新参与到负载均衡的分配中。以下是相关的代码片段:void resetWeight() { if (resetWeightTimer!= null) { resetWeightTimer.cancel(); } resetWeightTimer = new Timer("NFLoadBalancer-AutoRobinRule-resetWeightTimer-" + name, true); resetWeightTimer.schedule(new ResetServerWeightTask(), 0, 60 * 1000 * 5); ResetServerWeight rsw = new ResetServerWeight(); rsw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { logger.info("Stopping NFLoadBalancer-AutoRobinRule-ResetWeightTimer-" + name); resetWeightTimer.cancel(); } })); } public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!resetServerWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Reset weight job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { return; } Double weightSoFar = 0.0; List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error reset server weights", e); } finally { resetServerWeightAssignmentInProgress.set(false); } }在采用此负载均衡策略时,若重置权重后服务仍未修复,由于配置了超时重试机制,请求端可毫无察觉。与此同时,该服务实例的权重会迅速在短时间内再次降至极低水平,如此循环,直至实例恢复正常。此策略有效地处理了线上服务可能遭遇的各类异常状况。
2024年07月28日
175 阅读
1 点赞
1
2
...
7