首页
Search
1
Redis集群部署方案
996 阅读
2
数据同步工具DataX、Sqoop和Canal
961 阅读
3
Hadoop各版本汇总
935 阅读
4
Spark学习笔记
926 阅读
5
计算机网络笔记
843 阅读
BigData
Flink
AI
Backend
Java
Note
OPS
游客
Search
标签搜索
大数据
Flink
离线
实时
Redis
OpenJDK
Java
笔记
JVM
Elasticsearch
GC
Hadoop
Hudi
Flink CDC
K8S
数据湖
WD1016
累计撰写
56
篇文章
累计阅读
12.4万
次
首页
栏目
BigData
Flink
AI
Backend
Java
Note
OPS
页面
搜索到
1
篇与
AI
的结果
返回首页
2026-04-13
基于 Dify 的 Spark SQL DDL 调优与 EXPLAIN 执行计划分析工作流实践
在最近的大数据任务优化工作中,业务分析同学手里有上千个复杂的 Spark SQL,如果全靠人工逐个调优,不仅耗时耗力,还容易遗漏大量低级问题,如分区设计不合理、存储格式落后、缺少 Bucket、压缩策略不当等。为了解决这个痛点,先对底层 Hive 表进行了系统梳理,维护了一套表特征库。特征库里记录了每张表的结构化信息,包括但不限于:1.存储格式:算法团队使用的表统一为 TEXT 格式,SQL 分析型表统一为 ORC 格式(或 Parquet)。2.数据规模:表总行数、占用存储大小(GB/TB 级别)。3.分区信息:是否分区、分区字段(日期/业务维度)、分区粒度、分区数量、是否存在数据倾斜。4.其他关键特征:Bucket 数量与排序字段、压缩方式(ZLIB/SNAPPY)、表类型(外部表/内部表)、最近更新时间、血缘依赖关系等。工作流核心优势1.自动化闭环:从 SQL 解析 → 特征库查询 → Hive 元数据拉取 → 获取explain执行计划 → LLM 智能分析 → 优化报告,全程无人值守。2.知识驱动:不再是“裸 LLM 瞎猜”,而是把公司沉淀的表特征库 + 真实元数据作为 Prompt 上下文,让 AI 给出精准、可落地的优化建议。3.批量处理:一次上传包含上百张表的 SQL 文件也能秒级处理。工作流详细拆解整个流程采用 Dify Workflow 可视化拖拽搭建1.起始节点:上传 SQL 文件支持直接拖拽 .sql 文件,兼容多表 Spark SQL(SELECT / CREATE TABLE / INSERT OVERWRITE 等)。2.提取 SQL 文件内容将文件转为纯文本,供后续解析使用。3.解析 SQL:提取表名自动识别所有涉及的 Hive 表名。import re from typing import Dict, List, Set, Tuple XXX_DB_PREFIX = "xxx_data_" def _strip_comments(sql: str) -> str: # 去掉 /* ... */ 块注释 sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S) # 去掉 -- 行注释 sql = re.sub(r"--.*?$", " ", sql, flags=re.M) return sql def _normalize_space(sql: str) -> str: return re.sub(r"\s+", " ", sql).strip() def _find_default_db(sql: str) -> str: m = re.search(r"(?i)\buse\s+([a-zA-Z0-9_]+)\b", sql) return m.group(1) if m else "" def _extract_targets(sql: str, default_db: str) -> Set[str]: targets = set() # insert overwrite table xxx / insert into table xxx for m in re.finditer(r"(?i)\binsert\s+(?:overwrite|into)\s+table\s+([a-zA-Z0-9_\.]+)", sql): t = m.group(1) targets.add(t) # create table xxx as select ... for m in re.finditer(r"(?i)\bcreate\s+table\s+([a-zA-Z0-9_\.]+)\s+as\b", sql): targets.add(m.group(1)) # 规范化:补 default_db norm = set() for t in targets: if "." not in t and default_db: norm.add(f"{default_db}.{t}") else: norm.add(t) return norm def _is_valid_table_token(tok: str) -> bool: # 过滤掉 from (select ...) 之类 if tok.startswith("("): return False low = tok.lower() if low in ("select", "values"): return False return True def _extract_sources(sql: str, default_db: str) -> Set[str]: sources = set() # from/join 后面的第一个 token 通常是表名(忽略 lateral/view 等复杂结构,先覆盖你主要场景) for m in re.finditer(r"(?i)\b(from|join)\s+([a-zA-Z0-9_\.]+)", sql): tok = m.group(2) if not _is_valid_table_token(tok): continue # 去掉可能的反引号 tok = tok.replace("`", "") # 补 default_db if "." not in tok and default_db: tok = f"{default_db}.{tok}" sources.add(tok) return sources def _filter_xxx_tables(tables: Set[str]) -> Set[str]: out = set() for t in tables: t = t.replace("`", "") if "." not in t: continue db, tb = t.split(".", 1) if db.startswith(XXX_DB_PREFIX) and tb: out.add(f"{db}.{tb}") return out def _extract_column_tokens(sql: str) -> Set[str]: # 粗粒度抓 where/on 里出现过的字段 token,用于判断分区是否有被限制 cols = set() for m in re.finditer(r"(?i)\b(where|on)\b(.*?)(?=\b(group|order|having|limit|union|join|where|insert|create)\b|$)", sql): segment = m.group(2) # 抓类似 a.b、b、`b` 这种 token for tok in re.findall(r"`?[a-zA-Z_][a-zA-Z0-9_]*`?(?:\.`?[a-zA-Z_][a-zA-Z0-9_]*`?)?", segment): tok = tok.replace("`", "") # 只保留列名部分(a.b -> b) if "." in tok: tok = tok.split(".", 1)[1] cols.add(tok) return cols def main(sql_raw: str) -> dict: sql_raw = sql_raw or "" sql1 = _strip_comments(sql_raw) sql2 = _normalize_space(sql1) default_db = _find_default_db(sql2) targets = _extract_targets(sql2, default_db) sources = _extract_sources(sql2, default_db) # 只保留 xxx_data_ 开头库 targets = _filter_xxx_tables(targets) sources = _filter_xxx_tables(sources) # 源表排除目标表 source_tables = sorted(list(sources - targets)) target_tables = sorted(list(targets)) # 粗粒度过滤字段:全局 where/on 出现的列 tokens global_cols = _extract_column_tokens(sql2) partition_filters: Dict[str, List[str]] = {} for t in source_tables: # 先把全局列 tokens 记上,后面 DDL 分区列会跟它对比 partition_filters[t] = sorted(list(global_cols)) return { "default_db": default_db, "source_tables": source_tables, "target_tables": target_tables, "partition_filters": partition_filters, }4.迭代提取真实 Hive 元数据 && 查询表的特征库(核心节点)根据提取出的表名,批量查询我们预先维护的表特征库(支持失败自动重试 3 次,保证稳定性),获取每张表的完整画像(存储格式、数据量、分区策略、Bucket 信息、使用场景等),这步相当于给 AI 提前“喂”了公司级最佳实践知识。工作流输出结果Prompt核心逻辑包括, 分区优化建议(粒度、字段、动态分区), 存储格式升级(TEXT → ORC/Parquet + 合适压缩), Bucket / Sort / Bloom Filter 推荐, 字段类型规范、注释补全、ACID 化建议, 结合表数据量和使用场景给出个性化优化, 最终输出结构化优化报告 + 重写后的完整 Spark SQL DDL。
2026年04月13日
45 阅读
6 点赞