KeyedProcessFunction是在业务开发中应用最多的ProcessFunction,用在KeyedStream上,也就是基于key分组后,再使用KeyedProcessFunction进行计算。
KeyedProcessFunction继承自AbstractRichFunction,整体功能和ProcessFunction相同。因为它用在KeyedStream上,所以KeyedProcessFunction的泛型参数比ProcessFunction多了一个K,也就是分组所用key的类型。
/**
 * Process function applied to a keyed stream (simplified excerpt of the API).
 *
 * @param <K> type of the key the stream is grouped by
 * @param <I> type of the input elements
 * @param <O> type of the elements emitted through the collector
 */
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
/**
 * Handles one input element; results, if any, are handed to {@code out}.
 */
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
 * Timer callback; this default implementation does nothing.
 * NOTE(review): timestamp is presumably the time of the firing timer — confirm against the Flink docs.
 */
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
}
我们在前面的章节已经分析过官网的反欺诈demo。当时很多相关知识还没有学到,看起来会比较吃力;而在本节,它刚好可以用来深入理解ProcessFunction。
反欺诈demo的需求是:对于同一个账户,如果出现一笔小于$1的交易之后,紧接着发生了一笔大于$500的交易,并且两笔交易的间隔在1分钟以内,则进行报警;如果两笔交易间隔在1分钟以上,则不报警。
代码:
/**
 * Fraud detection on user transactions, implemented with a {@code KeyedProcessFunction}.
 *
 * <p>For each key (the job below keys by account id), an alert is emitted when a transaction
 * smaller than {@link #SMALL_AMOUNT} is directly followed by a transaction larger than
 * {@link #LARGE_AMOUNT} before the one-minute processing-time timer registered for the small
 * transaction has fired.
 */
public class ProcessFunction_1_KeyedProcessFunction extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    /** Transactions strictly below this amount are treated as "small". */
    private static final double SMALL_AMOUNT = 1.00;

    /** Transactions strictly above this amount are treated as "large". */
    private static final double LARGE_AMOUNT = 500.00;

    /** Length of the detection window in milliseconds. */
    private static final long ONE_MINUTE = 60 * 1000;

    /** Set to {@code true} while the previous transaction of the current key was small; empty otherwise. */
    private transient ValueState<Boolean> flagState;

    /** Timestamp of the timer registered when the small transaction was seen; empty otherwise. */
    private transient ValueState<Long> timerState;

    /**
     * Obtains the two state handles from the runtime context.
     *
     * @param parameters configuration passed to the function (not used here)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
        timerState = getRuntimeContext().getState(timeDescriptor);
    }

    /**
     * Timer callback: the window has expired, so the "previous transaction was small" marker
     * and the stored timer timestamp are dropped.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        timerState.clear();
        flagState.clear();
    }

    /**
     * Processes one transaction.
     *
     * <p>If the marker from a preceding small transaction is still present and the current amount
     * exceeds {@link #LARGE_AMOUNT}, an alert carrying the account id is emitted. The marker and
     * its timer are then cleared regardless of the amount. Finally, if the current amount is below
     * {@link #SMALL_AMOUNT}, the marker is set again and a new timer is registered.
     *
     * @param transaction the incoming transaction
     * @param context     gives access to the timer service
     * @param collector   receives the emitted alerts
     */
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {
        // A non-null flag means the previous transaction was below SMALL_AMOUNT.
        Boolean previousWasSmall = flagState.value();
        if (previousWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // The pattern is evaluated once per small transaction: reset state and cancel the timer.
            cleanUp(context);
        }

        // Remember a small transaction and start the one-minute window for it.
        if (transaction.getAmount() < SMALL_AMOUNT) {
            flagState.update(true);
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    /**
     * Deletes the pending timer (if one was recorded) and clears both states.
     *
     * @param ctx context providing the timer service
     */
    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        // Guard against an empty timer state: unboxing null would throw a NullPointerException.
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
        }
        timerState.clear();
        flagState.clear();
    }

    /**
     * Builds and runs the job: transaction source, keyed by account id, this detector, alert sink.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        // Use the detector defined in this class (the previous code referenced a different class name).
        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new ProcessFunction_1_KeyedProcessFunction())
                .name("fraud-detector");

        alerts.addSink(new AlertSink())
                .name("send-alerts");

        env.execute("交易欺诈检测");
    }
}