5. Confluent Cloud for Apache Flink Lab 2 따라하기

·7분 읽기·
목차

지난 글에서는 Confluent Cloud for Apache Flink의 기본 기능과 중복 데이터 제거를 통한 실시간 데이터 정제를 살펴보았습니다.

이번 글에서는 Lab 2를 통해 한 단계 더 발전된 데이터 enrichment 패턴고급 조인 기법을 자세히 다뤄보겠습니다. 특히 실제 운영 환경에서 자주 접하는 temporal joinstatement sets를 활용한 복합 데이터 파이프라인 구축 방법을 구체적으로 살펴보겠습니다.

실시간 데이터 Enrichment의 핵심 과제

실제 비즈니스 환경에서는 주문 데이터가 단독으로 존재하는 경우는 거의 없습니다. 고객 정보, 상품 카탈로그, 결제 기록 등 다양한 데이터 소스와 결합되어야 의미 있는 비즈니스 인사이트를 제공할 수 있습니다. 하지만 이러한 데이터 enrichment 과정에서 다음과 같은 현실적인 문제들이 발생합니다.

문제 유형구체적 이슈실무 영향
시간 불일치주문 시점과 고객 정보 변경 시점의 차이잘못된 고객 정보로 주문 처리
상태 폭증무한 스트림과 차원 테이블 조인 시 메모리 사용량 급증시스템 성능 저하 및 장애
데이터 일관성여러 데이터 소스 간 동기화 문제비즈니스 로직 오류

Lab 2에서는 이러한 문제들을 체계적으로 해결하는 방법을 제시합니다.

고급 조인 패턴: Regular Join vs Temporal Join

Regular Join의 한계점

먼저 일반적인 내부 조인의 문제점을 살펴보겠습니다.

sql
-- 고객 ID가 3001인 주문에 대해 고객 이메일 정보를 조인
SELECT 
    order_id, 
    unique_orders.`$rowtime`,   -- 주문 발생 시각 (이벤트 시간)
    email                       -- 고객 이메일 정보
FROM unique_orders
INNER JOIN customers           -- 일반 조인 (Regular Join)
ON unique_orders.customer_id = customers.customer_id
WHERE unique_orders.customer_id = 3001;

Regular Join의 문제점

  1. 무한 상태 증가: 주문 데이터가 무한히 증가하면 조인 상태도 무한히 커짐
  2. 부적절한 업데이트: 고객 정보 변경 시 과거 주문까지 모두 업데이트됨
  3. 메모리 부족: State TTL 없이는 메모리 사용량이 계속 증가

Temporal Join의 우월성

Temporal Join은 이러한 문제를 근본적으로 해결합니다.

sql
-- 고객 ID가 3001인 주문에 대해 주문 시점의 고객 이메일 정보를 조회
SELECT 
    order_id, 
    unique_orders.`$rowtime`,   -- 주문 발생 시각 (이벤트 시간)
    email                       -- 주문 당시의 고객 이메일 (변경 불가)
FROM unique_orders
-- Temporal Join을 사용하여 주문 발생 시점 기준으로 고객 정보 조회
INNER JOIN customers FOR SYSTEM_TIME AS OF unique_orders.`$rowtime`
ON unique_orders.customer_id = customers.customer_id
WHERE unique_orders.customer_id = 3001;

Temporal Join의 핵심 메커니즘

구성 요소기능실무 의미
FOR SYSTEM_TIME AS OF시점 기준 조인주문 발생 시점의 고객 정보만 조회
unique_orders.$rowtime이벤트 시간 참조실제 주문 발생 시간 기준으로 버전 선택
Versioned Table버전 관리 테이블고객 정보 변경 이력을 시간순으로 관리

추가 설명 - Temporal Join의 내부 동작

Temporal Join은 "Time Travel" 개념을 구현합니다. 각 주문에 대해 다음과 같은 과정을 거칩니다.

  1. 시점 결정: 주문의 $rowtime을 기준 시점으로 설정
  2. 버전 검색: 해당 시점에서 유효했던 고객 정보 버전을 찾음
  3. 스냅샷 조인: 찾은 고객 정보 버전과 주문을 조인
  4. 불변성 보장: 이후 고객 정보가 변경되어도 과거 주문은 영향받지 않음

이를 통해 고객이 이메일을 king.okuneva@yahoo.com에서 johnny.kling@gmail.com으로 변경하더라도, 이전 주문들은 변경 전 이메일을 유지합니다.

Interval Join: 시간 기반 연관성 검증

결제가 없는 주문은 유효하지 않습니다. Interval Join을 사용하여 주문과 결제를 시간 조건으로 연결할 수 있습니다.

Valid Orders 테이블 생성

sql
-- 상태 관리 설정
SET 'client.statement-name' = 'valid-orders-materializer';

