Loopers

"같은 주문이 두 번 결제됐습니다" 💸 - Kafka로 배운 분산 시스템의 잔혹한 현실

그zi운아이 2025. 9. 5. 15:18

TL;DR :

주문 이벤트가 다인스턴스에서 동시에 전송되며 중복 결제가 발생했다.
설정(acks=all, idempotence=true)만으로는 애플리케이션 레벨 경합을 막을 수 없었다.
나는 Outbox + DB 원자적 선점 + 동기 전송(.get())으로 "처음부터 안전"을 선택했다.
토픽은 max.in.flight=5, Consumer는 별도 테이블 기반 멱등성, DLT는 Table 우선으로 운영했다.
결과적으로 유실/중복/순서 문제를 "설계로 산" 뒤, 성능 최적화를 진행할 수 있었다.



 


그날, 같은 주문이 두 번 결제되었다 😨

[ERROR] Duplicate payment detected
paymentId: 92134, orderId: 55120, amount: 39000
Original: 2024-03-08 14:23:11
Duplicate: 2024-03-08 14:23:12

금요일 오후, 모니터링 알람이 울렸다 🚨. 같은 주문이 1초 차이로 두 번 결제됐다는 것이었다.

"뭔가 잘못됐다."


Kafka가 뭔데 이렇게 복잡해? 🤯

과제는 단순해 보였다: 이벤트를 Kafka에도 기록해서 외부 시스템이 볼 수 있게 하기

// 기존: 내부에서만 처리
@Transactional
public Order process(OrderCommand command) {
    // 비즈니스 로직
    Order order = orderService.createOrder(command.userId(), items, OrderAmount.of(finalAmount), command.couponId());
    
    // 내부 이벤트만 발행
    // eventBridge.publishEvent(EventType.ORDER_CREATED, OrderCreatedEvent.of(...));
    
    return order;
}

// 추가 목표: 동일한 이벤트를 Kafka에도 기록
@Transactional  
public Order process(OrderCommand command) {
    Order order = orderService.createOrder(...);
    
    // 내부 + 외부 이벤트 동시 발행
    eventBridge.publishEvent(EventType.ORDER_CREATED, 
        OrderCreatedEvent.of(order.getId(), command.userId(), items));
    
    return order;
}

기존 내부 처리는 그대로 두고, 추가로 Kafka에 기록해서 외부 시스템(commerce-streamer)이 감사로그, 메트릭 집계, 별도 캐시 관리를 할 수 있게 하는 것이 목표였다.

문제는 Kafka가 생각보다 훨씬 복잡한 녀석이었다는 것이다 😅.


Kafka의 잔혹한 진실들 💀

1. At-least-once가 기본이다

  • Producer는 "최소 한 번"은 보낸다 (더 보낼 수도 있음)
  • Consumer는 "최소 한 번"은 받는다 (더 받을 수도 있음)
  • Exactly-once는 환상이다

2. 순서는 파티션 내에서만 보장된다

Product 123 이벤트:
Partition 0: 재고 100 → 50 
Partition 1: 재고 0     ← 이게 먼저 처리될 수 있음! 😱

3. 설정 하나 잘못하면 지옥이다

# 이 한 줄이 순서를 망가뜨린다
max.in.flight.requests.per.connection: 5  # 기본값

첫 번째 삽질: 설정만으로 해결될 줄 알았다 🤦‍♂️

처음엔 "Kafka 설정만 잘하면 되겠지"라고 생각했다.

# commerce-api application.yml
spring:
  kafka:
    producer:
      bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all                    # 모든 replica 확인
      enable-idempotence: true     # 중복 전송 방지
      retries: 2147483647          # 거의 무한 재시도
      properties:
        max.in.flight.requests.per.connection: 5  # 성능 우선
        compression.type: snappy   # 성능 최적화
        linger.ms: 10
        batch.size: 16384
        spring.json.add.type.headers: false

이 설정들은 분명 중요하다 ✅. acks=all + enable.idempotence=true로 Producer 레벨에서는 중복을 막을 수 있다. 하지만 더 큰 문제가 있었다.


진짜 문제: 트랜잭션 경계가 애매했다 😵‍💫

// ❌ 처음 시도한 직접 발행 방식
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderCreated(OrderCreatedEvent event) {
    // 기존 내부 처리
    processInternalNotification(event);
    
    // Kafka 직접 발행 시도
    kafkaTemplate.send("order-events.v1", event); // 문제!
}

문제 상황들 💥:

  • Kafka 브로커가 다운되면? → 이벤트 유실
  • 네트워크 타임아웃 발생하면? → 재시도 불가
  • 여러 인스턴스가 동시에 실행되면? → 중복 발행 가능

