Flink Task Deploy & Task Failover

Flink | 2021 | 农夫的狗

Basic Concepts

Resource Units

Instance

A TaskManager instance.
public class Instance implements SlotOwner {
    /** The instance gateway to communicate with the instance */
    private final TaskManagerGateway taskManagerGateway;

    /** The number of task slots available on the node */
    private final int numberOfSlots;

    /** A list of available slot positions */
    private final Queue<Integer> availableSlots;

    /** Allocated slots on this taskManager */
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();
}

SharedSlot

Can be split further into multiple SimpleSlot | SharedSlot sub-slots.
public class SharedSlot extends Slot implements LogicalSlot {
    /** The assignment group of shared slots that manages the availability and release of the slots */
    private final SlotSharingGroupAssignment assignmentGroup;

    /** The set of sub-slots allocated from this shared slot */
    private final Set<Slot> subSlots;
}

SimpleSlot

The smallest unit of allocation.
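The relationship between SharedSlot and SimpleSlot reads like a small tree: a shared slot holds sub-slots, and each sub-slot is either a leaf or another shared slot. The following is a minimal sketch of that shape only. All class and method names (`SketchSlot`, `allocateSimpleSubSlot`, and so on) are hypothetical stand-ins, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Slot: either a leaf or a container of sub-slots. */
abstract class SketchSlot {
    /** Number of leaf (simple) slots reachable from this slot. */
    abstract int countSimpleSlots();
}

/** Leaf: the smallest unit, cannot be split further. */
final class SketchSimpleSlot extends SketchSlot {
    @Override
    int countSimpleSlots() {
        return 1;
    }
}

/** Container: can be split into simple slots or nested shared slots. */
final class SketchSharedSlot extends SketchSlot {
    private final List<SketchSlot> subSlots = new ArrayList<>();

    SketchSimpleSlot allocateSimpleSubSlot() {
        SketchSimpleSlot slot = new SketchSimpleSlot();
        subSlots.add(slot);
        return slot;
    }

    SketchSharedSlot allocateSharedSubSlot() {
        SketchSharedSlot slot = new SketchSharedSlot();
        subSlots.add(slot);
        return slot;
    }

    @Override
    int countSimpleSlots() {
        int total = 0;
        for (SketchSlot sub : subSlots) {
            total += sub.countSimpleSlots();
        }
        return total;
    }
}
```

A root shared slot with two simple sub-slots and one nested shared slot holding a third simple slot would report three leaves in this model.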

Scheduler

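Maintains all available resources in the cluster and exposes the resource-allocation interface.
  1. Acts as an InstanceListener of the InstanceManager: when a new TaskManager registers, the InstanceManager notifies the Scheduler
  1. Exposes the allocateSlot interface to Execution, which is how slots are allocated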

Key methods

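  1. newInstanceAvailable
    1. When a new TaskManager registers, the InstanceManager notifies the Scheduler through this method
  1. allocateSlot
    1. Tries to allocate through the SlotSharingGroupAssignment first
    2. If the SlotSharingGroupAssignment has no available slot, or the slot it returns does not meet the requirement, allocates a slot from the global resource pool and adds it to the SlotSharingGroupAssignment that the task belongs to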

How resources are organized

// Scheduler
Map<ResourceID /* TaskManagerID */, Instance> instancesWithAvailableResources = new LinkedHashMap<>();
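To make the role of this map concrete, here is a minimal sketch of a pool keyed by TaskManager ID that only keeps instances which still have free slots. It assumes a simplified model: `SketchInstance`, `SketchSchedulerPool`, and `takeAnySlot` are hypothetical names, and string IDs stand in for ResourceID. The real Scheduler also handles locality and slot sharing, which are left out here.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical stand-in for Instance: a TaskManager with a fixed number of slots. */
class SketchInstance {
    final String resourceId;
    private final Queue<Integer> availableSlots = new ArrayDeque<>();

    SketchInstance(String resourceId, int numberOfSlots) {
        this.resourceId = resourceId;
        for (int i = 0; i < numberOfSlots; i++) {
            availableSlots.add(i);
        }
    }

    boolean hasFreeSlot() {
        return !availableSlots.isEmpty();
    }

    /** Returns the next free slot position, or null if none is left. */
    Integer takeSlot() {
        return availableSlots.poll();
    }
}

/** Hypothetical stand-in for the Scheduler's pool of instances with free slots. */
class SketchSchedulerPool {
    private final Map<String, SketchInstance> instancesWithAvailableResources = new LinkedHashMap<>();

    /** Plays the role of newInstanceAvailable: a TaskManager with free slots joins the pool. */
    void newInstanceAvailable(SketchInstance instance) {
        if (instance.hasFreeSlot()) {
            instancesWithAvailableResources.put(instance.resourceId, instance);
        }
    }

    /** Takes a slot from the first instance in insertion order; returns "tmId#position" or null. */
    String takeAnySlot() {
        Iterator<SketchInstance> it = instancesWithAvailableResources.values().iterator();
        if (!it.hasNext()) {
            return null;
        }
        SketchInstance instance = it.next();
        Integer position = instance.takeSlot();
        if (!instance.hasFreeSlot()) {
            it.remove(); // exhausted instances leave the pool
        }
        return instance.resourceId + "#" + position;
    }
}
```

Using a LinkedHashMap keeps iteration order equal to registration order, so in this sketch slots are handed out from the earliest-registered TaskManager first.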

ExecutionGraph

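One ExecutionGraph per Job.
  1. Maintains the job's current runtime state (the various job and task states)
  1. Coordinates the scheduling and execution of tasks
  1. Handles runtime exceptions and coordinates failover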

ExecutionJobVertex

One ExecutionJobVertex per JobVertex
...
private final ExecutionVertex[] taskVertices;
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
...

Key methods

resetForNewExecution
  1. Clears the slotSharingGroup
  1. Resets every ExecutionVertex

ExecutionVertex

Each parallel subtask of an operator corresponds to one ExecutionVertex.
// History of previous executions
private final EvictingBoundedList<Execution> priorExecutions;

// The execution that is currently running
private volatile Execution currentExecution;

Key methods

  1. resetForNewExecution
    1. Saves the information of the existing Execution
    2. Initializes a new Execution
  1. scheduleForExecution
    1. Execution.scheduleForExecution
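The two steps of resetForNewExecution can be sketched as follows. This is an illustration under assumptions: `SketchExecutionVertex` and `SketchExecution` are hypothetical classes, and a bounded ArrayDeque stands in for EvictingBoundedList (the oldest entry is dropped once the bound is reached).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for Execution: one attempt of a vertex. */
class SketchExecution {
    final int attemptNumber;

    SketchExecution(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }
}

/** Hypothetical stand-in for ExecutionVertex. */
class SketchExecutionVertex {
    private final int maxPriorExecutions;
    // Stand-in for EvictingBoundedList: keeps at most maxPriorExecutions entries.
    private final Deque<SketchExecution> priorExecutions = new ArrayDeque<>();
    private volatile SketchExecution currentExecution = new SketchExecution(0);

    SketchExecutionVertex(int maxPriorExecutions) {
        this.maxPriorExecutions = maxPriorExecutions;
    }

    /** Step 1: archive the current attempt. Step 2: create the next attempt. */
    SketchExecution resetForNewExecution() {
        SketchExecution old = currentExecution;
        if (maxPriorExecutions > 0) {
            if (priorExecutions.size() == maxPriorExecutions) {
                priorExecutions.removeFirst(); // evict the oldest archived attempt
            }
            priorExecutions.addLast(old);
        }
        currentExecution = new SketchExecution(old.attemptNumber + 1);
        return currentExecution;
    }

    int currentAttempt() {
        return currentExecution.attemptNumber;
    }

    int priorExecutionCount() {
        return priorExecutions.size();
    }

    /** Attempt number of the oldest archived execution, or -1 if none is archived. */
    int oldestPriorAttempt() {
        SketchExecution oldest = priorExecutions.peekFirst();
        return oldest == null ? -1 : oldest.attemptNumber;
    }
}
```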

Execution

A single run of an ExecutionVertex. Every run of an ExecutionVertex creates a new Execution, which mainly implements slot allocation and task deployment.

Key methods

  1. allocateAndAssignSlotForExecution
    1. Allocates a slot
  1. deploy
    1. Communicates with the TaskManager and deploys the task to it
  1. scheduleForExecution
    1. allocateAndAssignSlotForExecution + deploy

SlotSharingGroupAssignment

One SlotSharingGroupAssignment per SharingGroup; it tracks the slots that have already been allocated to that SharingGroup.

Key methods

  1. getSharedSlotForTask
    1. Gets an available slot from the slots already allocated to this SharingGroup
  1. addSharedSlotAndAllocateSubSlot
    1. Adds a new slot to this SharingGroup

How slots are organized

// All slots allocated to this SharingGroup
Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();

// Subtasks of the same JobVertex cannot share a slot, so availability is tracked per JobVertex
Map<AbstractID/* JobVertexID */, Map<ResourceID /* TaskManagerID */, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
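The per-JobVertex map is what enforces the sharing rule: once a subtask of a vertex takes a shared slot, that slot disappears from that vertex's list but stays available to other vertices. A minimal sketch of that bookkeeping follows. It assumes string IDs instead of AbstractID and ResourceID, and `SketchSharingAssignment`, `addSharedSlot`, and `getSlotForTask` are hypothetical names. Locality handling and slot release are omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for SlotSharingGroupAssignment. */
class SketchSharingAssignment {
    // slotId -> taskManagerId for every slot owned by this sharing group
    private final Map<String, String> allSlots = new LinkedHashMap<>();
    // jobVertexId -> (taskManagerId -> slots this vertex may still use)
    private final Map<String, Map<String, List<String>>> availableSlotsPerJid = new LinkedHashMap<>();

    /** A new shared slot becomes available to every vertex already known to the group. */
    void addSharedSlot(String slotId, String taskManagerId) {
        allSlots.put(slotId, taskManagerId);
        for (Map<String, List<String>> perTaskManager : availableSlotsPerJid.values()) {
            perTaskManager.computeIfAbsent(taskManagerId, k -> new ArrayList<>()).add(slotId);
        }
    }

    /** Returns a slot this vertex has not used yet, or null if there is none. */
    String getSlotForTask(String jobVertexId) {
        Map<String, List<String>> perTaskManager = availableSlotsPerJid.get(jobVertexId);
        if (perTaskManager == null) {
            // First request from this vertex: every existing slot is still available to it.
            perTaskManager = new LinkedHashMap<>();
            for (Map.Entry<String, String> slot : allSlots.entrySet()) {
                perTaskManager.computeIfAbsent(slot.getValue(), k -> new ArrayList<>()).add(slot.getKey());
            }
            availableSlotsPerJid.put(jobVertexId, perTaskManager);
        }
        for (List<String> slots : perTaskManager.values()) {
            if (!slots.isEmpty()) {
                return slots.remove(0); // consumed for this vertex only
            }
        }
        return null;
    }
}
```

With a single slot in the group, the first subtask of a vertex gets it, a second subtask of the same vertex gets nothing, and a subtask of a different vertex can still share it.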

Normal Job Execution Flow

ExecutionGraph.scheduleEager

Entry point for the first run and for a full restart.
// Create a new Execution for every ExecutionVertex
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertexs
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertexs
        ExecutionVertex.resetForNewExecution()

// Allocate a slot for every Execution
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertexs
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertexs
        ExecutionVertex.CurrentExecution.allocateAndAssignSlotForExecution()

// Deploy every Execution to its TaskManager
for ExecutionJobVertex in ExecutionGraph.ExecutionJobVertexs
    for ExecutionVertex in ExecutionJobVertex.ExecutionVertexs
        ExecutionVertex.CurrentExecution.deploy()
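The three passes above can be written as compilable Java. This is a sketch of the ordering only, under assumptions: the `Vertex` interface and `RecordingVertex` are hypothetical, and the real implementation works with futures rather than plain synchronous calls. The point it illustrates is that every slot is requested before any task is deployed.

```java
import java.util.List;

/** Hypothetical sketch of the three-pass eager scheduling order. */
class SketchEagerScheduling {

    /** Minimal view of what the passes need from a vertex. */
    interface Vertex {
        void resetForNewExecution();
        void allocateAndAssignSlot();
        void deploy();
    }

    /** Test helper that records the order in which it is called. */
    static final class RecordingVertex implements Vertex {
        private final String name;
        private final List<String> log;

        RecordingVertex(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void resetForNewExecution() {
            log.add("reset:" + name);
        }

        @Override
        public void allocateAndAssignSlot() {
            log.add("allocate:" + name);
        }

        @Override
        public void deploy() {
            log.add("deploy:" + name);
        }
    }

    /** Outer list: one entry per job vertex. Inner list: its parallel subtasks. */
    static void scheduleEager(List<? extends List<? extends Vertex>> jobVertices) {
        // Pass 1: create a fresh execution attempt for every vertex.
        for (List<? extends Vertex> jobVertex : jobVertices) {
            for (Vertex vertex : jobVertex) {
                vertex.resetForNewExecution();
            }
        }
        // Pass 2: allocate a slot for every attempt.
        for (List<? extends Vertex> jobVertex : jobVertices) {
            for (Vertex vertex : jobVertex) {
                vertex.allocateAndAssignSlot();
            }
        }
        // Pass 3: deploy only after all slots have been requested.
        for (List<? extends Vertex> jobVertex : jobVertices) {
            for (Vertex vertex : jobVertex) {
                vertex.deploy();
            }
        }
    }
}
```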

Slot Allocation Logic

Execution.allocateAndAssignSlotForExecution

  1. calculatePreferredLocations
    1. Default: prefer the TaskManagers that host the inputs
    2. LocalRestart case: prefer the prior allocation (PriorAllocation)
    3. CoGroup case:
  1. Call Scheduler.allocateSlot to allocate a slot

Scheduler.allocateSlot

  1. Get a slot (call it S1) from the SlotSharingGroupAssignment via SlotSharingGroupAssignment.getSharedSlotForTask()
  1. Check whether the slot meets the requirement: is it local (subject to the CoLocations and PreferredLocations constraints)?
  1. If it does not, request a new slot (call it S3) from the global resource pool and add it to the SlotSharingGroupAssignment via Scheduler.getNewSlotForSharingGroup()
  1. Combine the results of steps 1 and 3:
    1. S1 == null and S3 != null: use S3
    2. S1 == null and S3 == null: throw NoResourceAvailableException
    3. S1 != null and S3 != null and S3 is local: release S1, use S3
    4. S1 != null and S3 != null and S3 not local: release S3, use S1
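The four cases above reduce to a small decision function. The sketch below is an illustration, not Flink's code: `Candidate`, `Decision`, and `choose` are hypothetical names, IllegalStateException stands in for NoResourceAvailableException, and the branch where only S1 exists is an assumption (the list does not spell it out, but it follows from S3 being requested only when S1 is unsuitable).

```java
/** Hypothetical sketch of how the two candidate slots are reconciled. */
class SketchSlotChoice {

    /** A candidate slot and whether it satisfies the locality preference. */
    static final class Candidate {
        final String id;
        final boolean local;

        Candidate(String id, boolean local) {
            this.id = id;
            this.local = local;
        }
    }

    /** Which slot to use and which one (if any) to hand back. */
    static final class Decision {
        final Candidate use;
        final Candidate release;

        Decision(Candidate use, Candidate release) {
            this.use = use;
            this.release = release;
        }
    }

    /**
     * s1: slot from the sharing group (may be null).
     * s3: newly allocated slot from the global pool (may be null).
     */
    static Decision choose(Candidate s1, Candidate s3) {
        if (s1 == null && s3 == null) {
            // Stand-in for NoResourceAvailableException.
            throw new IllegalStateException("no resource available");
        }
        if (s1 == null) {
            return new Decision(s3, null);
        }
        if (s3 == null) {
            // Assumption: not listed in the text; S1 is the only option left.
            return new Decision(s1, null);
        }
        // Both exist: the new slot wins only if it is local.
        return s3.local ? new Decision(s3, s1) : new Decision(s1, s3);
    }
}
```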

Scheduler.getNewSlotForSharingGroup

  1. Find an available TaskManager, taking PreferredLocations into account during the search
  1. Allocate a slot from that TaskManager and add it to the SlotSharingGroup
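A rough sketch of the first step, choosing a TaskManager with preferred locations in mind, could look like this. It assumes a simplified model in which the pool is a map from TaskManager ID to its free-slot count. `SketchLocalityPicker`, `Pick`, and `pickTaskManager` are hypothetical names, and `local == false` here only means that no preferred location could be matched.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: prefer a TaskManager from the preferred list, else take any. */
class SketchLocalityPicker {

    /** The chosen TaskManager and whether it matched a preferred location. */
    static final class Pick {
        final String taskManagerId;
        final boolean local;

        Pick(String taskManagerId, boolean local) {
            this.taskManagerId = taskManagerId;
            this.local = local;
        }
    }

    /**
     * freeSlotsPerTaskManager: free-slot count per TaskManager, iterated in map order.
     * preferredLocations: TaskManager IDs in order of preference (may be empty).
     * Returns null when no TaskManager has a free slot.
     */
    static Pick pickTaskManager(Map<String, Integer> freeSlotsPerTaskManager, List<String> preferredLocations) {
        // First pass: any preferred TaskManager that still has capacity.
        for (String preferred : preferredLocations) {
            Integer free = freeSlotsPerTaskManager.get(preferred);
            if (free != null && free > 0) {
                return new Pick(preferred, true);
            }
        }
        // Fallback: the first TaskManager with capacity, regardless of locality.
        for (Map.Entry<String, Integer> entry : freeSlotsPerTaskManager.entrySet()) {
            if (entry.getValue() > 0) {
                return new Pick(entry.getKey(), false);
            }
        }
        return null;
    }
}
```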

Failover

Entry point

...
-> running
-> execution fails with an exception
-> ExecutionGraph.notifyExecutionChange
-> failoverStrategy.onTaskFailure(execution, ex)
-> xxx

FailoverStrategy

  • RestartAllStrategy
  • LocalRestartAllStrategy
  • RestartIndividualStrategy
  • RestartIndividualForeverStrategy
  • RestartPipelinedRegionStrategy

RestartAllStrategy

RestartAllStrategy.onTaskFailure
  executionGraph.failGlobal(cause)
    // Cancel all tasks
    -> ExecutionJobVertex.cancelWithFuture()
      -> foreach ExecutionVertex : ExecutionVertex::cancel()
    // Once every task has exited, reschedule
    -> allVerticesInTerminalState(globalVersionForRestart)
      -> tryRestartOrFail
        -> restartStrategy.restart : FixedDelayRestartStrategy | FailureRateRestartStrategy
          -> ExecutionGraph.restart
            // Clear state
            -> foreach ExecutionJobVertex : resetForNewExecution
              -> clearTaskAssignment
              -> foreach ExecutionVertex : resetForNewExecution
                -> priorExecutions.add(oldExecution)
                -> generate new Execution
            // Allocate slots and schedule
            -> ExecutionGraph.scheduleForExecution
              -> ExecutionGraph.scheduleEager
Note: every failover clears the SlotSharingGroupAssignment (see ExecutionJobVertex.resetForNewExecution).
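The core of this flow is "cancel everything, wait until every task has reached a terminal state, then restart". A minimal sketch of that pattern with java.util.concurrent.CompletableFuture follows. `SketchGlobalRestart`, `Task`, and this `failGlobal` signature are hypothetical and only illustrate the waiting step. The restart strategy (delay, failure rate) is reduced to a plain Runnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a global failover: cancel all, wait for all, then restart. */
class SketchGlobalRestart {

    /** A running task whose cancel() future completes once the task is in a terminal state. */
    interface Task {
        CompletableFuture<Void> cancel();
    }

    /**
     * Cancels every task and runs the restart action only after all of them
     * have terminated. The returned future completes after the restart action ran.
     */
    static CompletableFuture<Void> failGlobal(List<? extends Task> tasks, Runnable restart) {
        List<CompletableFuture<Void>> terminations = new ArrayList<>();
        for (Task task : tasks) {
            terminations.add(task.cancel());
        }
        // allOf completes when every termination future has completed.
        CompletableFuture<Void> allTerminated =
            CompletableFuture.allOf(terminations.toArray(new CompletableFuture[0]));
        return allTerminated.thenRun(restart);
    }
}
```

In this model the restart action would be where state is reset and eager scheduling is triggered again. Nothing restarts while even one task is still shutting down.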

LocalRestartAllStrategy

Sets ExecutionGraph.previousLocationFirst = true; otherwise identical to RestartAllStrategy.

RestartIndividualStrategy

onTaskFailure
  -> ExecutionVertex.resetForNewExecution
  -> newExecution.scheduleForExecution
    -> Execution.allocateAndAssignSlotForExecution()
    -> Execution.deploy()

RestartPipelinedRegionStrategy

Unlike the other FailoverStrategy implementations, it additionally maintains grouping information for the ExecutionVertex instances.
class RestartPipelinedRegionStrategy {
    private final HashMap<ExecutionVertex, FailoverRegion> vertexToRegion;
}

class FailoverRegion {
    private final List<ExecutionVertex> connectedExecutionVertexes;
}
onTaskFailure
  -> FailoverRegion.onExecutionFail
    -> FailoverRegion.cancel
      -> foreach connected ExecutionVertex : ExecutionVertex.cancel
    -> allVerticesInTerminalState
      -> reset
        -> foreach connected ExecutionVertex : ExecutionVertex.resetForNewExecution
      -> restart
        -> foreach connected ExecutionVertex : ExecutionVertex.scheduleForExecution
          -> Execution.scheduleForExecution
            -> Execution.allocateAndAssignSlotForExecution
            -> Execution.deploy
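The lookup that makes region failover cheaper than a full restart can be sketched as follows: the failed vertex is mapped to its region, and only the vertices in that region are cancelled and rescheduled. This is an illustration under assumptions: `SketchRegionFailover`, `addRegion`, and `verticesToRestart` are hypothetical names, and vertices are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the vertex-to-region lookup used on task failure. */
class SketchRegionFailover {
    // Every vertex of a region points at the same region list.
    private final Map<String, List<String>> vertexToRegion = new HashMap<>();

    /** Registers a group of connected vertices as one failover region. */
    void addRegion(List<String> connectedVertices) {
        List<String> region = Collections.unmodifiableList(new ArrayList<>(connectedVertices));
        for (String vertex : region) {
            vertexToRegion.put(vertex, region);
        }
    }

    /** Returns the vertices that must be cancelled, reset, and rescheduled together. */
    List<String> verticesToRestart(String failedVertex) {
        List<String> region = vertexToRegion.get(failedVertex);
        if (region == null) {
            throw new IllegalArgumentException("unknown vertex: " + failedVertex);
        }
        return region;
    }
}
```

With two independent pipelines registered as separate regions, a failure in one leaves the other untouched in this model.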

xx

SlotSharingGroupAssignment line 256
On restart: xx map
// make sure an empty entry exists for this group, if no other entry exists
if (!entryForNewJidExists) {
availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
}
Copyright notice: published by 农夫的狗 on 2021-01-16 18:03:35.
Please credit the source when reposting: Flink Task Deploy & Task Failover | 404导航
