Flink从入门到实战七[ProcessFunction]-2-KeyedProcessFunction使用

KeyedProcessFunction是在业务开发中应用最多的ProcessFunction,用在KeyedStream上,也就是基于key分组后,再使用KeyedProcessFunction进行计算。
KeyedProcessFunction继承自AbstractRichFunction,整体功能与ProcessFunction相同。由于它作用在KeyedStream上,KeyedProcessFunction比ProcessFunction多了一个泛型参数K,即分组所用key的类型。

/**
 * Simplified excerpt of Flink's KeyedProcessFunction, shown for its generic signature and callbacks.
 * The nested Context / OnTimerContext types referenced below are not declared in this excerpt.
 *
 * @param <K> type of the key the upstream KeyedStream was grouped by
 * @param <I> type of the input elements passed to processElement
 * @param <O> type of the output elements emitted through the Collector
 */
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

	/**
	 * Called for each input element; subclasses must implement it.
	 *
	 * @param value the current input element
	 * @param ctx   per-element context (declared elsewhere in the real Flink class)
	 * @param out   collector used to emit zero or more output elements
	 */
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
	
	/**
	 * Timer callback; the default implementation shown here does nothing, so overriding is optional.
	 *
	 * @param timestamp the timestamp associated with the firing timer
	 * @param ctx       timer context (declared elsewhere in the real Flink class)
	 * @param out       collector used to emit zero or more output elements
	 */
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
}

我们在前面的章节中分析过官网的反欺诈demo。当时它涉及的很多知识点还没有学到,读起来比较吃力;而到了本节,正好可以借助它来深入理解KeyedProcessFunction。
反欺诈demo的需求是:对于同一个账户,如果一笔小于$1的交易之后,紧接着的下一笔交易大于$500,并且两笔交易的间隔在1分钟以内,则进行报警;如果两笔交易间隔超过1分钟,则不报警。

代码:

/**
 * Fraud detection on a keyed transaction stream, demonstrating KeyedProcessFunction.
 *
 * <p>For each account (the key), an {@link Alert} is emitted when a transaction below
 * {@code SMALL_AMOUNT} is immediately followed by a transaction above {@code LARGE_AMOUNT},
 * and the second transaction arrives before the processing-time timer registered
 * {@code ONE_MINUTE} after the small one has fired. When that timer fires, the per-key
 * state is cleared, so a later large transaction no longer triggers an alert.
 */
public class ProcessFunction_1_KeyedProcessFunction extends KeyedProcessFunction<Long, Transaction, Alert> {

	private static final long serialVersionUID = 1L;

	private static final double SMALL_AMOUNT = 1.00;
	private static final double LARGE_AMOUNT = 500.00;
	private static final long ONE_MINUTE = 60 * 1000;

	// Set to true after a transaction below SMALL_AMOUNT; null when no pattern is in progress.
	private transient ValueState<Boolean> flagState;

	// Timestamp of the processing-time timer registered after a small transaction; null if none.
	private transient ValueState<Long> timerState;

	@Override
	public void open(Configuration parameters) throws Exception {
		ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
		flagState = getRuntimeContext().getState(flagDescriptor);

		ValueStateDescriptor<Long> timeDescriptor = new ValueStateDescriptor<>("time-state", Types.LONG);
		timerState = getRuntimeContext().getState(timeDescriptor);
	}

	/**
	 * Fires ONE_MINUTE after a small transaction: the alert window has closed, so the
	 * per-key state is reset.
	 */
	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
		timerState.clear();
		flagState.clear();
	}

	/**
	 * Checks the current transaction against the pending small-transaction flag, emits an
	 * alert on a match, and starts a new pattern if this transaction is itself small.
	 *
	 * @param transaction the incoming transaction for the current key
	 * @param context     context giving access to the timer service
	 * @param collector   receives any alerts produced
	 */
	@Override
	public void processElement(
			Transaction transaction,
			Context context,
			Collector<Alert> collector) throws Exception {

		// A non-null flag means the previous transaction for this key was below SMALL_AMOUNT
		// and its timer has not fired yet.
		if (Objects.nonNull(flagState.value())) {
			if (transaction.getAmount() > LARGE_AMOUNT) {
				Alert alert = new Alert();
				alert.setId(transaction.getAccountId());
				collector.collect(alert);
			}
			// Any transaction that follows a small one ends the pattern, whether or not it alerted.
			cleanUp(context);
		}

		// Start a new pattern if the current transaction is itself small.
		if (transaction.getAmount() < SMALL_AMOUNT) {
			flagState.update(true);

			long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
			context.timerService().registerProcessingTimeTimer(timer);

			timerState.update(timer);
		}
	}

	/**
	 * Deletes the pending timer (if any) and clears both state entries for the current key.
	 */
	private void cleanUp(Context ctx) throws Exception {
		Long timer = timerState.value();
		// deleteProcessingTimeTimer takes a primitive long; avoid an NPE from unboxing null.
		if (timer != null) {
			ctx.timerService().deleteProcessingTimeTimer(timer);
		}

		timerState.clear();
		flagState.clear();
	}

	/**
	 * Builds and runs the pipeline: transaction source -> keyBy(accountId) -> this detector -> alert sink.
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Transaction> transactions = env
				.addSource(new TransactionSource())
				.name("transactions");

		// Use the detector defined in this class (previously referenced a different class).
		DataStream<Alert> alerts = transactions
				.keyBy(Transaction::getAccountId)
				.process(new ProcessFunction_1_KeyedProcessFunction())
				.name("fraud-detector");

		alerts.addSink(new AlertSink())
				.name("send-alerts");

		env.execute("交易欺诈检测");
	}
}