가장 큰 깨달음: 설정에서 막을 수 없는 한계

  • acks=all + idempotence=true로도 애플리케이션 레벨 중복은 막을 수 없다는 걸 깨달았다.
Server A: onOrderCreated() 실행 → kafkaTemplate.send()
Server B: onOrderCreated() 실행 → 같은 이벤트 또 send() 😱

Producer idempotence=true는 같은 Producer 인스턴스 내에서의 중복만 막는다. 서로 다른 애플리케이션 인스턴스면 소용없다 🤷‍♂️.


해결책 1: Outbox 패턴으로 이벤트 유실 방지 🛡️

"이벤트 발행과 비즈니스 로직이 같이 성공하거나 같이 실패해야 한다"

Outbox 패턴이 뭔가? 🤔

기존 문제:

// ❌ 이렇게 하면 트랜잭션 경계가 애매해진다
@Transactional
public void completePayment(Long paymentId) {
    payment.complete();
    paymentRepository.save(payment);  // DB 트랜잭션
    
    kafkaTemplate.send("payment-events", event);  // 네트워크 호출
    // 만약 Kafka 전송이 실패하면? 결제는 완료되었는데 이벤트는 안 감... 💀
}

Outbox 해결책 ✅:

// ✅ 비즈니스 로직과 이벤트를 같은 트랜잭션에서 처리
@Transactional
public Order process(OrderCommand command) {
    // 1. 비즈니스 로직
    Order order = orderService.createOrder(...);
    
    // 2. 이벤트를 내부 + 외부 동시 발행 (같은 트랜잭션!)
    eventBridge.publishEvent(EventType.ORDER_CREATED, 
        OrderCreatedEvent.of(order.getId(), command.userId(), items));
    
    // 둘 다 성공하거나 둘 다 실패 🎯
    return order;
}

실제 DomainEventBridge 구조 📋

@Component
@RequiredArgsConstructor
public class DomainEventBridge {
    private final ApplicationEventPublisher publisher;
    private final EnvelopeFactory envelopeFactory;
    private final OutboxEventService outboxEventService;

    @Transactional(propagation = Propagation.MANDATORY)
    public void publishEvent(EventType eventType, Object payload) {
        Envelope<Object> envelope = envelopeFactory.create(eventType, payload);

        publisher.publishEvent(envelope);    // 내부 이벤트 발행
        outboxEventService.save(envelope);   // 외부 이벤트 DB 저장
    }
}

해결책 2: 원자적 선점 패턴 🎯

Outbox로 이벤트 유실은 막았지만, 새로운 문제가 생겼다: 여러 서버가 같은 이벤트를 중복 처리하는 것.

핵심 아이디어: "하나씩 선점해서 처리하자"

일괄 처리의 함정 🪤

// ❌ 이렇게 하면 동시성 문제 발생
@Scheduled(fixedDelay = 10000)
public void relayEvents() {
    List<OutboxEvent> events = repository.findByStatus(NEW);
    
    // 여러 서버가 같은 리스트를 가져올 수 있음! 😱
    events.forEach(event -> {
        event.markAsSending();
        sendToKafka(event);
    });
}

문제 시나리오 💥:

14:23:11.000 Server A: findByStatus(NEW) → [Event_92134]
14:23:11.001 Server B: findByStatus(NEW) → [Event_92134] (동일!)
14:23:11.100 Server A: markAsSending() + send()
14:23:11.101 Server B: markAsSending() + send() ← 중복! 😱

단건 선점의 안전성 ✅

// ✅ 하나씩 원자적으로 선점해서 처리
@Scheduled(fixedDelay = 10000)
@Transactional
public void relayEvents() {
    List<OutboxEvent> ready = outboxEventRepository.findTop100ReadyToSend(ZonedDateTime.now());
    if (ready.isEmpty()) return;

    // 원자적 클레임: UPDATE WHERE로 먼저 선점한 서버만 처리 🎯
    int claimed = outboxEventRepository.claimEventsForSending(
        ready.stream().map(OutboxEvent::getId).toList(),
        ZonedDateTime.now() 
    );
    
    if (claimed == 0) return; // 다른 서버가 이미 처리 중

    // 안전하게 하나씩 처리 ✅
    for (OutboxEvent event : ready) {
        try {
            sendOnce(event);
        } catch (Exception ex) {
            log.error("Unexpected error while sending event: {}", event.getMessageId(), ex);
        }
    }
}

Repository의 원자적 클레임 🔒:

