用户提交Flink任务时,通过先后调用transform()——>doTransform()——>addOperator()方法,将map、flatMap、filter、process等算子添加到List<Transformation<?>> transformations集合中。在执行execute()方法时,会使用StreamGraphGenerator的generate()方法构建流拓扑StreamGraph(即Pipeline),数据结构属于有向无环图。
在StreamGraph中,StreamNode用于记录算子信息,而StreamEdge则用于记录数据交换方式,包括以下几种Partitioner:
Partitioner类都是StreamPartitioner类的子类,它们通过实现isPointwise()方法来确定自身的类型。一种是ALL_TO_ALL,另一个种是POINTWISE。
/**
* A distribution pattern determines, which sub tasks of a producing task are connected to which
* consuming sub tasks.
*
* <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
* in {@link EdgeManagerBuildUtil}
*/
public enum DistributionPattern {
/** Each producing sub task is connected to each sub task of the consuming task. */
ALL_TO_ALL,
/** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
POINTWISE
}
ALL_TO_ALL意味着上游的每个subtask需要与下游的每个subtask建立连接。
POINTWISE则是上游的每个subtask和下游的一个或多个subtask连接。
StreamGraph构建完成后,,会通过 PipelineExecutorUtils.getJobGraph()构建JobGraph,具体流程是:
——>PipelineExecutorUtils.getJobGraph()
——>FlinkPipelineTranslationUtil.getJobGraph()
——>StreamGraphTranslator.translateToJobGraph()
——>StreamGraph.getJobGraph()
——>StreamingJobGraphGenerator.createJobGraph()
JobGraph是优化后的StreamGraph,如果相连的算子支持chaining,合并到一个StreamNode,chaining在StreamingJobGraphGenerator的setChaining()方法中实现:
/**
* Sets up task chains from the source {@link StreamNode} instances.
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
// we separate out the sources that run as inputs to another operator (chained inputs)
// from the sources that needs to run as the main (head) operator.
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
// iterate over a copy of the values, because this map gets concurrently modified
for (OperatorChainInfo info : initialEntryPoints) {
createChain(
info.getStartNodeId(),
1, // operators start at position 1 because 0 is for chained source inputs
info,
chainEntryPoints);
}
}
将符合chaining条件的,合并到一个StreamNode
条件如下:
1. 下游节点输入边只有一个
2. 与下游属于同一个SlotSharingGroup
3. 数据分发策略Forward
4. 流数据交换模式不是批量模式
5. 上下游并行度相等
6. StreamGraph中chaining为true streamGraph 是可以 chain的
7. 算子是否可以链化areOperatorsChainable
代码如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}
private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getExchangeMode() != StreamExchangeMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled())) {
return false;
}
... ...
从Source节点开始,使用深度优先搜索(DFS)算法递归遍历有向无环图中的所有StreamNode节点。
待续 ... ...
评论