ProcessFunction中一个强大的功能就是定时器了。定时器可以支持我们设定在某个时间触发执行某些必要的操作。
通过上下文可以获取有一个TimerService类,该类提供了一些必要的方法来操作定时器。
包括方法:
获取当前的处理时间
long currentProcessingTime();
获取时间时间对应的Watermark
long currentWatermark();
注册一个定时器,触发定时器的判断时间为处理时间
void registerProcessingTimeTimer(long time);
注册一个定时器,触发定时器的判断时间为事件时间
void registerEventTimeTimer(long time);
删除一个处理时间定时器
void deleteProcessingTimeTimer(long time);
删除一个事件时间定时器
void deleteEventTimeTimer(long time);
处理时间和事件时间的方法都是对应的,不能相互混用。
定时器的demo,还是使用上一节的反欺诈的demo,我们重点来看一下核心代码的逻辑:
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
timerState = getRuntimeContext().getState(timeDescriptor);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
//获取上一次计算的状态
Boolean value = flagState.value();
//上一次计算的状态非空,说明上一次状态<$1
if(Objects.nonNull(value)) {
//如果当前金额>$500,则报警
if(transaction.getAmount()>LARGE_AMOUNT) {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
//flagState.clear();
cleanUp(context);
}
//判断当前状态是否<$1,小于则标记并更新状态
if(transaction.getAmount()<SMALL_AMOUNT) {
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
private void cleanUp(Context ctx) throws Exception {
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
timerState.clear();
flagState.clear();
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
timerState.clear();
flagState.clear();
}
代码分析:
1、open()方法
open()方法中注册了两个状态,一个是标记当前交易是否是一笔小于1元的状态,一个是定时器状态。
2、processElement()方法
用于处理主要的业务逻辑,这里面主要有两大逻辑。
代码段1
//获取上一次计算的状态
Boolean value = flagState.value();
//上一次计算的状态非空,说明上一次状态<$1
if(Objects.nonNull(value)) {
//如果当前金额>$500,则报警
if(transaction.getAmount()>LARGE_AMOUNT) {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
//flagState.clear();
cleanUp(context);
}
这里是基于上一次计算的状态来进行处理的,如果状态非空,说明上一次交易金额小于1元,如果当前金额>$500,则报警。
最后不管是否报警,都触发cleanUp()方法,这个方法会删除定时器,因为不管是否报警,定时器都需要清除,等后续再重新注册。
代码段2
//判断当前状态是否<$1,小于则标记并更新状态
if(transaction.getAmount()<SMALL_AMOUNT) {
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
这里就是判断当前状态是否<$1,小于则标记并更新flagState状态,并注册一个定时器,如果定时器触发,就会删除两个状态,flagState如果为空,说明近期是没有小于1元的交易的。
3、onTimer()方法
onTimer()方法是定时器触发时,用来清空状态的。