@Modifying
@Query("UPDATE OutboxEvent o SET o.status = 'SENDING', o.lastModified = :now " +
       "WHERE o.id IN :ids AND o.status IN ('NEW', 'FAILED')")
int claimEventsForSending(@Param("ids") List<Long> ids, @Param("now") ZonedDateTime now);

이제 먼저 클레임한 서버만 이벤트를 처리할 수 있다 🏆.

 


토픽 설계 고민: 언제 나누고 언제 합칠까? 🤹‍♂️

실제 적용한 이벤트 타입별 분리 전략 🎯

처음엔 도메인별로 토픽을 나누려고 했지만, 실제 운영하면서 이벤트 타입별 분리가 더 현실적이었습니다.

 
 
yaml
# 실제 적용한 토픽 구조
order-events.v1: 주문 생성/수정/취소 통합
catalog-events.v1: 상품 재고/좋아요 변경 통합  
notification-events.v1: 알림 발송 요청 통합

왜 이벤트 타입별 통합이 현실적인가? 🤔

선택적 구독의 유연성: Consumer가 관심있는 이벤트만 필터링해서 처리

 
 
java
// 감사로그: 모든 이벤트 구독하되 타입별로 다르게 처리
@KafkaListener(topics = {"order-events.v1", "catalog-events.v1"}, groupId = "audit-consumer")
public void handleAllEvents(GeneralEnvelopeEvent envelope) {
    switch (envelope.type()) {
        case "ORDER_CREATED" -> auditService.logOrderCreation(envelope);
        case "STOCK_ADJUSTED" -> auditService.logStockChange(envelope);
        case "PRODUCT_LIKED" -> auditService.logUserActivity(envelope);
    }
}

// 캐시 무효화: 특정 이벤트만 처리
@KafkaListener(topics = "catalog-events.v1", groupId = "cache-consumer")
public void handleCacheEviction(GeneralEnvelopeEvent envelope) {
    if (CACHE_EVICTION_EVENTS.contains(envelope.type())) {
        cacheService.evict(envelope);
    }
    // 다른 타입은 무시
}

독립 스케일링: 토픽별로 파티션/처리량 최적화

  • order-events.v1: 10개 파티션 (주문량이 많음)
  • catalog-events.v1: 20개 파티션 (재고 변경이 빈번함)
  • notification-events.v1: 5개 파티션 (상대적으로 적음)

장애 격리: 한 토픽 문제가 다른 토픽에 영향 안 줌

 
 
java
// 주문 Consumer 장애가 상품 Consumer에 영향 없음
@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope) {
    // 주문 처리 로직이 실패해도...
}

@KafkaListener(topics = "catalog-events.v1", groupId = "catalog-consumer")  
public void handleCatalogEvents(GeneralEnvelopeEvent envelope) {
    // 상품 처리는 계속 정상 동작
}

보존 정책: 토픽별로 다른 TTL/보관 정책

  • 주문 이벤트: 7년 보관 (법적 요구사항)
  • 상품 이벤트: 1년 보관 (분석 목적)
  • 알림 이벤트: 30일 보관 (임시 추적)

안티패턴: 이벤트별 토픽 남발 ❌

 
 
yaml
❌ order-created-events.v1
❌ order-updated-events.v1  
❌ order-cancelled-events.v1
❌ stock-increased-events.v1
❌ stock-decreased-events.v1

문제점들:

  • 토픽 관리 복잡성 증가 (5개 → 50개)
  • Consumer 코드 중복 (비슷한 처리 로직)
  • 운영팀 부담 (토픽별 모니터링/알람 설정)

해결책: 도메인별 통합 + eventType 필드로 구분 ✅

 
 
java
// 하나의 토픽에서 여러 이벤트 타입 처리
@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope) {
    switch (envelope.type()) {
        case "ORDER_CREATED" -> handleOrderCreated(envelope);
        case "ORDER_UPDATED" -> handleOrderUpdated(envelope);
        case "ORDER_CANCELLED" -> handleOrderCancelled(envelope);
        default -> log.warn("Unknown event type: {}", envelope.type());
    }
}

Consumer별 처리 전략 🎭

1. 감사로그: 모든 이벤트 저장 📝

@Component
public class AuditLogHandler implements EventHandler {
    private final AuditLogService auditLogService;

    @Override
    public boolean canHandle(String eventType) {
        return true; // 모든 이벤트 로깅 📝
    }
    
    @Override
    public void handle(GeneralEnvelopeEvent envelope) {
        auditLogService.saveAuditLog(envelope);
    }
}

2. 메트릭 집계: 특정 이벤트만 📊

