基于 Dify 的 Spark SQL DDL 调优与 EXPLAIN 执行计划分析工作流实践
侧边栏壁纸
  • 累计撰写 56 篇文章
  • 累计阅读 12.4万
AI

基于 Dify 的 Spark SQL DDL 调优与 EXPLAIN 执行计划分析工作流实践

WD1016
2026-04-13 / 45 阅读 / 正在检测是否收录...

在最近的大数据任务优化工作中,业务分析同学手里有上千个复杂的 Spark SQL,如果全靠人工逐个调优,不仅耗时耗力,还容易遗漏大量低级问题,如分区设计不合理、存储格式落后、缺少 Bucket、压缩策略不当等。

为了解决这个痛点,先对底层 Hive 表进行了系统梳理,维护了一套表特征库。特征库里记录了每张表的结构化信息,包括但不限于:

1.存储格式:算法团队使用的表统一为 TEXT 格式,SQL 分析型表统一为 ORC 格式(或 Parquet)。

2.数据规模:表总行数、占用存储大小(GB/TB 级别)。

3.分区信息:是否分区、分区字段(日期/业务维度)、分区粒度、分区数量、是否存在数据倾斜。

4.其他关键特征:Bucket 数量与排序字段、压缩方式(ZLIB/SNAPPY)、表类型(外部表/内部表)、最近更新时间、血缘依赖关系等。

工作流核心优势

1.自动化闭环:从 SQL 解析 → 特征库查询 → Hive 元数据拉取 → 获取explain执行计划 → LLM 智能分析 → 优化报告,全程无人值守。

2.知识驱动:不再是“裸 LLM 瞎猜”,而是把公司沉淀的表特征库 + 真实元数据作为 Prompt 上下文,让 AI 给出精准、可落地的优化建议。

3.批量处理:一次上传包含上百张表的 SQL 文件也能秒级处理。

工作流详细拆解

整个流程采用 Dify Workflow 可视化拖拽搭建

1.起始节点:上传 SQL 文件
支持直接拖拽 .sql 文件,兼容多表 Spark SQL(SELECT / CREATE TABLE / INSERT OVERWRITE 等)。

2.提取 SQL 文件内容
将文件转为纯文本,供后续解析使用。

3.解析 SQL:提取表名
自动识别所有涉及的 Hive 表名。

import re
from typing import Dict, List, Set, Tuple

XXX_DB_PREFIX = "xxx_data_"

def _strip_comments(sql: str) -> str:
    # 去掉 /* ... */ 块注释
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S)
    # 去掉 -- 行注释
    sql = re.sub(r"--.*?$", " ", sql, flags=re.M)
    return sql

def _normalize_space(sql: str) -> str:
    return re.sub(r"\s+", " ", sql).strip()

def _find_default_db(sql: str) -> str:
    m = re.search(r"(?i)\buse\s+([a-zA-Z0-9_]+)\b", sql)
    return m.group(1) if m else ""

def _extract_targets(sql: str, default_db: str) -> Set[str]:
    targets = set()

    # insert overwrite table xxx / insert into table xxx
    for m in re.finditer(r"(?i)\binsert\s+(?:overwrite|into)\s+table\s+([a-zA-Z0-9_\.]+)", sql):
        t = m.group(1)
        targets.add(t)

    # create table xxx as select ...
    for m in re.finditer(r"(?i)\bcreate\s+table\s+([a-zA-Z0-9_\.]+)\s+as\b", sql):
        targets.add(m.group(1))

    # 规范化:补 default_db
    norm = set()
    for t in targets:
        if "." not in t and default_db:
            norm.add(f"{default_db}.{t}")
        else:
            norm.add(t)
    return norm

def _is_valid_table_token(tok: str) -> bool:
    # 过滤掉 from (select ...) 之类
    if tok.startswith("("):
        return False
    low = tok.lower()
    if low in ("select", "values"):
        return False
    return True

def _extract_sources(sql: str, default_db: str) -> Set[str]:
    sources = set()
    # from/join 后面的第一个 token 通常是表名(忽略 lateral/view 等复杂结构,先覆盖你主要场景)
    for m in re.finditer(r"(?i)\b(from|join)\s+([a-zA-Z0-9_\.]+)", sql):
        tok = m.group(2)
        if not _is_valid_table_token(tok):
            continue
        # 去掉可能的反引号
        tok = tok.replace("`", "")
        # 补 default_db
        if "." not in tok and default_db:
            tok = f"{default_db}.{tok}"
        sources.add(tok)
    return sources

def _filter_xxx_tables(tables: Set[str]) -> Set[str]:
    out = set()
    for t in tables:
        t = t.replace("`", "")
        if "." not in t:
            continue
        db, tb = t.split(".", 1)
        if db.startswith(XXX_DB_PREFIX) and tb:
            out.add(f"{db}.{tb}")
    return out

def _extract_column_tokens(sql: str) -> Set[str]:
    # 粗粒度抓 where/on 里出现过的字段 token,用于判断分区是否有被限制
    cols = set()
    for m in re.finditer(r"(?i)\b(where|on)\b(.*?)(?=\b(group|order|having|limit|union|join|where|insert|create)\b|$)", sql):
        segment = m.group(2)
        # 抓类似 a.b、b、`b` 这种 token
        for tok in re.findall(r"`?[a-zA-Z_][a-zA-Z0-9_]*`?(?:\.`?[a-zA-Z_][a-zA-Z0-9_]*`?)?", segment):
            tok = tok.replace("`", "")
            # 只保留列名部分(a.b -> b)
            if "." in tok:
                tok = tok.split(".", 1)[1]
            cols.add(tok)
    return cols

def main(sql_raw: str) -> dict:
    sql_raw = sql_raw or ""
    sql1 = _strip_comments(sql_raw)
    sql2 = _normalize_space(sql1)

    default_db = _find_default_db(sql2)

    targets = _extract_targets(sql2, default_db)
    sources = _extract_sources(sql2, default_db)

    # 只保留 xxx_data_ 开头库
    targets = _filter_xxx_tables(targets)
    sources = _filter_xxx_tables(sources)

    # 源表排除目标表
    source_tables = sorted(list(sources - targets))
    target_tables = sorted(list(targets))

    # 粗粒度过滤字段:全局 where/on 出现的列 tokens
    global_cols = _extract_column_tokens(sql2)
    partition_filters: Dict[str, List[str]] = {}
    for t in source_tables:
        # 先把全局列 tokens 记上,后面 DDL 分区列会跟它对比
        partition_filters[t] = sorted(list(global_cols))

    return {
        "default_db": default_db,
        "source_tables": source_tables,
        "target_tables": target_tables,
        "partition_filters": partition_filters,
    }

4.迭代提取真实 Hive 元数据 && 查询表的特征库(核心节点)
根据提取出的表名,批量查询我们预先维护的表特征库(支持失败自动重试 3 次,保证稳定性),获取每张表的完整画像(存储格式、数据量、分区策略、Bucket 信息、使用场景等),这步相当于给 AI 提前“喂”了公司级最佳实践知识。

mnwx61wa.png

工作流输出结果

Prompt核心逻辑包括, 分区优化建议(粒度、字段、动态分区), 存储格式升级(TEXT → ORC/Parquet + 合适压缩), Bucket / Sort / Bloom Filter 推荐, 字段类型规范、注释补全、ACID 化建议, 结合表数据量和使用场景给出个性化优化, 最终输出结构化优化报告 + 重写后的完整 Spark SQL DDL。

mnwxdoml.png

6

评论

博主关闭了所有页面的评论