Keyed State基于每个key进行状态存储,是KeyedStream上的状态。支持的数据类型有:ValueState,ListState,ReducingState,AggregatingState,MapState。
Keyed State 结构
└--ValueState
└--MapState
└--AppendingState
└--FoldingState
└--MergingState
└--ListState
└--ReducingState
└--AggregatingState
要使用 keyed state,首先需要为DataStream指定 key(主键)。这个主键用于状态分区(也会给数据流中的记录本身分区)。 你可以使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key。 它将生成 KeyedStream,接下来允许使用 keyed state 操作。
我们来看一下Keyed State集中状态类型支持的方法:
1、ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
2、ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
3、ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
4、AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
5、MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
总结一下上面各类型对应的方法:
ValueState<T>: update(T)、T value()、clear()
ListState<T>: add(T)、addAll(List<T>)、 Iterable<T> get()、update(List<T>)、clear()
ReducingState<T>: add(T)、addAll(List<T>)、 Iterable<T> get()、update(List<T>)、clear()
AggregatingState<T>: add(IN)、OUT get()、clear()
MapState<T>: put(UK,UV)、putAll(Map<UK,UV>)、get(UK)、remove(UK)、entries(),keys()、values()、isEmpty()、clear()
以上5中不同的状态类型,在使用时要创建对应的状态描述符:ValueStateDescriptor,ListStateDescriptor, AggregatingStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor,这5中描述符通过RuntimeContext,使用以下5中对应方法进行访问:
alueState<T> getState(ValueStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
要使用RuntimeContext,只能在 rich functions 中使用。