@Component
public class MetricsHandler implements EventHandler {
    private final MetricsService metricsService;

    @Override
    public boolean canHandle(String eventType) {
        return EventTypes.METRIC_EVENTS.contains(eventType);
    }
    
    @Override
    public void handle(GeneralEnvelopeEvent envelope) {
        metricsService.recordMetric(envelope);
    }
}

 


세 번째 삽질: 비동기의 함정 😓

// ❌ 이것도 함정이었다
@Transactional
protected void sendOnce(OutboxEvent event) {
    event.markAsSending();
    
    kafkaTemplate.send(event.getTopic(), event.getEventKey(), event.toGeneralEnvelopeEvent())
        .whenComplete((result, ex) -> {
            if (ex == null) {
                event.markAsPublished(); // 다른 스레드에서 실행!
                repository.save(event);  // @Transactional 전파 안됨!
            }
        });
}

문제: .whenComplete() 콜백은 다른 스레드 풀에서 실행되어 트랜잭션 컨텍스트가 없다.

해결: 동기 전송으로 변경 ✅

// ✅ 안전한 동기 처리 (실제 구현)
@SuppressWarnings("unchecked")
@Transactional
protected void sendOnce(OutboxEvent event) {
    event.markAsSending();

    try {
        // .get()으로 동기 대기 - 트랜잭션 안에서 상태 관리
        SendResult<String, Object> result = (SendResult<String, Object>) kafkaTemplate
            .send(event.getTopic(), event.getEventKey(), event.toGeneralEnvelopeEvent()).get();

        event.markAsPublished();
        outboxEventRepository.save(event);

    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // 인터럽트 복구
        event.markAsFailed(ie.getMessage(), retryPolicy);
        outboxEventRepository.save(event);

    } catch (ExecutionException ee) {
        log.error("Failed to send event: {}", event.getMessageId(), ee);
        event.markAsFailed(ee.getMessage(), retryPolicy);
        outboxEventRepository.save(event);
    }
}

트레이드오프 표:

방식 장점 단점 선택

비동기 + 콜백 TPS 높음 트랜잭션 경계 불명확
동기 .get() 상태관리 명확 TPS 손실

결정 근거: 초기에는 안전성이 성능보다 우선


Consumer: 멱등성이 생명 🔒

Producer에서 중복을 막아도, Consumer는 또 다른 차원의 중복을 고려해야 한다.

멱등성 테이블 설계

@Entity
@Table(name = "processed_events")
public class ProcessedEvent extends BaseEntity {
    
    @Column(name = "message_id", length = 128, unique = true, nullable = false)
    private String messageId; // 멱등성 키
    
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private Status status = Status.PROCESSING; // 기본은 선점 중
    
    @Column(name = "event_type", length = 100, nullable = false)
    private String eventType;
    
    @Column(name = "correlation_id", length = 128)
    private String correlationId;
    
    @Column(name = "started_at", nullable = false)
    private ZonedDateTime startedAt = ZonedDateTime.now();
    
    @Column(name = "processed_at")
    private ZonedDateTime processedAt;
    
    public enum Status { PROCESSING, PROCESSED, FAILED }
}

Consumer 멱등성 처리 실제 구현

@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope, 
                             Acknowledgment ack,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                             @Header(KafkaHeaders.OFFSET) long offset) {
    
    log.info("Order event received - messageId: {}, type: {}, offset: {}:{}", 
            envelope.messageId(), envelope.type(), partition, offset);
    
    try {
        // 주문 이벤트는 중요하므로 적극적인 재시도 정책 적용
        retryableEventProcessor.processWithRetry(
            envelope, topic, partition, offset, "order-consumer", RetryPolicy.AGGRESSIVE);
        
        ack.acknowledge();
        log.debug("Order event acknowledged - messageId: {}", envelope.messageId());
        
    } catch (Exception e) {
        log.error("Critical: Failed to process order event after all retries - messageId: {}. " +
                 "Event has been sent to Dead Letter Table.", envelope.messageId(), e);
        
        // 재시도 로직에서 이미 DLT로 보냈으므로 ACK 처리
        ack.acknowledge();
    }
}

핵심 설정들과 이유

Producer 필수 설정

spring:
  kafka:
    producer:
      bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all                              # 모든 ISR 확인
      enable-idempotence: true               # Producer 레벨 중복 방지
      retries: 2147483647                    # 거의 무한 재시도
      properties:
        max.in.flight.requests.per.connection: 5  # 성능 우선
        compression.type: snappy             # 네트워크 대역폭 절약
        linger.ms: 10                        # 배치 최적화
        batch.size: 16384                    # 적당한 배치 크기
        spring.json.add.type.headers: false  # 타입 헤더 생략

