State 分类
Keyed/Operator State
Keyed State: 只能在 KeyedStream 中使用,State 与 Key 相关联
Operator State: 对 Stream 类型无限制,State 与 Operator Index 相关联
Managed State
- State 数据存储于 StateBackend 中,Checkpoint 细节由 Flink 框架来维护
- KeyedState: ValueState、ListState、MapState、ReducingState ...
- OperatorState: ListState、BroadcastState、UnionListState
Raw State
- State 数据存储于 Operator 中,Checkpoint 细节由用户来处理
- 要使用 Raw State 用户需要自己实现 StreamOperator
StateBackend 分类
State 主要涉及到两方面
- 作业运行时 State 如何维护(内存 VS 磁盘,数据如何组织)
- Checkpoint 快照/恢复时,如何快照/恢复 State 数据
而 StateBackend 主要就是定义了上面两方面的内容,现有的三种实现:
- FsStateBackend:将工作状态保存在 TaskManager 内存中,并快照到文件系统
- MemoryStateBackend:将工作状态保存在 TaskManager 内存中,并快照到 JobManager 的内存
- RocksDBStateBackend:将工作状态保存在 RocksDB 中,并快照到文件系统
Backend
|
Keyed State 维护
|
Operator State 维护
|
Raw State 维护
|
State 快照/恢复
|
FsStateBackend
|
HeapKeyedStateBackend
|
DefaultOperatorStateBackend
|
用户
|
FsCheckpointStreamFactory
|
MemoryStateBackend
|
HeapKeyedStateBackend
|
DefaultOperatorStateBackend
|
用户
|
MemCheckpointStreamFactory
|
RocksDBStateBackend
|
RocksDBKeyedStateBackend
|
DefaultOperatorStateBackend
|
用户
|
FsCheckpointStreamFactory
|
Managed State 维护
对于 Managed State,State 的维护、Snapshot 的具体实现由 KeyedStateBackend 和 OperatorStateBackend 来负责,StateBackend 提供了两者相应的工厂方法,子类 RocksDBStateBackend、FsStateBackend 等会提供基于不同存储介质的实现。
// Interface StateBackend <K> AbstractKeyedStateBackend<K> createKeyedStateBackend() throws Exception; OperatorStateBackend createOperatorStateBackend() throws Exception;
KeyedStateBackend
HeapKeyedStateBackend
- 工作状态维护在内存中
- FsStateBackend + MemoryStateBackend
RocksDBKeyedStateBackend
- 工作状态维护在 RocksDB 中
- RocksDBStateBackend
OperatorStateBackend
目前只有一种实现:DefaultOperatorStateBackend,工作状态全部维护在内存中。FsStateBackend + MemoryStateBackend + RocksDBStateBackend 都用了这一种实现来维护 OperatorState。
State in RocksDBKeyedStateBackend
Example: RocksDBKeyedStateBackend 中的 ValueState
ValueState<Long> longState = context.getKeyedStateStore() .getState(new ValueStateDescriptor<Long>("long" /* name */, Long.class)); longState.update(1000L); long value = longState.value();
调用链
context.getKeyedStateStore().getState()
-> DefaultKeyedStateStore.getState()
-> DefaultKeyedStateStore.getPartitionedState()
-> DefaultKeyedStateStore.keyedStateBackend.getPartitionedState()
-> AbstractKeyedStateBackend.getPartitionedState()
-> AbstractKeyedStateBackend.getOrCreateKeyedState()
-> RocksDBKeyedStateBackend.createInternalState()
-> RocksDBValueState.create()
-> new RocksDBValueState()
RocksDBValueState 其实就封装了下 RocksDB,将 value 的读写转化为 RocksDB 的 put/get
所有基于 RocksDB 的 State 最后都转化为 k-v 的形式存储到 db 中,其中每个 State Name 有一个独立的 ColumnFamily,key、value 内容和 State 类型相关。
KeyedState
(面向用户)
|
RocksDBState
(Flink 内部)
|
key
|
value
|
ValueState
|
RocksDBValueState
|
KeyGroupID + Key + NameSpace
|
用户数据
|
MapState
|
RocksDBMapState
|
KeyGroupID + Key + NameSpace + UserKey
|
用户数据
|
ListState
|
RocksDBListState
|
KeyGroupID + Key + NameSpace
|
Serialized List
|
ReducingState
|
RocksDBReducingState
|
KeyGroupID + Key + NameSpace
|
用户数据
|
AggregatingState
|
RocksDBAggregatingState
|
KeyGroupID + Key + NameSpace
|
用户数据
|
State in HeapKeyedStateBackend
通过 StateTable 维护了 Key -> State 的映射
/** * @param <K> type of key * @param <N> type of namespace * @param <S> type of state */ class StateTable { StateMap<K, N, S>[] keyGroupedStateMaps; }
State in DefaultOperatorStateBackend
ListState
初始化
// 会在 map 中新增 k-v: "list" -> PartitionableListState<Long> ListState<Long> listState = context.getOperatorStateStore() .getListState(new ListStateDescriptor<Long>("list", Long.class)); listState.add(1L); listState.addAll(Arrays.asList(1L, 2L, 3L)); listState.update(Arrays.asList(1L, 2L, 3L)); for (Long element : listState.get()) { // element }
OperatorStateBackend 目前只有基于内存的实现 DefaultOperatorStateBackend,用两 Map 维护了目前已经注册的所有 State。
// Map for all registered operator states. Maps state name -> state Map<String /* State Name */, PartitionableListState<?>> registeredOperatorStates; // Map for all registered operator broadcast states. Map<String /* State Name */, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
Raw State 维护
完全由用户来控制,实现自己的 Operator,并在 snapshotState 和 initializeState 处理 Checkpoint 的备份和恢复
class DemoOperator extends AbstractStreamOperator { // State private Long state; public void processElement(xx) { // work with state } @Override public void snapshotState(StateSnapshotContext context) { // write state to context ... } @Override public void initializeState(StateInitializationContext context) { // recover state from context ... } }
暂无评论...