Flink StateBackend:概述

Flink 4年前 (2021) 农夫的狗
675 0

State 分类

Keyed/Operator State

Keyed State: 只能在 KeyedStream 中使用,State 与 Key 相关联
Operator State: 对 Stream 类型无限制,State 与 Operator Index 相关联

Managed State

  • State 数据存储于 StateBackend 中,Checkpoint 细节由 Flink 框架来维护
  • KeyedState: ValueState、ListState、MapState、ReducingState ...
  • OperatorState: ListState、BroadcastState、UnionListState

Raw State

  • State 数据存储于 Operator 中,Checkpoint 细节由用户来处理
  • 要使用 Raw State 用户需要自己实现 StreamOperator

StateBackend 分类

State 主要涉及到两方面
  • 作业运行时 State 如何维护(内存 VS 磁盘,数据如何组织)
  • Checkpoint 快照/恢复时,如何快照/恢复 State 数据
而 StateBackend 主要就是定义了上面两方面的内容,现有的三种实现:
Flink StateBackend:概述
  • FsStateBackend:将工作状态保存在 TaskManager 内存中,并快照到文件系统
  • MemoryStateBackend:将工作状态保存在 TaskManager 内存中,并快照到 JobManager 的内存
  • RocksDBStateBackend:将工作状态保存在 RocksDB 中,并快照到文件系统
Backend
Keyed State 维护
Operator State 维护
Raw State 维护
State 快照/恢复
FsStateBackend
HeapKeyedStateBackend
DefaultOperatorStateBackend
用户
FsCheckpointStreamFactory
MemoryStateBackend
HeapKeyedStateBackend
DefaultOperatorStateBackend
用户
MemCheckpointStreamFactory
RocksDBStateBackend
RocksDBKeyedStateBackend
DefaultOperatorStateBackend
用户
FsCheckpointStreamFactory

Managed State 维护

对于 Managed State,State 的维护、Snapshot 的具体实现由 KeyedStateBackend 和 OperatorStateBackend 来负责,StateBackend 提供了两者相应的工厂方法,子类 RocksDBStateBackend、FsStateBackend 等会提供基于不同存储介质的实现。
// Interface StateBackend
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend() throws Exception;
OperatorStateBackend createOperatorStateBackend() throws Exception;
Flink StateBackend:概述

KeyedStateBackend

HeapKeyedStateBackend
  • 工作状态维护在内存中
  • FsStateBackend + MemoryStateBackend
RocksDBKeyedStateBackend
  • 工作状态维护在 RocksDB 中
  • RocksDBStateBackend

OperatorStateBackend

目前只有一种实现:DefaultOperatorStateBackend,工作状态全部维护在内存中。FsStateBackend + MemoryStateBackend + RocksDBStateBackend 都用了这一种实现来维护 OperatorState。

State in RocksDBKeyedStateBackend

Example: RocksDBKeyedStateBackend 中的 ValueState
ValueState<Long> longState = context.getKeyedStateStore()
.getState(new ValueStateDescriptor<Long>("long" /* name */, Long.class));
longState.update(1000L);
long value = longState.value();
调用链
context.getKeyedStateStore().getState()
-> DefaultKeyedStateStore.getState()
-> DefaultKeyedStateStore.getPartitionedState()
-> DefaultKeyedStateStore.keyedStateBackend.getPartitionedState()
-> AbstractKeyedStateBackend.getPartitionedState()
-> AbstractKeyedStateBackend.getOrCreateKeyedState()
-> RocksDBKeyedStateBackend.createInternalState()
-> RocksDBValueState.create()
-> new RocksDBValueState()
RocksDBValueState 其实就封装了下 RocksDB,将 value 的读写转化为 RocksDB 的 put/get
所有基于 RocksDB 的 State 最后都转化为 k-v 的形式存储到 db 中,其中每个 State Name 有一个独立的 ColumnFamily,key、value 内容和 State 类型相关。
KeyedState
(面向用户)
RocksDBState
(Flink 内部)
key
value
ValueState
RocksDBValueState
KeyGroupID + Key + NameSpace
用户数据
MapState
RocksDBMapState
KeyGroupID + Key + NameSpace + UserKey
用户数据
ListState
RocksDBListState
KeyGroupID + Key + NameSpace
Serialized List
ReducingState
RocksDBReducingState
KeyGroupID + Key + NameSpace
用户数据
AggregatingState
RocksDBAggregatingState
KeyGroupID + Key + NameSpace
用户数据

State in HeapKeyedStateBackend

通过 StateTable 维护了 Key -> State 的映射
/**
* @param <K> type of key
* @param <N> type of namespace
* @param <S> type of state
*/
class StateTable {
StateMap<K, N, S>[] keyGroupedStateMaps;
}

State in DefaultOperatorStateBackend

ListState

初始化
// 会在 map 中新增 k-v: "list" -> PartitionableListState<Long>
ListState<Long> listState = context.getOperatorStateStore()
.getListState(new ListStateDescriptor<Long>("list", Long.class));

listState.add(1L);
listState.addAll(Arrays.asList(1L, 2L, 3L));
listState.update(Arrays.asList(1L, 2L, 3L));
for (Long element : listState.get()) {
// element
}

OperatorStateBackend 目前只有基于内存的实现 DefaultOperatorStateBackend,用两 Map 维护了目前已经注册的所有 State。

// Map for all registered operator states. Maps state name -> state
Map<String /* State Name */, PartitionableListState<?>> registeredOperatorStates;

// Map for all registered operator broadcast states.
Map<String /* State Name */, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

Raw State 维护

完全由用户来控制,实现自己的 Operator,并在 snapshotState 和 initializeState 处理 Checkpoint 的备份和恢复
class DemoOperator extends AbstractStreamOperator {

// State
private Long state;

public void processElement(xx) {
// work with state
}

@Override
public void snapshotState(StateSnapshotContext context) {
// write state to context ...
}

@Override
public void initializeState(StateInitializationContext context) {
// recover state from context ...
}
}
版权声明:农夫的狗 发表于 2021-01-12 10:59:15。
转载请注明:Flink StateBackend:概述 | 404导航

暂无评论

暂无评论...