생각해볼 점: "max.in.flight=5가 너무 성능 우선적이지 않나?"

: 순서가 중요한 토픽만 1로 설정. 순서 무관하면 5 (기본값) 사용해서 성능 확보.

Consumer 필수 설정

spring:
  kafka:
    consumer:
      group-id: loopers-default-consumer
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: "*"   # 역직렬화 허용 패키지
        spring.json.value.default.type: com.fasterxml.jackson.databind.JsonNode
        enable.auto.commit: false            # Manual ACK
        auto.offset.reset: latest            # 새 메시지부터
    listener:
      ack-mode: manual                       # 처리 완료 후 수동 ACK

Dead Letter Table: 실패의 완전한 추적

고민: "DLQ는 Topic으로 해야 하나요, Table로 해야 하나요?"

: 초기엔 Table이 관리하기 쉽다. 이유:

  • SQL로 쉽게 조회/분석 가능
  • 상태 관리 (DEAD → INVESTIGATING → RESOLVED)
  • 운영팀이 익숙한 도구 (DB Admin)
@Entity
public class DeadLetterEvent extends BaseEntity {
    
    private String messageId;
    private String originalTopic;
    private Integer partitionId;
    private Long offsetId;
    private String consumerGroup;
    
    @Column(columnDefinition = "JSON")
    private String payload;
    
    @Enumerated(EnumType.STRING)
    private FailureReason failureReason;
    
    private String errorMessage;
    private String stackTrace;
    private Integer retryAttempts;
    
    public enum FailureReason {
        DESERIALIZATION_ERROR,    // JSON 파싱 실패
        HANDLER_EXCEPTION,        // 비즈니스 로직 오류
        TIMEOUT,                  // 처리 시간 초과
        RESOURCE_UNAVAILABLE      // DB/API 접근 불가
    }
}

 


아직 남은 과제들

1. 배치 처리 도입

현재 한계: 메시지 1개씩 처리로 DB 커넥션 낭비

// 지금: 100개 메시지 = 100번 DB 호출
for (GeneralEnvelopeEvent event : events) {
    auditLogService.saveAuditLog(event); // 각각 DB 호출
}

// 목표: 100개 메시지 = 1번 배치 처리
auditLogService.saveBatch(events); // 한 번에 처리

고려사항: 일부 실패 시 전체 롤백 vs 부분 처리

2. Hot Key 대응

잠재적 문제: 인기 상품에 이벤트 몰릴 경우

productId=12345 (인기상품)에 이벤트 몰림
→ 특정 파티션에만 트래픽 집중
→ 해당 Consumer만 과부하

대응 방안: 샤딩 키 전략 재설계 or 파티션 증설

3. 스키마 진화 대응

현재: JSON 직렬화로 스키마 변화에 취약
향후: Avro + Schema Registry 고려

4. 모니터링 개선

추가 필요한 지표:

  • 토픽별 처리량/지연시간 추이
  • DLT 누적 패턴 분석
  • Consumer Group별 성능 비교
  • 파티션간 처리량 불균형 모니터링

1주간 Kafka 파이프라인 구현 여정

Week 1 Day 1-2: "Kafka 설정만 하면 되겠지" → 직접 발행 시도 → 트랜잭션 경계 문제 발견
Week 1 Day 3-4: Outbox 패턴 도입 → 동시성 지옥 경험 → 원자적 선점으로 해결
Week 1 Day 5-7: Consumer 멱등성 + DLT 구현 → 모니터링까지 완성

짧은 기간이었지만 가장 중요한 깨달음들:

  1. At-least-once가 현실이다: Producer든 Consumer든 중복을 전제로 설계
  2. 동시성은 항상 고려하라: 여러 인스턴스 환경에서는 모든 게 경합 상황
  3. 실패를 설계하라: DLT, 재시도, 모니터링까지 포함해야 완성
  4. 안전성 확보 후 성능 최적화: 빠르게 만들어서 버그 투성이가 되느니 느리더라도 안전하게

마무리

Kafka는 도구일 뿐이다. 중요한 건 안전하고 추적 가능한 이벤트 전파 체계를 만드는 것이다. 이번 1주 경험으로 분산 시스템 설계의 기본기를 탄탄히 다질 수 있었다.

"같은 주문이 두 번 결제되는 일은 이제 없다. 하지만 언제나 새로운 문제가 기다리고 있다는 것도 알고 있다."


다이어그램 모음 📊

전체 아키텍처 플로우 🏗️