-- 유효 주문 테이블 생성
-- 주문과 결제를 시간 조건으로 조인하여 유효한 주문 추출
-- 결제 시각 기준으로 10분 이내에 발생한 주문만 매칭
CREATE TABLE valid_orders (
    order_id STRING,
    customer_id INT,
    product_id STRING,
    order_time TIMESTAMP_LTZ(3),
    payment_time TIMESTAMP_LTZ(3),
    amount DECIMAL,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND  -- 이벤트 시간 기준 워터마크 설정
) AS 
SELECT 
    unique_orders.order_id,
    customer_id,
    product_id,
    unique_orders.`$rowtime` AS order_time,  -- 주문 시각
    payment_time,                            -- 결제 시각
    amount                                   -- 결제 금액
FROM unique_orders
INNER JOIN payments
ON unique_orders.order_id = payments.order_id
-- 주문 시각이 결제 시각보다 10분 전 ~ 결제 시각 사이인 경우만 유지
WHERE unique_orders.`$rowtime` 
  BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time;

Interval Join의 핵심 로직

  • BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time: 결제 시간 기준 10분 전까지의 주문과 매칭
  • 실무 활용: 결제 지연을 고려한 유연한 매칭 정책
  • 성능 최적화: 시간 제약으로 조인 범위 제한

복합 데이터 Enrichment: Multi-Table Join

종합 데이터 프로덕트 생성

실제 비즈니스에서는 여러 데이터 소스를 동시에 결합해야 합니다.

sql
-- 복합 데이터 프로덕트 설정
SET 'client.statement-name' = 'valid-orders-customer-product-materializer';

-- 주문 + 고객 + 상품 정보를 통합한 데이터 프로덕트 생성
CREATE TABLE order_customer_product (
    order_id STRING,
    customer_id INT,
    name STRING,
    email STRING,
    brand STRING,
    product STRING,
    sale_price DOUBLE
) WITH ('changelog.mode' = 'retract')  -- 변경 사항 추적을 위한 CDC 설정
AS 
SELECT 
    valid_orders.order_id,
    valid_orders.customer_id,
    customers.name,
    customers.email,
    products.brand,
    products.name AS product,
    valid_orders.amount AS sale_price
FROM valid_orders
-- 주문 시점 기준 고객 정보 조인 (Temporal Join)
INNER JOIN customers FOR SYSTEM_TIME AS OF valid_orders.order_time
    ON valid_orders.customer_id = customers.customer_id
-- 주문 시점 기준 상품 정보 조인 (Temporal Join)
INNER JOIN products FOR SYSTEM_TIME AS OF valid_orders.order_time
    ON valid_orders.product_id = products.product_id;

다중 Temporal Join 패턴 분석

조인 대상시간 기준활용 목적
customersvalid_orders.order_time주문 당시 고객 정보 확보
productsvalid_orders.order_time주문 당시 상품 정보 확보
changelog.mode = 'retract'-업데이트/삭제 이벤트 처리

추가 설명 - Changelog Mode

changelog.mode = 'retract' 설정은 변경 데이터 캡처(CDC) 패턴을 지원합니다.

  • INSERT: 새로운 레코드 추가 시
  • UPDATE_BEFORE: 기존 레코드 삭제 (retraction)
  • UPDATE_AFTER: 새로운 레코드 삽입
  • DELETE: 레코드 삭제 시

이를 통해 downstream 시스템에서 정확한 상태 변경을 추적할 수 있습니다.

Statement Sets: 효율적인 다중 파이프라인 관리

프로모션 계산 로직

고급 비즈니스 로직을 구현해보겠습니다.

sql
-- 전자제품 프로모션 대상 고객 식별
-- 특정 브랜드 제품을 많이 구매한 고객을 프로모션 대상으로 추출
SELECT 
    customer_id,
    COLLECT(brand) AS products,        -- 구매한 브랜드 목록 수집
    'bundle_offer' AS promotion_name   -- 고정 프로모션 이름
FROM order_customer_product
WHERE brand IN ('Samsung', 'Sony', 'LG')  -- 대상 브랜드 필터
GROUP BY customer_id
HAVING COUNT(DISTINCT brand) >= 2         -- 서로 다른 브랜드 2개 이상
   AND COUNT(brand) > 5;                  -- 총 5개 이상 구매

프로모션 결과 저장

sql
-- 프로모션 테이블 생성
CREATE TABLE electronics_promotions (
    customer_id INT,
    promotion_name STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
);

-- 프로모션 데이터 삽입
SET 'client.statement-name' = 'electronics-promotions-materializer';

INSERT INTO electronics_promotions
SELECT 
    customer_id,
    'bundle_offer' AS promotion_name
FROM order_customer_product
WHERE brand IN ('Samsung', 'Sony', 'LG')
GROUP BY customer_id
HAVING COUNT(DISTINCT brand) >= 2 AND COUNT(brand) > 5;

고급 윈도우 연산: 고객 충성도 계산

