Flink从入门到实战七[ProcessFunction]-3-Timer和TimerService

ProcessFunction中一个强大的功能就是定时器了。定时器可以支持我们设定在某个时间触发执行某些必要的操作。
通过上下文可以获取有一个TimerService类,该类提供了一些必要的方法来操作定时器。

包括方法:

	获取当前的处理时间
	long currentProcessingTime();

	获取时间时间对应的Watermark
	long currentWatermark();

	注册一个定时器,触发定时器的判断时间为处理时间
	void registerProcessingTimeTimer(long time);

	注册一个定时器,触发定时器的判断时间为事件时间
	void registerEventTimeTimer(long time);

	删除一个处理时间定时器
	void deleteProcessingTimeTimer(long time);

	删除一个事件时间定时器
	void deleteEventTimeTimer(long time);

处理时间和事件时间的方法都是对应的,不能相互混用。

定时器的demo,还是使用上一节的反欺诈的demo,我们重点来看一下核心代码的逻辑:

@Override
public void open(Configuration parameters) throws Exception {
	ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
	flagState = getRuntimeContext().getState(flagDescriptor);

	ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
	timerState = getRuntimeContext().getState(timeDescriptor);
}

@Override
public void processElement(
		Transaction transaction,
		Context context,
		Collector<Alert> collector) throws Exception {

	//获取上一次计算的状态
	Boolean value = flagState.value();
	//上一次计算的状态非空,说明上一次状态<$1
	if(Objects.nonNull(value)) {
		//如果当前金额>$500,则报警
		if(transaction.getAmount()>LARGE_AMOUNT) {
			Alert alert = new Alert();
			alert.setId(transaction.getAccountId());
			collector.collect(alert);
		}
		//flagState.clear();
		cleanUp(context);
	}

	//判断当前状态是否<$1,小于则标记并更新状态
	if(transaction.getAmount()<SMALL_AMOUNT) {
		flagState.update(true);
		
		long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
		context.timerService().registerProcessingTimeTimer(timer);

		timerState.update(timer);
	}

}

private void cleanUp(Context ctx) throws Exception {
	Long timer = timerState.value();
	ctx.timerService().deleteProcessingTimeTimer(timer);

	timerState.clear();
	flagState.clear();
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
	timerState.clear();
	flagState.clear();
}

代码分析:

1、open()方法
open()方法中注册了两个状态,一个是标记当前交易是否是一笔小于1元的状态,一个是定时器状态。

2、processElement()方法
用于处理主要的业务逻辑,这里面主要有两大逻辑。
代码段1

//获取上一次计算的状态
Boolean value = flagState.value();
//上一次计算的状态非空,说明上一次状态<$1
if(Objects.nonNull(value)) {
	//如果当前金额>$500,则报警
	if(transaction.getAmount()>LARGE_AMOUNT) {
		Alert alert = new Alert();
		alert.setId(transaction.getAccountId());
		collector.collect(alert);
	}
	//flagState.clear();
	cleanUp(context);
}

这里是基于上一次计算的状态来进行处理的,如果状态非空,说明上一次交易金额小于1元,如果当前金额>$500,则报警。
最后不管是否报警,都触发cleanUp()方法,这个方法会删除定时器,因为不管是否报警,定时器都需要清除,等后续再重新注册。

代码段2

//判断当前状态是否<$1,小于则标记并更新状态
if(transaction.getAmount()<SMALL_AMOUNT) {
	flagState.update(true);
	
	long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
	context.timerService().registerProcessingTimeTimer(timer);

	timerState.update(timer);
}

这里就是判断当前状态是否<$1,小于则标记并更新flagState状态,并注册一个定时器,如果定时器触发,就会删除两个状态,flagState如果为空,说明近期是没有小于1元的交易的。

3、onTimer()方法
onTimer()方法是定时器触发时,用来清空状态的。