Basic concepts
Resource units
Instance
A TaskManager instance
public class Instance implements SlotOwner {
    /** The instance gateway to communicate with the instance */
    private final TaskManagerGateway taskManagerGateway;

    /** The number of task slots available on the node */
    private final int numberOfSlots;

    /** A list of available slot positions */
    private final Queue<Integer> availableSlots;

    /** Allocated slots on this taskManager */
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();
}
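The fields above suggest how an Instance hands out slots. Free slot positions wait in a queue, and slots that have been handed out are tracked in a set. Below is a minimal sketch of that bookkeeping. `SketchInstance`, `allocateSlot` and `returnSlot` are hypothetical names, and each slot is reduced to its integer position instead of Flink's `Slot` objects.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of Instance: a slot is identified only by its position.
class SketchInstance {
    private final int numberOfSlots;
    // Free slot positions, handed out in FIFO order.
    private final Queue<Integer> availableSlots = new ArrayDeque<>();
    // Positions currently in use.
    private final Set<Integer> allocatedSlots = new HashSet<>();

    SketchInstance(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
        for (int i = 0; i < numberOfSlots; i++) {
            availableSlots.add(i);
        }
    }

    /** Returns a free slot position, or null if every slot on this TaskManager is taken. */
    Integer allocateSlot() {
        Integer slot = availableSlots.poll();
        if (slot != null) {
            allocatedSlots.add(slot);
        }
        return slot;
    }

    /** Gives a slot position back so that it can be handed out again. */
    void returnSlot(int slot) {
        if (allocatedSlots.remove(slot)) {
            availableSlots.add(slot);
        }
    }

    int getNumberOfAvailableSlots() {
        return availableSlots.size();
    }

    int getNumberOfSlots() {
        return numberOfSlots;
    }
}
```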
SharedSlot
Can be split further into multiple SimpleSlots or SharedSlots.
public class SharedSlot extends Slot implements LogicalSlot {
    /** The assignment group of shared slots that manages the availability and release of the slots */
    private final SlotSharingGroupAssignment assignmentGroup;

    /** The set of sub-slots allocated from this shared slot */
    private final Set<Slot> subSlots;
}
SimpleSlot
The smallest unit; it cannot be split further.
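The SharedSlot / SimpleSlot split forms a tree. A SharedSlot can be carved into SimpleSlots, which are leaves that each run one task, or into further SharedSlots. The sketch below models only that tree, plus the assumption that releasing an inner node releases its whole subtree. All class names are hypothetical, and resource accounting is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the slot tree: SharedSlot is an inner node, SimpleSlot a leaf.
abstract class SketchSlot {
    private boolean released;

    boolean isReleased() {
        return released;
    }

    void release() {
        released = true;
    }
}

class SketchSimpleSlot extends SketchSlot {
    final String taskName; // the single task running in this leaf slot

    SketchSimpleSlot(String taskName) {
        this.taskName = taskName;
    }
}

class SketchSharedSlot extends SketchSlot {
    private final List<SketchSlot> subSlots = new ArrayList<>();

    /** Carves out a leaf slot for one task. */
    SketchSimpleSlot allocateSubSlot(String taskName) {
        if (isReleased()) {
            throw new IllegalStateException("shared slot already released");
        }
        SketchSimpleSlot child = new SketchSimpleSlot(taskName);
        subSlots.add(child);
        return child;
    }

    /** Carves out a nested shared slot that can itself be split further. */
    SketchSharedSlot allocateSharedSubSlot() {
        if (isReleased()) {
            throw new IllegalStateException("shared slot already released");
        }
        SketchSharedSlot child = new SketchSharedSlot();
        subSlots.add(child);
        return child;
    }

    int numberOfSubSlots() {
        return subSlots.size();
    }

    /** Releasing an inner node releases every slot below it. */
    @Override
    void release() {
        for (SketchSlot child : subSlots) {
            child.release();
        }
        super.release();
    }
}
```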
Scheduler
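Maintains all available resources in the cluster and exposes a slot-allocation interface.
- Acts as an InstanceListener of the InstanceManager: when a new TaskManager registers, the InstanceManager notifies the Scheduler
- Exposes the allocateSlot interface so that an Execution can request a slot
Key methods
- newInstanceAvailable
  - Called by the InstanceManager to notify the Scheduler after a new TaskManager registers
- allocateSlot
  - First tries to allocate through the SlotSharingGroupAssignment
  - If the SlotSharingGroupAssignment has no available slot, or the slot it returns does not meet the requirements, allocates a slot from the global resource pool and adds it to the SlotSharingGroupAssignment of the task's sharing group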
How resources are organized
// Scheduler
Map<ResourceID /* TaskManagerID */, Instance> instancesWithAvailableResources = new LinkedHashMap<>();
ExecutionGraph
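Corresponds one-to-one to a Job.
- Maintains the Job's current execution status and other runtime state
- Coordinates the scheduling and execution of Tasks
- Handles runtime failures and coordinates failover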
ExecutionJobVertex
One ExecutionJobVertex per JobVertex
// ...
private final ExecutionVertex[] taskVertices;
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
// ...
Key methods
- resetForNewExecution
  - Clears the slot assignment of the slotSharingGroup
  - Resets every ExecutionVertex
ExecutionVertex
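Each parallel instance (subtask) of an Operator corresponds to one ExecutionVertex.
// History of previous runs
private final EvictingBoundedList<Execution> priorExecutions;
// The Execution currently running
private volatile Execution currentExecution;
Key methods
- resetForNewExecution
  - Saves the current Execution in priorExecutions
  - Creates a new Execution
- scheduleForExecution
  - Delegates to Execution.scheduleForExecution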
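resetForNewExecution keeps a bounded history of attempts and then starts a new one. Below is a minimal sketch of that step. It assumes an `ArrayDeque` as a stand-in for `EvictingBoundedList`, where the oldest attempt is dropped once the history is full, and it reduces an Execution to its attempt number. The class names and the history size are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for Execution: only the attempt number is kept.
class SketchExecution {
    final int attemptNumber;

    SketchExecution(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }
}

class SketchExecutionVertex {
    private final int maxPriorExecutions;
    // Bounded history: the oldest attempt is evicted once the limit is reached.
    private final Deque<SketchExecution> priorExecutions = new ArrayDeque<>();
    private volatile SketchExecution currentExecution = new SketchExecution(0);

    SketchExecutionVertex(int maxPriorExecutions) {
        this.maxPriorExecutions = maxPriorExecutions;
    }

    /** Archives the current attempt and creates the next one. */
    synchronized SketchExecution resetForNewExecution() {
        SketchExecution old = currentExecution;
        if (maxPriorExecutions > 0) {
            if (priorExecutions.size() == maxPriorExecutions) {
                priorExecutions.removeFirst(); // evict the oldest attempt
            }
            priorExecutions.addLast(old);
        }
        currentExecution = new SketchExecution(old.attemptNumber + 1);
        return currentExecution;
    }

    SketchExecution getCurrentExecution() {
        return currentExecution;
    }

    int priorCount() {
        return priorExecutions.size();
    }

    int oldestPriorAttempt() {
        return priorExecutions.peekFirst().attemptNumber;
    }
}
```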
Execution
The runtime instance of an ExecutionVertex. Every run of an ExecutionVertex creates a new Execution. An Execution mainly handles slot allocation and task deployment.
Key methods
- allocateAndAssignSlotForExecution
  - Allocates a slot
- deploy
  - Communicates with the TaskManager to deploy the Task to it
- scheduleForExecution
  - allocateAndAssignSlotForExecution + deploy
SlotSharingGroupAssignment
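There is one SlotSharingGroupAssignment per SlotSharingGroup. It tracks the slots already allocated to that group.
Key methods
- getSharedSlotForTask
  - Gets an available slot from the slots already allocated to this sharing group
- addSharedSlotAndAllocateSubSlot
  - Adds a new slot to this sharing group and allocates a sub-slot from it for the requesting task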
How slots are organized
// All slots allocated to this sharing group
Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
// Subtasks of the same JobVertex must not share a slot, so the slots that are still available are tracked per JobVertex
Map<AbstractID /* JobVertexID */, Map<ResourceID /* TaskManagerID */, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
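The two structures above work together. `allSlots` holds every shared slot of the group, and `availableSlotsPerJid` records, for each JobVertex, which of those slots its subtasks may still use. The simplified model below follows our reading of the legacy code: a JobVertex with no entry yet is treated as able to use every slot, which is also why the `// make sure an empty entry exists` snippet matters. The names are hypothetical, ids are plain strings, and the method returns the shared slot itself rather than allocating a sub-slot from it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical shared-slot handle: an id plus the TaskManager it lives on.
class SketchSharedSlotRef {
    final String id;
    final String taskManagerId;

    SketchSharedSlotRef(String id, String taskManagerId) {
        this.id = id;
        this.taskManagerId = taskManagerId;
    }
}

class SketchSlotSharingGroupAssignment {
    private final Set<SketchSharedSlotRef> allSlots = new LinkedHashSet<>();
    // JobVertex id -> (TaskManager id -> slots that vertex may still use).
    // A missing key means the vertex has never asked, so every slot is available to it.
    private final Map<String, Map<String, List<SketchSharedSlotRef>>> availableSlotsPerJid = new LinkedHashMap<>();

    /** Registers a new slot that a subtask of jid occupies immediately. */
    void addSharedSlotAndAllocateSubSlot(SketchSharedSlotRef slot, String jid) {
        allSlots.add(slot);
        boolean entryForNewJidExists = false;
        for (Map.Entry<String, Map<String, List<SketchSharedSlotRef>>> e : availableSlotsPerJid.entrySet()) {
            if (e.getKey().equals(jid)) {
                entryForNewJidExists = true; // jid occupies the slot, so it is not offered to jid
            } else {
                e.getValue().computeIfAbsent(slot.taskManagerId, k -> new ArrayList<>()).add(slot);
            }
        }
        // Without this empty entry, jid would later look "new" and see every slot,
        // including the one its own subtask already occupies.
        if (!entryForNewJidExists) {
            availableSlotsPerJid.put(jid, new LinkedHashMap<>());
        }
    }

    /** Takes a slot for a subtask of jid, preferring the given TaskManagers; null if none is left. */
    SketchSharedSlotRef getSharedSlotForTask(String jid, List<String> preferredTaskManagers) {
        if (allSlots.isEmpty()) {
            return null;
        }
        Map<String, List<SketchSharedSlotRef>> slotsForGroup = availableSlotsPerJid.get(jid);
        if (slotsForGroup == null) { // first request from this vertex: every slot is available
            slotsForGroup = new LinkedHashMap<>();
            for (SketchSharedSlotRef s : allSlots) {
                slotsForGroup.computeIfAbsent(s.taskManagerId, k -> new ArrayList<>()).add(s);
            }
            availableSlotsPerJid.put(jid, slotsForGroup);
        }
        for (String tm : preferredTaskManagers) {
            SketchSharedSlotRef local = takeFrom(slotsForGroup, tm);
            if (local != null) {
                return local;
            }
        }
        for (String tm : new ArrayList<>(slotsForGroup.keySet())) { // no local slot: take any
            SketchSharedSlotRef any = takeFrom(slotsForGroup, tm);
            if (any != null) {
                return any;
            }
        }
        return null;
    }

    private static SketchSharedSlotRef takeFrom(Map<String, List<SketchSharedSlotRef>> slots, String tm) {
        List<SketchSharedSlotRef> list = slots.get(tm);
        if (list == null || list.isEmpty()) {
            return null;
        }
        SketchSharedSlotRef s = list.remove(0);
        if (list.isEmpty()) {
            slots.remove(tm);
        }
        return s;
    }
}
```

The per-JobVertex lists are what enforce the rule that two subtasks of the same JobVertex never share a slot. A new slot is offered to every other vertex, but never to the vertex that occupies it.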
Job 正常执行流程
ExecutionGraph.scheduleEager
Entry point for the first run and for FullRestart
// Create a new Execution for every ExecutionVertex
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertices
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertices
        ExecutionVertex.resetForNewExecution()
// Allocate a slot for every Execution
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertices
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertices
        ExecutionVertex.currentExecution.allocateAndAssignSlotForExecution()
// Deploy every Execution to its TaskManager
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertices
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertices
        ExecutionVertex.currentExecution.deploy()
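The three loops above run as separate passes. Every vertex gets a fresh Execution before any slot is requested, and nothing is deployed until every Execution has a slot, presumably so that a job that cannot obtain all its slots is not left partially deployed. The sketch below replays that ordering with hypothetical stand-in classes that only log their calls. Slot futures and failures are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that only record which phase touched which subtask.
class PhaseLog {
    final List<String> events = new ArrayList<>();
}

class EagerVertex {
    final String name;
    final PhaseLog log;

    EagerVertex(String name, PhaseLog log) {
        this.name = name;
        this.log = log;
    }

    void resetForNewExecution() {
        log.events.add("reset:" + name);
    }

    void allocateAndAssignSlot() {
        log.events.add("allocate:" + name);
    }

    void deploy() {
        log.events.add("deploy:" + name);
    }
}

class EagerScheduling {
    /** jobVertices.get(i) holds the parallel subtasks of the i-th job vertex. */
    static void scheduleEager(List<List<EagerVertex>> jobVertices) {
        // Pass 1: a fresh Execution for every subtask
        for (List<EagerVertex> jv : jobVertices) {
            for (EagerVertex v : jv) {
                v.resetForNewExecution();
            }
        }
        // Pass 2: a slot for every Execution
        for (List<EagerVertex> jv : jobVertices) {
            for (EagerVertex v : jv) {
                v.allocateAndAssignSlot();
            }
        }
        // Pass 3: deploy only after every subtask holds a slot
        for (List<EagerVertex> jv : jobVertices) {
            for (EagerVertex v : jv) {
                v.deploy();
            }
        }
    }
}
```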
Slot allocation logic
Execution.allocateAndAssignSlotForExecution
- calculatePreferredLocations
  - Default: prefer the TaskManagers on which the inputs run
  - LocalRestart: prefer the prior allocation
  - CoGroup:
- Calls Scheduler.allocateSlot to allocate the slot
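The preference rules can be sketched as a small function. The sketch rests on these assumptions:
- Locations are TaskManager id strings.
- LocalRestart falls back to the input locations when there is no prior allocation.
- The CoGroup case, left open above, is not modelled.

`PreferredLocations.calculate` is a hypothetical name, not Flink's signature.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

class PreferredLocations {
    /**
     * @param localRestart      true when the LocalRestart behaviour is enabled
     * @param priorTaskManager  TaskManager of the previous attempt, or null if there is none
     * @param inputTaskManagers TaskManagers running the producers of this subtask's inputs
     */
    static List<String> calculate(boolean localRestart, String priorTaskManager, List<String> inputTaskManagers) {
        if (localRestart && priorTaskManager != null) {
            // LocalRestart: go back to where the previous attempt ran
            return Collections.singletonList(priorTaskManager);
        }
        // Default: prefer the input locations, deduplicated, in their original order
        return new ArrayList<>(new LinkedHashSet<>(inputTaskManagers));
    }
}
```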
Scheduler.allocateSlot
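- Get a slot from the SlotSharingGroupAssignment (call it S1): SlotSharingGroupAssignment.getSharedSlotForTask()
- Check whether S1 satisfies the constraints, i.e. whether it is local (subject to the CoLocation constraints and the PreferredLocations); if it does, use S1
- If it does not, request a new slot from the global resource pool and add it to the SlotSharingGroupAssignment (call it S3): Scheduler.getNewSlotForSharingGroup()
- Combine the results of steps 1 and 3:
  - S1 == null and S3 != null: use S3
  - S1 == null and S3 == null: throw NoResourceAvailableException
  - S1 != null and S3 == null: use S1
  - S1 != null and S3 != null and S3 is local: release S1, use S3
  - S1 != null and S3 != null and S3 not local: release S3, use S1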
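Here is the decision table above written as a pure function. This is a sketch with the following stand-ins:
- Slots are opaque values.
- Booleans stand in for the locality checks.
- The `release` callback stands in for releasing a slot.
- `IllegalStateException` stands in for `NoResourceAvailableException`.

All names are hypothetical.

```java
import java.util.function.Consumer;

class SlotChoice {
    /**
     * Chooses between S1 (taken from the sharing group) and S3 (freshly requested).
     * s1Satisfies: S1 meets the locality constraints; in that case S3 is never requested.
     */
    static <S> S choose(S s1, boolean s1Satisfies, S s3, boolean s3Local, Consumer<S> release) {
        if (s1 != null && s1Satisfies) {
            return s1; // step 2: S1 is good enough, no new slot needed
        }
        if (s1 == null && s3 == null) {
            throw new IllegalStateException("no resource available");
        }
        if (s3 == null) {
            return s1; // only a non-local S1 exists
        }
        if (s1 == null) {
            return s3;
        }
        if (s3Local) {
            release.accept(s1); // the new slot is local: prefer it
            return s3;
        }
        release.accept(s3); // neither is local: keep S1 so the job occupies fewer slots
        return s1;
    }
}
```

Keeping S1 when neither slot is local packs the job into fewer slots. A local S3 wins, presumably because locality is the reason S3 was requested in the first place.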
Scheduler.getNewSlotForSharingGroup
- Find an available TaskManager, taking the PreferredLocations into account
- Allocate a slot from that TaskManager and add it to the SlotSharingGroup
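Below is a sketch of the TaskManager lookup. A `LinkedHashMap` from TaskManager id to free-slot count stands in for `instancesWithAvailableResources`, and a TaskManager is dropped from the map once it has no free slots. The sketch assumes these rules:
- A preferred TaskManager with a free slot yields a local slot.
- Otherwise the first TaskManager in registration order is used.
- That fallback counts as non-local if preferences existed, and unconstrained if there were none.

`SketchScheduler` and `SketchLocality` are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum SketchLocality { LOCAL, NON_LOCAL, UNCONSTRAINED }

class SketchScheduler {
    // TaskManager id -> number of free slots; only TaskManagers with free slots are kept.
    private final Map<String, Integer> instancesWithAvailableResources = new LinkedHashMap<>();

    /** InstanceListener-style callback: a new TaskManager has registered. */
    void newInstanceAvailable(String taskManagerId, int freeSlots) {
        if (freeSlots > 0) {
            instancesWithAvailableResources.put(taskManagerId, freeSlots);
        }
    }

    /** The chosen TaskManager and how well it matches the preferences. */
    static final class Pick {
        final String taskManagerId;
        final SketchLocality locality;

        Pick(String taskManagerId, SketchLocality locality) {
            this.taskManagerId = taskManagerId;
            this.locality = locality;
        }
    }

    /** Takes one free slot, preferring the given TaskManagers; null if the cluster is full. */
    Pick takeSlot(List<String> preferred) {
        for (String tm : preferred) {
            if (instancesWithAvailableResources.containsKey(tm)) {
                consume(tm);
                return new Pick(tm, SketchLocality.LOCAL);
            }
        }
        Iterator<String> it = instancesWithAvailableResources.keySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        String tm = it.next();
        consume(tm);
        return new Pick(tm, preferred.isEmpty() ? SketchLocality.UNCONSTRAINED : SketchLocality.NON_LOCAL);
    }

    private void consume(String tm) {
        int left = instancesWithAvailableResources.get(tm) - 1;
        if (left == 0) {
            instancesWithAvailableResources.remove(tm); // no free resources left
        } else {
            instancesWithAvailableResources.put(tm, left);
        }
    }
}
```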
Failover
Entry point
...
-> running
-> execution fails with an exception
-> ExecutionGraph.notifyExecutionChange
-> failoverStrategy.onTaskFailure(execution, ex)
-> xxx
FailoverStrategy
- RestartAllStrategy
- LocalRestartAllStrategy
- RestartIndividualStrategy
- RestartIndividualForeverStrategy
- RestartPipelinedRegionStrategy
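The entry path boils down to a strategy lookup. As we read it, the ExecutionGraph only acts on a transition to FAILED, and it hands the failure to the configured FailoverStrategy, which decides how much to restart. The sketch below models this with hypothetical types. Its two strategies only return the subtasks they would restart, mirroring RestartAllStrategy and RestartIndividualStrategy in spirit; the real strategies actually cancel and reschedule tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

enum SketchExecutionState { RUNNING, FINISHED, FAILED }

interface SketchFailoverStrategy {
    /** Returns the subtasks this strategy would restart for the given failure. */
    List<String> onTaskFailure(String failedSubtask, Throwable cause);
}

/** Restarts every subtask of the job (RestartAllStrategy-like). */
class RestartAllSketch implements SketchFailoverStrategy {
    private final List<String> allSubtasks;

    RestartAllSketch(List<String> allSubtasks) {
        this.allSubtasks = allSubtasks;
    }

    @Override
    public List<String> onTaskFailure(String failedSubtask, Throwable cause) {
        return new ArrayList<>(allSubtasks);
    }
}

/** Restarts only the failed subtask (RestartIndividualStrategy-like). */
class RestartIndividualSketch implements SketchFailoverStrategy {
    @Override
    public List<String> onTaskFailure(String failedSubtask, Throwable cause) {
        return Collections.singletonList(failedSubtask);
    }
}

class SketchExecutionGraph {
    private final SketchFailoverStrategy failoverStrategy;

    SketchExecutionGraph(SketchFailoverStrategy failoverStrategy) {
        this.failoverStrategy = failoverStrategy;
    }

    /** Only a transition to FAILED triggers failover handling. */
    List<String> notifyExecutionChange(String subtask, SketchExecutionState newState, Throwable error) {
        if (newState != SketchExecutionState.FAILED) {
            return Collections.emptyList();
        }
        Throwable cause = error != null ? error : new RuntimeException("unknown error (missing cause)");
        return failoverStrategy.onTaskFailure(subtask, cause);
    }
}
```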
RestartAllStrategy
RestartAllStrategy.onTaskFailure
executionGraph.failGlobal(cause)
  // Cancel all tasks
  -> ExecutionJobVertex.cancelWithFuture()
    -> foreach ExecutionVertex : ExecutionVertex::cancel()
  // Once every task has terminated, restart and reschedule
  -> allVerticesInTerminalState(globalVersionForRestart)
    -> tryRestartOrFail
      -> restartStrategy.restart : FixedDelayRestartStrategy | FailureRateRestartStrategy
        -> ExecutionGraph.restart
          // Reset state
          -> foreach ExecutionJobVertex : resetForNewExecution
            -> clearTaskAssignment
            -> foreach ExecutionVertex : resetForNewExecution
              -> priorExecutions.add(oldExecution)
              -> generate new Execution
          // Allocate slots and schedule
          -> ExecutionGraph.scheduleForExecution
            -> ExecutionGraph.scheduleEager
Note: every failover clears the SlotSharingGroupAssignment, in ExecutionJobVertex.resetForNewExecution.
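The subtle part of RestartAllStrategy is the barrier between cancelling and restarting. The restart only begins once every vertex has terminated. Notifications from an earlier, superseded failover must be ignored, which is our reading of the `globalVersionForRestart` argument. Below is a minimal sketch of that barrier with hypothetical names. A `Runnable` stands in for `ExecutionGraph.restart`.

```java
// Hypothetical barrier: restart runs once all vertices of the current failover have terminated.
class RestartAllBarrier {
    private final int numVertices;
    private final Runnable restart;
    private long globalVersion;
    private int terminated;

    RestartAllBarrier(int numVertices, Runnable restart) {
        this.numVertices = numVertices;
        this.restart = restart;
    }

    /** Starts a new global failover: bumps the version and resets the counter. */
    synchronized long failGlobal() {
        globalVersion++;
        terminated = 0;
        return globalVersion;
    }

    /** A vertex reached a terminal state during the failover identified by version. */
    synchronized void vertexTerminated(long version) {
        if (version != globalVersion) {
            return; // stale notification from a superseded failover
        }
        terminated++;
        if (terminated == numVertices) {
            restart.run(); // everything is cancelled: reset and reschedule
        }
    }
}
```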
LocalRestartAllStrategy
Sets ExecutionGraph.previousLocationFirst = true; otherwise identical to RestartAllStrategy.
RestartIndividualStrategy
onTaskFailure
-> ExecutionVertex.resetForNewExecution
-> newExecution.scheduleForExecution
  -> Execution.allocateAndAssignSlotForExecution()
  -> Execution.deploy()
RestartPipelinedRegionStrategy
Unlike the other FailoverStrategies, it additionally maintains how ExecutionVertices are grouped into failover regions:
class RestartPipelinedRegionStrategy {
    private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
}

class FailoverRegion {
    private final List<ExecutionVertex> connectedExecutionVertexes;
}
onTaskFailure
-> FailoverRegion.onExecutionFail
  -> FailoverRegion.cancel
    -> foreach connected ExecutionVertex : ExecutionVertex.cancel
  -> allVerticesInTerminalState
    -> reset
      -> foreach connected ExecutionVertex : ExecutionVertex.resetForNewExecution
    -> restart
      -> foreach connected ExecutionVertex : ExecutionVertex.scheduleForExecution
        -> Execution.scheduleForExecution
          -> Execution.allocateAndAssignSlotForExecution
          -> Execution.deploy
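`vertexToRegion` has to exist before any failure happens. Assume a pipelined region is the set of ExecutionVertices connected through pipelined (non-blocking) results. The regions are then the connected components over those edges, which union-find computes directly. Vertex indices and the edge list are hypothetical inputs, and blocking edges are simply left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PipelinedRegions {
    private final int[] parent;

    private PipelinedRegions(int n) {
        parent = new int[n];
        for (int i = 0; i < n; i++) {
            parent[i] = i;
        }
    }

    private int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    private void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    /**
     * @param numVertices    ExecutionVertices are identified as 0..numVertices-1
     * @param pipelinedEdges {producer, consumer} pairs connected by a pipelined result
     * @return vertex -> all vertices of its region (one shared list per region)
     */
    static Map<Integer, List<Integer>> vertexToRegion(int numVertices, int[][] pipelinedEdges) {
        PipelinedRegions uf = new PipelinedRegions(numVertices);
        for (int[] e : pipelinedEdges) {
            uf.union(e[0], e[1]);
        }
        Map<Integer, List<Integer>> byRoot = new LinkedHashMap<>();
        for (int v = 0; v < numVertices; v++) {
            byRoot.computeIfAbsent(uf.find(v), k -> new ArrayList<>()).add(v);
        }
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        for (int v = 0; v < numVertices; v++) {
            result.put(v, byRoot.get(uf.find(v)));
        }
        return result;
    }
}
```

With this map, onTaskFailure only needs to look up the failed vertex's region. It then cancels, resets and reschedules the vertices in that list, and other regions keep running.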
xx
SlotSharingGroupAssignment, line 256
On restart: xx map
// make sure an empty entry exists for this group, if no other entry exists
if (!entryForNewJidExists) {
availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
}