시간 기반 집계와 윈도우 함수

sql
-- 고객별 시간대별 주문 통계
-- 고객별 최근 1시간 내 주문 수와 총 결제 금액 집계
SELECT 
    customer_id,
    order_id,
    order_time,
    COUNT(*) OVER w AS num_orders,   -- 최근 1시간 주문 수
    SUM(amount) OVER w AS total_price -- 최근 1시간 총 결제 금액
FROM `valid_orders`
WINDOW w AS (
    PARTITION BY customer_id           -- 고객별로 집계 분리
    ORDER BY order_time                -- 주문 시각 기준 정렬
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW -- 최근 1시간 범위
);

윈도우 함수의 핵심 구성 요소

구성 요소기능실무 활용
PARTITION BY customer_id고객별 윈도우 분리고객별 독립적 계산
ORDER BY order_time시간순 정렬순차적 누적 계산
RANGE BETWEEN ... AND CURRENT ROW시간 범위 지정최근 1시간 데이터만 포함

CTE를 활용한 충성도 등급 계산

sql
-- CTE를 활용한 고객 등급 분류
CREATE TABLE reward_levels (
    customer_id BIGINT, 
    total_price DOUBLE,
    rewards_level STRING,
    updated_at TIMESTAMP_LTZ(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) AS 
WITH total_price_per_customer_1h AS (
    -- 고객별 최근 1시간 누적 결제 금액
    SELECT 
        customer_id,
        SUM(amount) OVER w AS total_price, 
        order_time
    FROM `valid_orders`
    WINDOW w AS (
        PARTITION BY customer_id
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    )
) 
-- 금액 구간별 등급 부여
SELECT 
    COALESCE(customer_id, 0) AS customer_id,     -- null 방지
    total_price,
    CASE
        WHEN total_price > 30000 THEN 'GOLD'
        WHEN total_price > 20000 THEN 'SILVER'
        WHEN total_price > 10000 THEN 'BRONZE'
        ELSE 'NONE'
    END AS rewards_level,
    order_time AS updated_at
FROM total_price_per_customer_1h;

추가 설명 - CTE(Common Table Expression)의 실무 활용

CTE는 복잡한 쿼리의 가독성 향상중간 결과 재사용을 위해 사용됩니다.

  1. 임시 결과 집합: total_price_per_customer_1h는 쿼리 실행 동안만 존재
  2. 로직 분리: 윈도우 계산과 등급 분류 로직을 명확히 구분
  3. 메모리 효율성: 물리적 테이블 생성 없이 논리적 분할 수행

COALESCE(customer_id, 0) 함수는 null 값 처리를 담당합니다.

  • NULL인 customer_id를 0으로 대체
  • 데이터 품질 보장 및 downstream 오류 방지

실시간 데이터 파이프라인 관찰성

Stream Lineage를 통한 데이터 플로우 추적

Confluent Cloud의 Stream Lineage 기능을 통해 구축한 데이터 파이프라인을 시각화할 수 있습니다.

  1. 데이터 프로덕트 식별: order_customer_product, electronics_promotions
  2. 의존성 추적: 각 테이블 간 연관관계 파악
  3. 영향도 분석: 변경 사항의 downstream 영향 범위 확인

이는 데이터 거버넌스운영 투명성 확보에 필수적입니다.

Lab 2에서 배운 핵심 패턴 정리

조인 패턴 선택 가이드

패턴적용 시나리오장점주의사항
Temporal JoinEvent enrichment시점 정확성, 메모리 효율성Primary key 필수
Interval Join시간 관련성 검증유연한 시간 조건워터마크 설정 중요
Regular Join정적 테이블 조인간단한 구현State TTL 필수

성능 최적화 기법

기법목적구현 방법
State TTL메모리 사용량 제한SET 'sql.state-ttl' = '1 hour'
Changelog Mode효율적 변경 추적'changelog.mode' = 'retract'
Statement Sets여러 파이프라인 최적화EXECUTE STATEMENT SET

마무리

이번 글에서는 Confluent Cloud for Apache Flink Lab 2를 통해 고급 데이터 처리 패턴을 살펴보았습니다. 특히 temporal join을 활용한 정확한 시점 기반 enrichment와 statement sets를 통한 효율적인 다중 파이프라인 관리 방법을 구체적으로 학습했습니다.

Kafka가 이벤트를 안전하게 수집·전달하는 데 중점을 두었다면, Flink는 그 위에서 시간 기준의 정제, 상태 기반의 조인, 그리고 복합 로직 실행까지 가능하게 해주는 실시간 처리 플랫폼입니다. 앞으로 실무에서 Flink를 사용할 때 이 글들이 조금이라도 도움이 되었으면 좋겠습니다.

이 시리즈가 Flink 기반 실시간 파이프라인을 설계할 때 실질적인 참고가 되었기를 바랍니다.