지난 글에서는 Confluent Cloud for Apache Flink의 기본 기능과 중복 데이터 제거를 통한 실시간 데이터 정제를 살펴보았습니다.
이번 글에서는 Lab 2를 통해 한 단계 더 발전된 데이터 enrichment 패턴과 고급 조인 기법을 자세히 다뤄보겠습니다. 특히 실제 운영 환경에서 자주 접하는 temporal join과 statement sets를 활용한 복합 데이터 파이프라인 구축 방법을 구체적으로 살펴보겠습니다.
실시간 데이터 Enrichment의 핵심 과제
실제 비즈니스 환경에서는 주문 데이터가 단독으로 존재하는 경우는 거의 없습니다. 고객 정보, 상품 카탈로그, 결제 기록 등 다양한 데이터 소스와 결합되어야 의미 있는 비즈니스 인사이트를 제공할 수 있습니다. 하지만 이러한 데이터 enrichment 과정에서 다음과 같은 현실적인 문제들이 발생합니다.
Lab 2에서는 이러한 문제들을 체계적으로 해결하는 방법을 제시합니다.
고급 조인 패턴: Regular Join vs Temporal Join
Regular Join의 한계점
먼저 일반적인 내부 조인의 문제점을 살펴보겠습니다.
-- 고객 ID가 3001인 주문에 대해 고객 이메일 정보를 조인
SELECT
order_id,
unique_orders.`$rowtime`, -- 주문 발생 시각 (이벤트 시간)
email -- 고객 이메일 정보
FROM unique_orders
INNER JOIN customers -- 일반 조인 (Regular Join)
ON unique_orders.customer_id = customers.customer_id
WHERE unique_orders.customer_id = 3001;Regular Join의 문제점
- 무한 상태 증가: 주문 데이터가 무한히 증가하면 조인 상태도 무한히 커짐
- 부적절한 업데이트: 고객 정보 변경 시 과거 주문까지 모두 업데이트됨
- 메모리 부족: State TTL 없이는 메모리 사용량이 계속 증가
Temporal Join의 우월성
Temporal Join은 이러한 문제를 근본적으로 해결합니다.
-- 고객 ID가 3001인 주문에 대해 주문 시점의 고객 이메일 정보를 조회
SELECT
order_id,
unique_orders.`$rowtime`, -- 주문 발생 시각 (이벤트 시간)
email -- 주문 당시의 고객 이메일 (변경 불가)
FROM unique_orders
-- Temporal Join을 사용하여 주문 발생 시점 기준으로 고객 정보 조회
INNER JOIN customers FOR SYSTEM_TIME AS OF unique_orders.`$rowtime`
ON unique_orders.customer_id = customers.customer_id
WHERE unique_orders.customer_id = 3001;Temporal Join의 핵심 메커니즘
추가 설명 - Temporal Join의 내부 동작
Temporal Join은 "Time Travel" 개념을 구현합니다. 각 주문에 대해 다음과 같은 과정을 거칩니다.
- 시점 결정: 주문의
$rowtime을 기준 시점으로 설정 - 버전 검색: 해당 시점에서 유효했던 고객 정보 버전을 찾음
- 스냅샷 조인: 찾은 고객 정보 버전과 주문을 조인
- 불변성 보장: 이후 고객 정보가 변경되어도 과거 주문은 영향받지 않음
이를 통해 고객이 이메일을 king.okuneva@yahoo.com에서 johnny.kling@gmail.com으로 변경하더라도, 이전 주문들은 변경 전 이메일을 유지합니다.
Interval Join: 시간 기반 연관성 검증
결제가 없는 주문은 유효하지 않습니다. Interval Join을 사용하여 주문과 결제를 시간 조건으로 연결할 수 있습니다.
Valid Orders 테이블 생성
-- 상태 관리 설정
SET 'client.statement-name' = 'valid-orders-materializer';
-- 유효 주문 테이블 생성
-- 주문과 결제를 시간 조건으로 조인하여 유효한 주문 추출
-- 결제 시각 기준으로 10분 이내에 발생한 주문만 매칭
CREATE TABLE valid_orders (
order_id STRING,
customer_id INT,
product_id STRING,
order_time TIMESTAMP_LTZ(3),
payment_time TIMESTAMP_LTZ(3),
amount DECIMAL,
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND -- 이벤트 시간 기준 워터마크 설정
) AS
SELECT
unique_orders.order_id,
customer_id,
product_id,
unique_orders.`$rowtime` AS order_time, -- 주문 시각
payment_time, -- 결제 시각
amount -- 결제 금액
FROM unique_orders
INNER JOIN payments
ON unique_orders.order_id = payments.order_id
-- 주문 시각이 결제 시각보다 10분 전 ~ 결제 시각 사이인 경우만 유지
WHERE unique_orders.`$rowtime`
BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time;Interval Join의 핵심 로직
BETWEEN payment_time - INTERVAL '10' MINUTES AND payment_time: 결제 시간 기준 10분 전까지의 주문과 매칭- 실무 활용: 결제 지연을 고려한 유연한 매칭 정책
- 성능 최적화: 시간 제약으로 조인 범위 제한
복합 데이터 Enrichment: Multi-Table Join
종합 데이터 프로덕트 생성
실제 비즈니스에서는 여러 데이터 소스를 동시에 결합해야 합니다.
-- 복합 데이터 프로덕트 설정
SET 'client.statement-name' = 'valid-orders-customer-product-materializer';
-- 주문 + 고객 + 상품 정보를 통합한 데이터 프로덕트 생성
CREATE TABLE order_customer_product (
order_id STRING,
customer_id INT,
name STRING,
email STRING,
brand STRING,
product STRING,
sale_price DOUBLE
) WITH ('changelog.mode' = 'retract') -- 변경 사항 추적을 위한 CDC 설정
AS
SELECT
valid_orders.order_id,
valid_orders.customer_id,
customers.name,
customers.email,
products.brand,
products.name AS product,
valid_orders.amount AS sale_price
FROM valid_orders
-- 주문 시점 기준 고객 정보 조인 (Temporal Join)
INNER JOIN customers FOR SYSTEM_TIME AS OF valid_orders.order_time
ON valid_orders.customer_id = customers.customer_id
-- 주문 시점 기준 상품 정보 조인 (Temporal Join)
INNER JOIN products FOR SYSTEM_TIME AS OF valid_orders.order_time
ON valid_orders.product_id = products.product_id;다중 Temporal Join 패턴 분석
추가 설명 - Changelog Mode
changelog.mode = 'retract' 설정은 변경 데이터 캡처(CDC) 패턴을 지원합니다.
- INSERT: 새로운 레코드 추가 시
- UPDATE_BEFORE: 기존 레코드 삭제 (retraction)
- UPDATE_AFTER: 새로운 레코드 삽입
- DELETE: 레코드 삭제 시
이를 통해 downstream 시스템에서 정확한 상태 변경을 추적할 수 있습니다.
Statement Sets: 효율적인 다중 파이프라인 관리
프로모션 계산 로직
고급 비즈니스 로직을 구현해보겠습니다.
-- 전자제품 프로모션 대상 고객 식별
-- 특정 브랜드 제품을 많이 구매한 고객을 프로모션 대상으로 추출
SELECT
customer_id,
COLLECT(brand) AS products, -- 구매한 브랜드 목록 수집
'bundle_offer' AS promotion_name -- 고정 프로모션 이름
FROM order_customer_product
WHERE brand IN ('Samsung', 'Sony', 'LG') -- 대상 브랜드 필터
GROUP BY customer_id
HAVING COUNT(DISTINCT brand) >= 2 -- 서로 다른 브랜드 2개 이상
AND COUNT(brand) > 5; -- 총 5개 이상 구매프로모션 결과 저장
-- 프로모션 테이블 생성
CREATE TABLE electronics_promotions (
customer_id INT,
promotion_name STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
);
-- 프로모션 데이터 삽입
SET 'client.statement-name' = 'electronics-promotions-materializer';
INSERT INTO electronics_promotions
SELECT
customer_id,
'bundle_offer' AS promotion_name
FROM order_customer_product
WHERE brand IN ('Samsung', 'Sony', 'LG')
GROUP BY customer_id
HAVING COUNT(DISTINCT brand) >= 2 AND COUNT(brand) > 5;고급 윈도우 연산: 고객 충성도 계산
시간 기반 집계와 윈도우 함수
-- 고객별 시간대별 주문 통계
-- 고객별 최근 1시간 내 주문 수와 총 결제 금액 집계
SELECT
customer_id,
order_id,
order_time,
COUNT(*) OVER w AS num_orders, -- 최근 1시간 주문 수
SUM(amount) OVER w AS total_price -- 최근 1시간 총 결제 금액
FROM `valid_orders`
WINDOW w AS (
PARTITION BY customer_id -- 고객별로 집계 분리
ORDER BY order_time -- 주문 시각 기준 정렬
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW -- 최근 1시간 범위
);윈도우 함수의 핵심 구성 요소
CTE를 활용한 충성도 등급 계산
-- CTE를 활용한 고객 등급 분류
CREATE TABLE reward_levels (
customer_id BIGINT,
total_price DOUBLE,
rewards_level STRING,
updated_at TIMESTAMP_LTZ(3),
PRIMARY KEY (customer_id) NOT ENFORCED
) AS
WITH total_price_per_customer_1h AS (
-- 고객별 최근 1시간 누적 결제 금액
SELECT
customer_id,
SUM(amount) OVER w AS total_price,
order_time
FROM `valid_orders`
WINDOW w AS (
PARTITION BY customer_id
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
)
-- 금액 구간별 등급 부여
SELECT
COALESCE(customer_id, 0) AS customer_id, -- null 방지
total_price,
CASE
WHEN total_price > 30000 THEN 'GOLD'
WHEN total_price > 20000 THEN 'SILVER'
WHEN total_price > 10000 THEN 'BRONZE'
ELSE 'NONE'
END AS rewards_level,
order_time AS updated_at
FROM total_price_per_customer_1h;추가 설명 - CTE(Common Table Expression)의 실무 활용
CTE는 복잡한 쿼리의 가독성 향상과 중간 결과 재사용을 위해 사용됩니다.
- 임시 결과 집합:
total_price_per_customer_1h는 쿼리 실행 동안만 존재 - 로직 분리: 윈도우 계산과 등급 분류 로직을 명확히 구분
- 메모리 효율성: 물리적 테이블 생성 없이 논리적 분할 수행
COALESCE(customer_id, 0) 함수는 null 값 처리를 담당합니다.
- NULL인
customer_id를 0으로 대체 - 데이터 품질 보장 및 downstream 오류 방지
실시간 데이터 파이프라인 관찰성
Stream Lineage를 통한 데이터 플로우 추적
Confluent Cloud의 Stream Lineage 기능을 통해 구축한 데이터 파이프라인을 시각화할 수 있습니다.
- 데이터 프로덕트 식별:
order_customer_product,electronics_promotions - 의존성 추적: 각 테이블 간 연관관계 파악
- 영향도 분석: 변경 사항의 downstream 영향 범위 확인
이는 데이터 거버넌스와 운영 투명성 확보에 필수적입니다.
Lab 2에서 배운 핵심 패턴 정리
조인 패턴 선택 가이드
성능 최적화 기법
마무리
이번 글에서는 Confluent Cloud for Apache Flink Lab 2를 통해 고급 데이터 처리 패턴을 살펴보았습니다. 특히 temporal join을 활용한 정확한 시점 기반 enrichment와 statement sets를 통한 효율적인 다중 파이프라인 관리 방법을 구체적으로 학습했습니다.
Kafka가 이벤트를 안전하게 수집·전달하는 데 중점을 두었다면, Flink는 그 위에서 시간 기준의 정제, 상태 기반의 조인, 그리고 복합 로직 실행까지 가능하게 해주는 실시간 처리 플랫폼입니다. 앞으로 실무에서 Flink를 사용할 때 이 글들이 조금이라도 도움이 되었으면 좋겠습니다.
이 시리즈가 Flink 기반 실시간 파이프라인을 설계할 때 실질적인 참고가 되었기를 바랍니다.