Checkpoint Snapshot
由 ExecutionGraph.CheckpointCoordinator 来触发,StreamInputProcessor 中做完 Barrier 对齐之后,会调用 Operator 的 snapshotState 来做 Checkpoint,主要代码:
AbstractStreamOperator.snapshotState(...) { // Take snapshot for TimeService + Raw State snapshotState(snapshotContext); // Take snapshot for operatorState operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions) // Take snapshot for keyedState keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions) }
CheckpointStreamFactory
提供了对底层存储的封装,上层的 StateBackend 和 RawState(由用户管理) 在 Checkpoint 时会将自身维护的所有 State 序列化后写入由 CheckpointStreamFactory 初始化的 CheckpointStateOutputStream。
public interface CheckpointStreamFactory { CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException; }
根据存储介质不同目前主要由两种实现
MemCheckpointStreamFactory:初始化基于内存的 StateOutputStream
FsCheckpointStreamFactory:初始化基于分布式文件系统的 StateOutputStream
CheckpointStateOutputStream
现有的几种实现
MemoryCheckpointOutputStream:数据写内存,基本就是封装了内存中的一个 byte[]
FsCheckpointStateOutputStream:数据小于阈值时写内存,否则写文件FileBasedStateOutputStream:写文件,主要是用来在本地写 State 数据,用来做 LocalRestart
DuplicatingCheckpointOutputStream:封装了两个 StateOutputStream,数据会同时写入两个 Stream 中。比如 FsCheckpointStateOutputStream + FileBasedStateOutputStream,达到在本地 + 远端同时写两份 State 数据的效果。
closeAndGetHandle
- 关闭 OutputStream,返回包含写入数据的 StateHandle,JM 会将 StateHandle 持久化,后续恢复 State 时会通过 StateHandle 来检索 State 数据
- 对于 MemCheckpointStreamFactory
- 返回 ByteStreamStateHandle,封装了内存中包含写入的 State 数据的 byte[]
- 对于 FsCheckpointStreamFactory
- 数据小于阈值,返回 ByteStreamStateHandle
- 大于阈值,返回 FileStateHandle,包含存储 State 数据的文件信息
State、StateBackend、CheckpointStreamFactory、Memory/FS 之间关系
Snapshot in DefaultOperatorStateBackend
DefaultOperatorStateBackend
CheckpointStreamFactory.CheckpointStateOutputStream localOut = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); // 将 StateMetaInfo 序列化后写入 localOut // StateMetaInfo:name,serializers 等 writeStateMetaInfoSnapshots(operatorStateMetaInfoSnapshots, localOut); writeStateMetaInfoSnapshots(broadcastStateMetaInfoSnapshots, localOut); // 将 State 数据序列化后写入 localOut writeOperatorState(registeredOperatorStates, localOut); writeOperatorState(registeredBroadcastStates, localOut); // 最后将 State offset 等信息封装到 OperatorStateHandle 交由 JM 来持久化
备注
- MetaInfo + State 数据一个文件
- SnapshotResult: OperatorStateHandle,包含了 StateMetaInfo,State 在 Stream 中的 offset 等信息,交由 JM 来持久化,Checkpoint 恢复时会用到
Snapshot in RocksDBKeyedStateBackend
全量备份
全量备份的实现逻辑在 RocksFullSnapshotStrategy.doSnapshot,主要工作
- 从 CheckpointStreamFactory 获取 StateOutputStream
- 写 StateMetaInfo 到 StateOutputStream:state name, colmnFamily 等信息
- RocksDB scan 拿到所有的 k-v,序列化后写入 StateOutputStream
备注
- MetaInfo + k-v 只有一个文件
- SnapshotResult 包含了 KeyGroupsStateHandle,功能同 OperatorStateHandle 类似
增量备份
增量备份的实现逻辑在 RocksIncrementalSnapshotStrategy.doSnapshot,主要工作:
- 从 CheckpointStreamFactory 获取 StateOutputStream
- 写 StateMetaInfo 到 StateOutputStream, state name, colmnFamily 等信息
- 拿到上一个成功的 snapshot,算出增量的 sst,将增量的 sst upload 到 HDFS
备注
- MetaInfo 一个文件,每个 sst 一个独立文件
- SnapshotResult 结果包含了 IncrementalRemoteKeyedStateHandle or IncrementalLocalKeyedStateHandle,包含了 checkpoint id、sst 文件等信息
Snapshot in HeapKeyedStateBackend
实现细节在 HeapSnapshotStrategy.snapshot 中,总体以下几件事情
- 打开 StateOutputStream
- 遍历 StateTable 将 StateMetainfo 写入 StateOutputStream
- 遍历 StateTable 将 State 数据写入 StateOutputStream
- close StateOutputStream,返回包含 State 信息的 StateHandle
- 根据 State 存储介质不同,返回的 StateHandle 也不同,ByteStreamStateHandle or FileStateHandle
暂无评论...