可以关注下公众号,专注数据开发、数据架构之路,热衷于分享技术干货。
一、提交流程
Flink作业在开发完毕之后,需要提交到Flink集群执行。ClientFronted是入口,触发用户开发的Flink应用Jar文件中的main方法,然后交给PipelineExecutor(流水线执行器,在FlinkClient 升成JobGraph之后,将作业提交给集群的重要环节。)#execue方法,最终会选择一个触发一个具体的PiplineExecutor执行。
运行模式 | 适用场景 |
Session模式 | 共享Dispatcher 和ResourceManager 按需申请资源,作业共享集群资源 适合执行时间段,频繁执行的短任务 |
Per-Job模式 | 独享Dispatcher 和 ResourceManager 按需申请资源,作业独享集群资源 长周期执行的任务,集群异常影响范围小 |
提交模式又可分为:
- Detached:Flink Client创建完集群之后,可以退出命令行窗口,集群独立运行。
- Attached:不能关闭命令行窗口,需要与集群之间维持连接。
1.1 Yarn Session提交流程
启动集群:
- 使用bin/yarn-session.sh提交会话模式的作业。
如果提交到已经存在的集群,则获取Yarn集群信息、应用ID,并准备提交作业。如果启动新的Yarn Session集群,则进入步骤(2)
- Yarn启动新Flink集群
1)如果没有集群,则创建一个新的Session模式的集群。首先将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储(如HDFS)的应用暂存目录。
2)通过Yarn Client 向Yarn 提交Flink创建集群的申请,Yarn分配资源,在申请的Yarn Container中初始化并启动FlinkJobManager进程,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动入口(不同的集群部署模式有不同的ClusterEntrypoint实现),初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。
作业提交:
Yarn 集群准备好后,开始作业提交。
1)Flink Client通过Rest向Dispatcher提交JobGraph。
2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到JobGraph后,为作业创建一个JobMaster,将工作交给JobManager(负责作业调度、管理作业和Task的生命周期),构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)。
这两个步骤结束后,作业进入调度执行阶段。
作业调度执行:
1)JobMaster向YarnResourceManager申请资源,开始调度ExecutionGraph执行,向YarnResourceManager申请资源;初次提交作业集群中尚没有TaskManager,此时资源不足,开始申请资源。
2)YarnResourceManager收到JobManager的资源请求,如果当前有空闲Slot则将Slot分配给JobMaster.,否则YarnResourceManager将向YarnMaster请求创建TaskManager。
3)YarnResourceManager将资源请求加入到等待请求队列,并通过心跳向Yarn RM 申请新的Container资源来启动TaskManager进程,Yarn分配新的Container给TaskManager。
4)YarnResourceManager启动,然后从HDFS加载Jar文件等所需要的的相关资源,在容器中启动TaskManager。
5)TaskManager启动之后,向ResourceManager注册,并把自己的Slot资源情况汇报给ResouceManager。
6)ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。
7)TaskManager向JobMaster提供Slot,JobMaster调度Task到TaskManager的此Slot上执行。
1.2 Yarn Per-Job提交流程
启动集群:
- 使用./flink run -m yarn-cluster提交Per-Job模式的作业。
- Yarn启动Flink集群。该模式下Flink集群的启动入口是YarnJobClusterEntryPoint,其他与YarnSession模式下集群的启动类似。
作业提交:
该步骤与Seesion模式下的不同,Client并不会通过Rest向Dispatcher提交JobGraph,由Dispatcher从本地文件系统获取JObGraph,其后的不好走与Session模式的一样
作业调度执行:
与Yarn Session模式下一致。
1.3 K8s Session提交流程
启动集群:
- Flink客户端首先连接Kubernetes API Server,提交Flink集群的资源描述文件,包括flink-configuration-configmap.yaml、jobmanager-service.yaml、jobmanager-deployment.yaml和taskmanager-deployment.yaml等。
- Kubernets Master会根据这些资源描述文件去创建对应的Kubernetes实体。以JobManager部署为例,Kubernetes集群中的某个节点收到请求后,Kubelet进程会从中央仓库下载Flink镜像,准备和挂载卷,然后执行启动命令。Pod启动后Flink Master(JobManager)进程随之启动,初始化Dispacher和KubernetesResourceManager。并通过K8s服务对外暴露FlinkMaster的端口,K8s服务类似于路由服务。
两个步骤完成之后,Session模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有JobManager、TaskManager,当作业需要执行时,才会按需创建。
作业提交:
- Client用户可以通过Flink命令行(即Flink Client)向这个会话模式的集群提交任务。此时JobGraph会在FlinkClient端生成,然后和用户Jar包一起通过RestClient上传。
- 作业提交成功,Dispatcher会为每个作业启动一个JobMaster,将JobGraph交给JobMaster调度执行。
两个步骤完成之后,作业进入调度执行阶段。
作业调度执行:
K8s Session模式集群下,ResourceManager向k8sMaster申请和释放TaskManager,除此之外,作业的调度与执行和Yarn模式是一样的。
1)JobMaster向KubernetesResourceManager请求Slot。
2)KubernetesResourceManager从kubernetes集群分配TaskManager。每个TaskManager都是具有唯一标识的Pod。KubernetesResourceManager会为TaskManager生成一份新的配置文件,里面有Flink Master的service name 作为地址。这样在FLInkMaster failover之后,TaskManager仍然可以重新连上。
3)Kubernetes集群分配一个新的Pod后,在上面启动TaskManager。
4)TaskManager启动后注册到SlotManager。
5)SlotManager向TaskManager请求Slot.
6)TaskManager 提供Slot给JobMaster,然后任务就会被分配到这个Slot上运行。
二、Graph总览
- 流计算应用的Graph转换:StreamGraph-->JobGraph-->ExecutionGraph-->物理执行图(启动计算任务)
- 批处理应用的Graph转换:OptimizedPlan-->JobGraph
- Table & SQL API的Graph转换:Blink Table Planner /Flink Table Planner。
2.1 流图
使用DataStreamAPI 开发的应用程序,首先被转换为Transformation,然后被映射为StreamGraph。
2.1.1 SteramGraph核心对象
- StreamNode
StreamNode是StremGraph中的节点 ,从Transformation转换而来,可以简单理解为一个StreamNode表示一个算子,从逻辑上来说,SteramNode在StreamGraph中存在实体和虚拟的StreamNode。StremNode可以有多个输入,也可以有多个输出。
实体的StreamNode会最终变成物理算子。虚拟的StreamNode会附着在StreamEdge上。
- StreamEdge
StreamEdge是StreamGraph中的边,用来连接两个StreamNode,一个StreamNode可以有多个出边、入边,StreamEdge中包含了旁路输出、分区器、字段筛选输出等信息。
2.1.2 StreamGraph生成过程
StreamGraph在FlinkClient中生成,由FlinkClient在提交的时候触发Flink应用的main方法,用户编写的业务逻辑组装成Transformation流水线,在最后调用StreamExecutionEnvironment.execute() 的时候开始触发StreamGraph构建。
StreamGraph实际上是在StreamGraphGenerator中生成的,从SinkTransformation(输出) 向前追溯到SourceTransformation。在遍历过程中一边遍历一边构建StreamGraph。
在遍历Transformation的过程中,会对不同类型的Transformation分别进行转换。对于物理Transformation则转换为StreamNode实体,对于虚拟Transformation则作为虚拟StreamNode。
针对于某一种类型的Transformation,会调用其相应的transformxxx()函数进行转换。transfromxxx()首先转换上游Transformation进行递归转换,确保上游的都已经完成了转换。然后通过addOperator()方法构造出StreamNode,通过addEdge()方法与上游的transform进行连接,构造出StreamEdge。
在添加StreamEdge的过程中,如果ShuffleMode为null,则使用ShuffleMode PIPELINED模式,在流计算中,只有PIPLINED模式才会在批处理中设计其他模式。构建StreamEdge的时候,在转换Transformation过程中生成的 虚拟StreamNode会将虚拟StreamNode的信息附着在StreamEdge上
2.1.3 虚拟Transformation 的转换
虚拟的Transformation生成的时候不会转换为SteramNode,而是添加为虚拟节点。
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
//当上游是sideoutput时,递归调用,并传入sideoutput信息
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
}
//当上游是select时,递归调用,并传入select信息
else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
//当上游是Partition时,递归调用,并传入Partition信息
else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
//不是以上逻辑转换的情况,真正构建StreamEdge
else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
//没有指定partitioner时,会为其选择forward或者rebalance
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
//创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
2.2 作业图
JobGraph可以由流计算的StreamGraph和批处理的OptimizedPlan转换而来。流计算中,在StreamGraph的基础上进行了一些优化,如果通过OperatorChain机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程、跨网段的传递。
2.2.1 JobGraph核心对象
- JobVertex
经过算子融合优化后符合条件的多个SteramNode可能会融合在一起生成一个JobVertex,即一个JobVertex包含一个或多个算子,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
- JobEdge
JobEdge是JobGraph中连接IntermediateDataSet和JobVertex的边,表示JobGraph中的一个数据流转通道,其上游数据源是IntermediateDataSet,下游消费者是JobVertex。数据通过JobEdge 由IntermediateDataSet传递给JobVertex。
- IntermediateDataSet
中间数据集IntermediateDataSet是一种逻辑结构,用来表示JobVertex的输出,即该JobVertex中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。
IntermediateDataSet的个数与该JobVertex对应的StreamNode的出边数量相同,可以是一个或者多个。
2.2.2 JobGraph生成过程
StreamingJobGraphGenerator负责流计算JobGraph的生成,在转换前需要进行一系列的预处理。
private JobGraph createJobGraph() {
preValidate();
// make sure that all vertices start immediately
//设置调度模式
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
//为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
//为了向后保持兼容,为每个节点生成老版本的hash id
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
//真正对SteramGraph进行转换,生成JobGraph图
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
//设置共享slotgroup
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getMinResources(),
id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
//配置checkpoint
configureCheckpointing();
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
//如果有之前的缓存文件的配置,则重新读入
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
//设置执行环境配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
预处理完毕后,开始构建JobGraph中的点和边,从Source向下遍历StreamGraph,逐步创建JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。
执行具体的Chain和JobVertex生成、JobEdge的关联、IntermediateDataSet。从StreamGraph读取数据的StreamNode开始,递归遍历同时将StreamOperator连接在一起。
整理构建的逻辑如下(看上图!!!):
1)从Source开始,Source与下游的FlatMap不可连接,Source是起始节点,自己成为一个JobVertx。
2)此时开始一个新的连接分析,FlatMap是起始节点,与下游的KeyedAgg也不可以连接,那么FlatMap自己成为一个JobVertex。
3)此时开始一个新的连接分析。KeyedAgg是起始节点,并且与下游的Sink可以连接,那么递归地分析Sink节点,构造Sink与其下游是否可以连接,因为Slink没有下游,所以KeyedAgg和Sink节点连接在一起,共同构成了一个JobVertex。在这个JobVertex中,KeyedAgg是起始节点,index编号为0,sink节点index编号为1.
构建JobVertex的时候需要将StreamNode中的重要配置信息复制到JobVertex中。构建好JobVertex之后,需要构建JobEdge将JobVertex连接起来。KeyedAgg和Sink之间构成了一个算子连接,连接内部的算子之间无序构成JobEdge进行连接。
在构建JobEdge的时候,很重要的一点是确定上游JobVertex和下游JobVertex的数据交换方式。此时根据ShuffleMode来确定ResultPartition类型,用FlinkPartition来确定JobVertex的连接方式。
Shuffle确定了ResultPartition,那么就可以确定上游JobVertex输出的IntermediateDataSet的类型了,也就知道JobEdge的输入IntermediateDataSet。
ForwardPartitioner和RescalePartitioner两种类型的Partitioner转换为DistributionPattern.POINTWISE 的分发模式。其他类型的Partitioner统一转换为DistributionPattern.ALL_TO_ALL模式。
JobGraph的构建和OperatorChain优化:
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
//获取当前节点的出边,判断是否符合OperatorChain的条件
//分为两类:chainableoutputs,nonchainableoutputs
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//对于chainable的边,递归调用createchain
//返回值添加到transitiveOutEdges中
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
//对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点
//进行递归调用,返回值添加到transitiveOutEdges中
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
//如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfig
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
//将StreamNode中的配置信息序列化到Streamconfig中。
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
//再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset
//否则将当前节点的StreamConfig 添加到chainedConfig中。
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setOutEdgesInOrder(transitiveOutEdges);
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
2.2.3 算子融合
一个Operatorchain在同一个Task线程内执行。OperatorChain内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量
operatorchain的条件:
1)下游节点的入度为1
2)SteramEdge的下游节点对应的算子不为null
3)StreamEdge的上游节点对应的算子不为null
4)StreamEdge的上下游节点拥有相同的slotSharingGroup,默认都是default.
5)下游算子的连接策略为ALWAYS.
6)上游算子的连接策略为ALWAYS 或者HEAD.
7)StreamEdge的分区类型为ForwardPartitioner
8)上下游节点的并行度一致
9)当前StreamGraph允许chain
2.3 执行图
2.3.1 ExecutionGraph核心对象
- ExecutionJobVertex
该对象和JobGraph中的JobVertex一一对应。该对象还包含了一组ExecutionVertex,数量与该JobVertex中所包含的SteramNode的并行度一致。
ExecutionJobVertex用来将一个JobVertex封装成一ExecutionJobVertex,并以此创建ExecutionVertex、Execution、IntermediateResult和IntermediateResultPartition,用于丰富ExecutionGraph。
在ExecutionJobVertex的构造函数中,首先是依据对应的JobVertex的并发度,生成对应个数的ExecutionVertex。其中,一个ExecutionVertex代表一个ExecutionJobVertex的并发子Task。然后是将原来JobVertex的中间结果IntermediateDataSet转化为ExecutionGrap中IntermediateResult
- ExecutionVertex
ExecutionJobVertex中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是ExecutionVertex.
构建ExecutionVertex的同时,也回构建ExecutionVertex的输出IntermediateResult。并且将ExecutionEdge输出为IntermediatePartition。
ExecutionVertex的构造函数中,首先会创建IntermediatePartition,并通过IntermediateResult.setPartition()建立IntermediateResult和IntermediateResultPartition之间的关系,然后生成Execution,并配置资源相关。
- IntermediateResult
IntermediateResult又叫做中间结果集,该对象是个逻辑概念,表示ExecutionJobVertex的输出,和JobGraph中的IntermediateDataSet一一对应,同样,一个ExecutionJobVertex可以有多个中间二级果,取决于当前JobVertex有几个出边。
一个中间结果集包含多个中间结果分区IntermediateResultPartition,其个数等于该JobVertex的并发度。
- IntermediateResultPartition
IntermediateResultPartition又叫做中间结果分区,表示1个ExecutionVertex输出结果,与ExecutionEdge相关联。
- ExecutionEdge
表示ExecutionVertex的输入,连接到上游产生的IntermediateResultPartition。一个Execution对应于唯一的一个IntermediateResultPartition和一个ExecutionVertex。一个ExecutionVertex可以有多个ExecutionEdge。
- Execution
ExecutionVertex相当于每个Task的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。JobManager和TaskManager之间关于Task的部署和Task执行状态的更新都是通过ExecutionAttemptID来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex可能会有多个ExecutionAttemptID.一个Execution通过ExecutionAttemptID标识。
2.3.2 ExecutionGrap生成过程
初始话作业调度器的时候,根据JobGraph生活ExecutionGraph。在SchedulerBase的构造方法中触发构建,最终调用SchedulerBase#createExecutionGraph 触发实际的构建动作,使用ExecutionGraphBuiler构建ExecutionGraph。
核心代码attachJobGraph:
构建ExecutionEdge 的连接策略:
- 点对点连接(DistributionPattern.POINTWISE)
该策略用来连接当前ExecutionVertex与上游的IntermediataeResultParition。
连接分三种情况
1)一对一连接:并发的Task数量与分区数相等。
2)多对一连接:下游的Task数量小于上游的分区数,此时分两种情况:
a:下游Task可以分配同数量的结果分区IntermediataeResultParition。如上游有4个结果分区,下游有2个Task,那么每个Task会分配两个结果分区进行消费。
b:每个Task消费的上游分区结果数据不均,如上游有3个结果分区,下游有两个Task,那么一个Task分配2个结果分区消费,另一个分配一个结果分区消费。
3)一对多连接:下游的Task数量多余上游的分区数,此时两种情况:
a:每个结果分区的下游消费Task数据量相同,如上游有两个结果分区,下游有4个Task,每个结果分区被两个Task消费。
b:每个结果分区的下游消费Task数量不相同,如上游有两个结果分区,下游有3个Task,那么一个结果分区分配2个Task消费,另一个结果分区分配一个Task消费。
- 全连接(DistributionPattern.ALL_TO_ALL)
该策略下游的ExecutionVertex与上游的所有IntermediataeResultParition建立连接,消费其生产的数据。一般全连接的情况意味着数据在Shuffle。
接下来Flink资源管理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。
参考书籍《Flink 内核原理与实现》
欢迎关注公众号: 数据基石