Flink Hands-On Learning 1 [Quick Start] - 6 - Flink Official Fraud Detection Project, Version 1

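The official Flink website uses a fraud detection project to demonstrate Flink's features. However, this project can be somewhat hard to follow if you have never worked with a big data computing framework before. If that is your situation, start with the earlier lessons in this Flink course, which implement WordCount in Flink in three different ways. WordCount has long been the "Hello World" of big data.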

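Official Demo Overview
The goal of the project is to implement credit card fraud detection.
Detection rule: for a given account, if a transaction of less than $1 is immediately followed by a transaction of more than $500, output an alert.
For example: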

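tx1    tx2    tx3    tx4    tx5    tx6    tx7
$11    $2     $0.1   $501   $0.1   $20    $500
              -----------   ------------------
                 Fraud          Not Fraud

Here tx3 ($0.1) is immediately followed by tx4 ($501), so an alert is raised. tx5 ($0.1) is followed by $20, and tx7's $500 is not more than $500, so tx5 to tx7 raise no alert.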
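To make the rule concrete, here is a minimal plain-Java sketch (no Flink involved) that applies it to one account's transactions from the example table. The class and method names are hypothetical; the two thresholds mirror the SMALL_AMOUNT and LARGE_AMOUNT constants in the walkthrough code. Comparing each amount with the previous one stands in for the state that a real streaming job would have to keep between events.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the detection rule for ONE account (hypothetical class, no Flink).
// Assumption: the amounts are already in arrival order.
class SmallThenLargeRule {
    static final double SMALL_AMOUNT = 1.00;   // "less than $1"
    static final double LARGE_AMOUNT = 500.00; // "more than $500"

    // Returns the tx numbers (1-based) of transactions that trigger an alert.
    static List<Integer> fraudulentPositions(double[] amounts) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 1; i < amounts.length; i++) {
            boolean previousWasSmall = amounts[i - 1] < SMALL_AMOUNT;
            boolean currentIsLarge = amounts[i] > LARGE_AMOUNT;
            if (previousWasSmall && currentIsLarge) {
                hits.add(i + 1); // convert the 0-based index to a tx number
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // tx1..tx7 from the example table
        double[] amounts = {11, 2, 0.1, 501, 0.1, 20, 500};
        System.out.println("Alerts at: " + fraudulentPositions(amounts));
    }
}
```

Running main prints `Alerts at: [4]`, meaning only tx4 is flagged.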

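The official demo splits the project into three stages. The first stage uses the walkthrough Maven archetype to generate a basic fraud detection project that raises an alert for every incoming transaction.
Because the first version does not yet meet the requirement, the second stage implements the actual rule: if a transaction of less than $1 is immediately followed by a transaction of more than $500, output an alert.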
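The third version refines the rule further. Suppose a transaction of less than $1 is followed by no activity at all, and then, 10 days later, a transaction of more than $500 arrives. Although the two transactions are consecutive, they clearly should not be flagged as fraud, because too much time passed between them. The third version therefore upgrades the rule again so that the large transaction must follow the small one within a short time window (the official walkthrough uses one minute, which is what the ONE_MINUTE constant in the skeleton code is for).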
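A rough plain-Java sketch of the version 3 idea, with hypothetical names: an alert is raised only when the large transaction arrives less than ONE_MINUTE after the small one. As a simplification, this sketch compares event timestamps directly and treats exactly one minute as "too late"; the official walkthrough instead registers a processing-time timer that resets the state after one minute.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the time-bounded rule (hypothetical classes, no Flink).
class TimedRule {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000; // window length in milliseconds

    // Hypothetical transaction type used only in this sketch.
    static final class Tx {
        final long timestampMillis;
        final double amount;

        Tx(long timestampMillis, double amount) {
            this.timestampMillis = timestampMillis;
            this.amount = amount;
        }
    }

    // Returns 0-based indices of large transactions that immediately follow a small one
    // AND arrive strictly less than ONE_MINUTE after it (assumed boundary).
    static List<Integer> alerts(List<Tx> txs) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 1; i < txs.size(); i++) {
            Tx prev = txs.get(i - 1);
            Tx cur = txs.get(i);
            boolean pattern = prev.amount < SMALL_AMOUNT && cur.amount > LARGE_AMOUNT;
            boolean withinWindow = cur.timestampMillis - prev.timestampMillis < ONE_MINUTE;
            if (pattern && withinWindow) {
                hits.add(i);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        long tenDays = 10L * 24 * 60 * 60 * 1000;
        List<Tx> quick = List.of(new Tx(0, 0.1), new Tx(30_000, 501));
        List<Tx> slow = List.of(new Tx(0, 0.1), new Tx(tenDays, 501));
        System.out.println("within 30 s: " + alerts(quick));   // prints within 30 s: [1]
        System.out.println("after 10 days: " + alerts(slow));  // prints after 10 days: []
    }
}
```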
Overall, the demo mainly shows how to compute with state and time.

Version 1: Generate a Basic Project
1. Use Maven with the provided Flink Maven Archetype to quickly generate a project skeleton:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.3 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false

2. Import the generated project into your IDE.

3. Build and run
Because this is the first-stage version, the project raises an alert for every incoming transaction.
Output:

15:13:29,622 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils          [] - Log file environment variable 'log.file' is not set.
15:13:29,624 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils          [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
15:13:30,598 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}
15:13:30,706 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=2}
15:13:30,816 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
15:13:30,923 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=4}
15:13:31,034 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=5}
15:13:31,143 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}
15:13:31,253 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=2}
15:13:31,364 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
15:13:31,472 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=4}
15:13:31,582 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=5}
15:13:31,692 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}
15:13:31,801 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=2}
15:13:31,911 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=3}
15:13:32,020 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=4}
15:13:32,131 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=5}
15:13:32,241 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}
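The log shows alert IDs cycling through 1 to 5, roughly 100 ms apart. This suggests (an assumption based on the log, not on the TransactionSource source code) that the generated transactions rotate through five account IDs; since version 1 has no rule, every transaction becomes an alert carrying its account ID. A plain-Java simulation of that behavior, with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of version 1 (hypothetical class, no Flink):
// no detection rule, one alert per transaction, alert id = account id.
class VersionOneSimulation {
    // Assumption taken from the log: account ids repeat as 1, 2, 3, 4, 5, 1, ...
    static List<Long> accountIds(int count) {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ids.add((long) (i % 5 + 1));
        }
        return ids;
    }

    // Mirrors what processElement does in version 1: every transaction yields an alert.
    static List<String> alertsFor(List<Long> accountIds) {
        List<String> alerts = new ArrayList<>();
        for (long accountId : accountIds) {
            alerts.add("Alert{id=" + accountId + "}");
        }
        return alerts;
    }

    public static void main(String[] args) {
        alertsFor(accountIds(16)).forEach(System.out::println);
    }
}
```

With 16 transactions it prints the same sequence of 16 `Alert{id=...}` lines as the log above, without the timestamps.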

Code:

// FraudDetectionJob.java
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Skeleton code for the datastream walkthrough
 */
public class FraudDetectionJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Transaction> transactions = env
			.addSource(new TransactionSource())
			.name("transactions");

		DataStream<Alert> alerts = transactions
			.keyBy(Transaction::getAccountId)
			.process(new FraudDetector())
			.name("fraud-detector");

		alerts
			.addSink(new AlertSink())
			.name("send-alerts");

		env.execute("Fraud Detection");
	}
}

// FraudDetector.java
package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

	private static final long serialVersionUID = 1L;

	// Thresholds and time window for the detection rule; not used yet in version 1.
	private static final double SMALL_AMOUNT = 1.00;
	private static final double LARGE_AMOUNT = 500.00;
	private static final long ONE_MINUTE = 60 * 1000;

	@Override
	public void processElement(
			Transaction transaction,
			Context context,
			Collector<Alert> collector) throws Exception {

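		// Version 1 has no detection logic yet: every transaction
		// produces an alert whose id is the transaction's account id.
		Alert alert = new Alert();
		alert.setId(transaction.getAccountId());

		collector.collect(alert);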
	}
}

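Code analysis:
1. Create a stream execution environment. getExecutionEnvironment() picks the appropriate environment automatically: a local one when the job runs inside the IDE, or the cluster's environment when the job is submitted to a cluster.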
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

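2. Create the data source. TransactionSource, provided by the walkthrough's common library, generates an infinite stream of simulated credit card transactions; name() sets the operator name shown in logs and in the web UI.
DataStream<Transaction> transactions = env
	.addSource(new TransactionSource())
	.name("transactions");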

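3. Fraud detection. keyBy(Transaction::getAccountId) partitions the stream by account ID, so all transactions of the same account are processed by the same parallel instance of FraudDetector; process() then calls FraudDetector.processElement() for each transaction.
DataStream<Alert> alerts = transactions
	.keyBy(Transaction::getAccountId)
	.process(new FraudDetector())
	.name("fraud-detector");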
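Why key by account ID? In version 1 it makes no visible difference, but once the later versions remember the previous transaction, that memory must be kept per account. The plain-Java analogy below uses hypothetical names, and a HashMap stands in for Flink's keyed state (such as ValueState). It runs the rule on an interleaved stream where account 1 spends $0.50, account 2 spends $600, and then account 1 spends $700.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy for keyBy(Transaction::getAccountId) (hypothetical class, no Flink).
class KeyedRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // Keyed: each account is compared only with its OWN previous transaction.
    // The map plays the role of per-key state ("was this account's last tx small?").
    static List<Integer> keyedAlerts(long[] accountIds, double[] amounts) {
        Map<Long, Boolean> lastWasSmall = new HashMap<>();
        List<Integer> hits = new ArrayList<>();
        for (int i = 0; i < amounts.length; i++) {
            boolean prevSmall = lastWasSmall.getOrDefault(accountIds[i], false);
            if (prevSmall && amounts[i] > LARGE_AMOUNT) {
                hits.add(i);
            }
            lastWasSmall.put(accountIds[i], amounts[i] < SMALL_AMOUNT);
        }
        return hits;
    }

    // Unkeyed: one shared "previous transaction" for all accounts (wrong for this rule).
    static List<Integer> unkeyedAlerts(double[] amounts) {
        List<Integer> hits = new ArrayList<>();
        for (int i = 1; i < amounts.length; i++) {
            if (amounts[i - 1] < SMALL_AMOUNT && amounts[i] > LARGE_AMOUNT) {
                hits.add(i);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        long[] ids = {1, 2, 1};
        double[] amounts = {0.5, 600, 700};
        System.out.println("keyed:   " + keyedAlerts(ids, amounts)); // prints keyed:   [2]
        System.out.println("unkeyed: " + unkeyedAlerts(amounts));    // prints unkeyed: [1]
    }
}
```

The keyed version flags only position 2 (account 1's $700), while the unkeyed version wrongly flags account 2's $600 because it pairs it with another account's small purchase.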

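4. Output the results. AlertSink writes each Alert to the log at INFO level, which produces the Alert{id=...} lines in the output above.
alerts
	.addSink(new AlertSink())
	.name("send-alerts");

5. Submit and run the job. The previous steps only define the dataflow; nothing runs until execute() is called.
env.execute("Fraud Detection");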
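Steps 1 to 4 only describe the dataflow; Flink builds the job graph and starts running it when env.execute("Fraud Detection") is called at the end of main. As an analogy only (this is java.util.stream, not Flink API, and the class name is hypothetical), a Java Stream pipeline is lazy in a similar way: intermediate operations run nothing until a terminal operation is invoked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: like a Flink DataStream program, a java.util.stream pipeline is lazy.
class LazyPipelineAnalogy {
    // Counts how many elements have actually flowed through the pipeline.
    static final AtomicInteger processed = new AtomicInteger();

    // Like steps 1 to 4: describe the pipeline, but do not run it yet.
    static Stream<String> buildPipeline(List<Long> accountIds) {
        return accountIds.stream()
                .peek(id -> processed.incrementAndGet()) // observable side effect
                .map(id -> "Alert{id=" + id + "}");      // like FraudDetector in version 1
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline(List.of(1L, 2L, 3L));
        System.out.println("before terminal op: " + processed.get()); // prints 0
        List<String> alerts = pipeline.collect(Collectors.toList());   // like env.execute(...)
        System.out.println("after terminal op: " + processed.get());  // prints 3
        System.out.println(alerts);
    }
}
```

One difference worth keeping in mind: a Flink job with an infinite source such as TransactionSource keeps running after execute() is called, whereas this Stream finishes as soon as its three elements are consumed.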