Flink从入门到实战八[State]-2-Flink Keyed State

Keyed State基于每个key进行状态存储,是KeyedStream上的状态。支持的数据类型有:ValueState,ListState,ReducingState,AggregatingState,MapState。

Keyed State	结构
    └--ValueState
    └--MapState
    └--AppendingState 
	└--FoldingState
        └--MergingState 
            └--ListState
	    └--ReducingState
	    └--AggregatingState

要使用 keyed state,首先需要为DataStream指定 key(主键)。这个主键用于状态分区(也会给数据流中的记录本身分区)。 你可以使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key。 它将生成 KeyedStream,接下来允许使用 keyed state 操作。

我们来看一下Keyed State集中状态类型支持的方法:
1、ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

2、ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

3、ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

4、AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

5、MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

总结一下上面各类型对应的方法:

ValueState<T>:	update(T)、T value()、clear()
ListState<T>:	add(T)、addAll(List<T>)、 Iterable<T> get()、update(List<T>)、clear()
ReducingState<T>:	add(T)、addAll(List<T>)、 Iterable<T> get()、update(List<T>)、clear()
AggregatingState<T>:	add(IN)、OUT get()、clear()
MapState<T>:	put(UK,UV)、putAll(Map<UK,UV>)、get(UK)、remove(UK)、entries(),keys()、values()、isEmpty()、clear()

以上5中不同的状态类型,在使用时要创建对应的状态描述符:ValueStateDescriptor,ListStateDescriptor, AggregatingStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor,这5中描述符通过RuntimeContext,使用以下5中对应方法进行访问:

alueState<T> getState(ValueStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

要使用RuntimeContext,只能在 rich functions 中使用。