상태 저장 스트리밍 애플리케이션을 위한 API
FraudDetectionJob.java
데이터 흐름을 정의
package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// 작업 속성을 설정하고 소스를 만들고 작업 실행을 트리거 하는 방법
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// KAFKA 등 외부 시스템에서 Flink작업으로 데이터 수집
DataStream<Transaction> transactions = env
.addSource(new TransactionSource()) // timestamp트랜잭션이 발생한 시점 포함
.name("transactions"); // 디버깅용
// 이벤트 분할 및 사기 탐지 -> key 직후에 연산자가 FraudDetector 내에서 실행
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId) // 특정 키에 대한 모든 레코드를 처리하도록 스트림 분할
.process(new FraudDetector()) // 스트림의 분할된 각 요소에 함수를 적용하는 연산자 추가
.name("fraud-detector");
// 결과 출력 -> KAFKA 등 외부 시스템에 기록
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
FraudDetector.java
사기 탐지 기능의 비즈니스 로직
package spendreport;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
// Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
flagState.clear();
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// Set the flag to true
flagState.update(true);
// set the timer and timer state
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// remove flag after 1 minute
timerState.clear();
flagState.clear();
}
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
'⭐ Plogramming > BigData' 카테고리의 다른 글
Chpater 6. 클러스터에서 애플리케이션 실행하기 - 스파크 프레임워크 (0) | 2023.08.09 |
---|---|
분산 클러스터를 위한 하둡 시스템 (0) | 2023.07.16 |
Apache Flink란? (0) | 2023.07.14 |
TimeWindow란 (1) | 2023.07.13 |
Chpater 3. 하둡 클러스터 생성 및 환경 설정 (0) | 2023.07.08 |