Flink Task Execution Flow: Source Code Analysis


TOTC · 2022-12-03

When a user submits a Flink job, each operator call such as map, flatMap, filter, or process goes through transform() ——> doTransform() ——> addOperator(), which appends the operator to the List<Transformation<?>> transformations collection. When execute() is invoked, StreamGraphGenerator's generate() method builds the stream topology, the StreamGraph (i.e. the Pipeline), whose underlying data structure is a directed acyclic graph (DAG).
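
A minimal job sketch to make this concrete (class name and test data are placeholder assumptions, not taken from the source): every operator call below registers a Transformation, and execute() turns the collected list into the StreamGraph.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")        // source transformation
                .map(String::toUpperCase)      // map() ——> transform() ——> doTransform() ——> addOperator()
                .filter(s -> !s.isEmpty())     // appends another Transformation to the list
                .print();                      // sink transformation

        // execute() builds the StreamGraph from the collected transformations
        // via StreamGraphGenerator.generate(), then submits the job.
        env.execute("transformation-demo");
    }
}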

In the StreamGraph, a StreamNode records the operator's information, while a StreamEdge records how data is exchanged between nodes, i.e. which Partitioner is used (see the figure below):


(figure: StreamPartitioner implementations)

Every Partitioner is a subclass of StreamPartitioner and determines its kind by implementing isPointwise(): one kind is ALL_TO_ALL, the other is POINTWISE.

/**
 * A distribution pattern determines, which sub tasks of a producing task are connected to which
 * consuming sub tasks.
 *
 * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
 * in {@link EdgeManagerBuildUtil}
 */
public enum DistributionPattern {

    /** Each producing sub task is connected to each sub task of the consuming task. */
    ALL_TO_ALL,

    /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
    POINTWISE
}

ALL_TO_ALL means that every producing (upstream) subtask is connected to every consuming (downstream) subtask.


(figure: ALL_TO_ALL connection pattern)

POINTWISE means that each upstream subtask is connected to one or more downstream subtasks.


(figure: POINTWISE connection pattern)
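
As a sketch of how the two patterns relate to isPointwise() (the helper class and method below are illustrative assumptions, not Flink API; only StreamPartitioner, DistributionPattern and isPointwise() come from the source):

import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

final class DistributionPatterns {
    // Illustrative helper: a pointwise partitioner (e.g. ForwardPartitioner,
    // RescalePartitioner) maps to POINTWISE; all others (e.g. RebalancePartitioner,
    // ShufflePartitioner, keyed exchanges) map to ALL_TO_ALL.
    static DistributionPattern of(StreamPartitioner<?> partitioner) {
        return partitioner.isPointwise()
                ? DistributionPattern.POINTWISE
                : DistributionPattern.ALL_TO_ALL;
    }
}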

Once the StreamGraph has been built, the JobGraph is generated through PipelineExecutorUtils.getJobGraph(). The call chain is:

——>PipelineExecutorUtils.getJobGraph()
——>FlinkPipelineTranslationUtil.getJobGraph()
——>StreamGraphTranslator.translateToJobGraph()
——>StreamGraph.getJobGraph()  
——>StreamingJobGraphGenerator.createJobGraph()
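
The same translation can also be triggered by hand for inspection; a minimal sketch (the job body is a placeholder assumption): env.getStreamGraph() runs the StreamGraphGenerator, and StreamGraph.getJobGraph() goes through StreamingJobGraphGenerator.createJobGraph().

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).map(i -> i * 2).returns(Types.INT).print();

        // StreamGraphGenerator.generate() builds the StreamGraph ...
        StreamGraph streamGraph = env.getStreamGraph();
        // ... and StreamingJobGraphGenerator.createJobGraph() builds the JobGraph.
        JobGraph jobGraph = streamGraph.getJobGraph();

        // Each JobVertex corresponds to one operator chain.
        jobGraph.getVertices().forEach(vertex -> System.out.println(vertex.getName()));
    }
}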

The JobGraph is an optimized version of the StreamGraph: adjacent operators that support chaining are merged into a single JobVertex. Chaining is implemented in StreamingJobGraphGenerator's setChaining() method:

/**
 * Sets up task chains from the source {@link StreamNode} instances.
 *
 * <p>This will recursively create all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // iterate over a copy of the values, because this map gets concurrently modified
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

Operators that meet the chaining conditions are merged into a single JobVertex.

The conditions are:

1. The downstream node has only one input edge
2. The upstream and downstream nodes belong to the same SlotSharingGroup
3. The data is distributed with a ForwardPartitioner
4. The stream exchange mode is not BATCH
5. The upstream and downstream parallelism are equal
6. Chaining is enabled on the StreamGraph (isChainingEnabled() returns true)
7. The operators themselves are chainable (areOperatorsChainable())

The corresponding code:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getExchangeMode() != StreamExchangeMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {

        return false;
    }

... ...    

Starting from the source nodes, all StreamNodes of the directed acyclic graph are traversed recursively with a depth-first search (DFS).


(figure: recursive traversal of the StreamGraph starting from the source nodes)
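
A schematic of that traversal, stripped of all chaining logic (this is not the actual createChain() code, just a plain depth-first walk over node ids under an assumed adjacency-map input):

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DfsSketch {
    /** Visits every reachable node once, starting from the source node ids. */
    static void traverse(List<Integer> sourceIds, Map<Integer, List<Integer>> outEdges) {
        Set<Integer> visited = new HashSet<>();
        for (Integer sourceId : sourceIds) {
            visit(sourceId, outEdges, visited);
        }
    }

    private static void visit(Integer nodeId, Map<Integer, List<Integer>> outEdges, Set<Integer> visited) {
        if (!visited.add(nodeId)) {
            return; // node already handled
        }
        // In the real generator this is where each out edge is checked for chainability.
        for (Integer next : outEdges.getOrDefault(nodeId, Collections.emptyList())) {
            visit(next, outEdges, visited);
        }
    }
}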

To be continued ...
