⭐ Programming/BigData

Apache Flink DataStream API를 사용한 사기 탐지

김진한

상태 저장 스트리밍 애플리케이션을 위한 API

FraudDetectionJob.java 

데이터 흐름을 정의

package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Entry point that wires up the fraud-detection data flow:
 * a transaction source, a keyed {@code FraudDetector} operator, and an alert sink.
 */
public class FraudDetectionJob {

    /**
     * Builds the streaming pipeline and submits it for execution.
     *
     * @param args command-line arguments (not read by this method)
     * @throws Exception if {@code env.execute} fails
     */
    public static void main(String[] args) throws Exception {
        // The execution environment is used to set job properties, create sources,
        // and trigger execution of the job.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: ingests transactions into the job (in a real deployment this would
        // typically be an external system such as Kafka). The name is for debugging.
        final DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");

        // Partition the stream by account id so that all records for one key are
        // handled together, then apply FraudDetector to each element of the keyed stream.
        final DataStream<Alert> alerts =
                transactions
                        .keyBy(Transaction::getAccountId)
                        .process(new FraudDetector())
                        .name("fraud-detector");

        // Sink: emits the resulting alerts (in a real deployment this would typically
        // write to an external system such as Kafka).
        alerts.addSink(new AlertSink()).name("send-alerts");

        env.execute("Fraud Detection");
    }
}

FraudDetector.java

사기 탐지 기능의 비즈니스 로직

package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;
    
    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

@Override
public void processElement(
        Transaction transaction,
        Context context,
        Collector<Alert> collector) throws Exception {

    // Get the current state for the current key
    Boolean lastTransactionWasSmall = flagState.value();

    // Check if the flag is set
    if (lastTransactionWasSmall != null) {
        if (transaction.getAmount() > LARGE_AMOUNT) {
            // Output an alert downstream
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());

            collector.collect(alert);            
        }

        // Clean up our state
        flagState.clear();
    }

    if (transaction.getAmount() < SMALL_AMOUNT) {
        // Set the flag to true
        flagState.update(true);
        
        // set the timer and timer state
        long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
        context.timerService().registerProcessingTimeTimer(timer);
        timerState.update(timer);
    }   
    
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }
    
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }
    
    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}