TL;DR :
주문 이벤트가 다인스턴스에서 동시에 전송되며 중복 결제가 발생했다.
설정(acks=all, idempotence=true)만으로는 애플리케이션 레벨 경합을 막을 수 없었다.
나는 Outbox + DB 원자적 선점 + 동기 전송(.get())으로 "처음부터 안전"을 선택했다.
토픽은 max.in.flight=5, Consumer는 별도 테이블 기반 멱등성, DLT는 Table 우선으로 운영했다.
결과적으로 유실/중복/순서 문제를 "설계로 산" 뒤, 성능 최적화를 진행할 수 있었다.
그날, 같은 주문이 두 번 결제되었다 😨
[ERROR] Duplicate payment detected
paymentId: 92134, orderId: 55120, amount: 39000
Original: 2024-03-08 14:23:11
Duplicate: 2024-03-08 14:23:12
금요일 오후, 모니터링 알람이 울렸다 🚨. 같은 주문이 1초 차이로 두 번 결제됐다는 것이었다.
"뭔가 잘못됐다."
Kafka가 뭔데 이렇게 복잡해? 🤯
과제는 단순해 보였다: 이벤트를 Kafka에도 기록해서 외부 시스템이 볼 수 있게 하기
// 기존: 내부에서만 처리
@Transactional
public Order process(OrderCommand command) {
// 비즈니스 로직
Order order = orderService.createOrder(command.userId(), items, OrderAmount.of(finalAmount), command.couponId());
// 내부 이벤트만 발행
// eventBridge.publishEvent(EventType.ORDER_CREATED, OrderCreatedEvent.of(...));
return order;
}
// 추가 목표: 동일한 이벤트를 Kafka에도 기록
@Transactional
public Order process(OrderCommand command) {
Order order = orderService.createOrder(...);
// 내부 + 외부 이벤트 동시 발행
eventBridge.publishEvent(EventType.ORDER_CREATED,
OrderCreatedEvent.of(order.getId(), command.userId(), items));
return order;
}
기존 내부 처리는 그대로 두고, 추가로 Kafka에 기록해서 외부 시스템(commerce-streamer)이 감사로그, 메트릭 집계, 별도 캐시 관리를 할 수 있게 하는 것이 목표였다.
문제는 Kafka가 생각보다 훨씬 복잡한 녀석이었다는 것이다 😅.
Kafka의 잔혹한 진실들 💀
1. At-least-once가 기본이다
- Producer는 "최소 한 번"은 보낸다 (더 보낼 수도 있음)
- Consumer는 "최소 한 번"은 받는다 (더 받을 수도 있음)
- Exactly-once는 환상이다 ✨
2. 순서는 파티션 내에서만 보장된다
Product 123 이벤트:
Partition 0: 재고 100 → 50
Partition 1: 재고 0 ← 이게 먼저 처리될 수 있음! 😱
3. 설정 하나 잘못하면 지옥이다
# 이 한 줄이 순서를 망가뜨린다
max.in.flight.requests.per.connection: 5 # 기본값
첫 번째 삽질: 설정만으로 해결될 줄 알았다 🤦♂️
처음엔 "Kafka 설정만 잘하면 되겠지"라고 생각했다.
# commerce-api application.yml
spring:
kafka:
producer:
bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 모든 replica 확인
enable-idempotence: true # 중복 전송 방지
retries: 2147483647 # 거의 무한 재시도
properties:
max.in.flight.requests.per.connection: 5 # 성능 우선
compression.type: snappy # 성능 최적화
linger.ms: 10
batch.size: 16384
spring.json.add.type.headers: false
이 설정들은 분명 중요하다 ✅. acks=all + enable.idempotence=true로 Producer 레벨에서는 중복을 막을 수 있다. 하지만 더 큰 문제가 있었다.
진짜 문제: 트랜잭션 경계가 애매했다 😵💫
// ❌ 처음 시도한 직접 발행 방식
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderCreated(OrderCreatedEvent event) {
// 기존 내부 처리
processInternalNotification(event);
// Kafka 직접 발행 시도
kafkaTemplate.send("order-events.v1", event); // 문제!
}
문제 상황들 💥:
- Kafka 브로커가 다운되면? → 이벤트 유실
- 네트워크 타임아웃 발생하면? → 재시도 불가
- 여러 인스턴스가 동시에 실행되면? → 중복 발행 가능
가장 큰 깨달음: 설정에서 막을 수 없는 한계
- acks=all + idempotence=true로도 애플리케이션 레벨 중복은 막을 수 없다는 걸 깨달았다.
Server A: onOrderCreated() 실행 → kafkaTemplate.send()
Server B: onOrderCreated() 실행 → 같은 이벤트 또 send() 😱
Producer idempotence=true는 같은 Producer 인스턴스 내에서의 중복만 막는다. 서로 다른 애플리케이션 인스턴스면 소용없다 🤷♂️.
해결책 1: Outbox 패턴으로 이벤트 유실 방지 🛡️
"이벤트 발행과 비즈니스 로직이 같이 성공하거나 같이 실패해야 한다"
Outbox 패턴이 뭔가? 🤔
기존 문제:
// ❌ 이렇게 하면 트랜잭션 경계가 애매해진다
@Transactional
public void completePayment(Long paymentId) {
payment.complete();
paymentRepository.save(payment); // DB 트랜잭션
kafkaTemplate.send("payment-events", event); // 네트워크 호출
// 만약 Kafka 전송이 실패하면? 결제는 완료되었는데 이벤트는 안 감... 💀
}
Outbox 해결책 ✅:
// ✅ 비즈니스 로직과 이벤트를 같은 트랜잭션에서 처리
@Transactional
public Order process(OrderCommand command) {
// 1. 비즈니스 로직
Order order = orderService.createOrder(...);
// 2. 이벤트를 내부 + 외부 동시 발행 (같은 트랜잭션!)
eventBridge.publishEvent(EventType.ORDER_CREATED,
OrderCreatedEvent.of(order.getId(), command.userId(), items));
// 둘 다 성공하거나 둘 다 실패 🎯
return order;
}
실제 DomainEventBridge 구조 📋
@Component
@RequiredArgsConstructor
public class DomainEventBridge {
private final ApplicationEventPublisher publisher;
private final EnvelopeFactory envelopeFactory;
private final OutboxEventService outboxEventService;
@Transactional(propagation = Propagation.MANDATORY)
public void publishEvent(EventType eventType, Object payload) {
Envelope<Object> envelope = envelopeFactory.create(eventType, payload);
publisher.publishEvent(envelope); // 내부 이벤트 발행
outboxEventService.save(envelope); // 외부 이벤트 DB 저장
}
}
해결책 2: 원자적 선점 패턴 🎯
Outbox로 이벤트 유실은 막았지만, 새로운 문제가 생겼다: 여러 서버가 같은 이벤트를 중복 처리하는 것.
핵심 아이디어: "하나씩 선점해서 처리하자"
일괄 처리의 함정 🪤
// ❌ 이렇게 하면 동시성 문제 발생
@Scheduled(fixedDelay = 10000)
public void relayEvents() {
List<OutboxEvent> events = repository.findByStatus(NEW);
// 여러 서버가 같은 리스트를 가져올 수 있음! 😱
events.forEach(event -> {
event.markAsSending();
sendToKafka(event);
});
}
문제 시나리오 💥:
14:23:11.000 Server A: findByStatus(NEW) → [Event_92134]
14:23:11.001 Server B: findByStatus(NEW) → [Event_92134] (동일!)
14:23:11.100 Server A: markAsSending() + send()
14:23:11.101 Server B: markAsSending() + send() ← 중복! 😱
단건 선점의 안전성 ✅
// ✅ 하나씩 원자적으로 선점해서 처리
@Scheduled(fixedDelay = 10000)
@Transactional
public void relayEvents() {
List<OutboxEvent> ready = outboxEventRepository.findTop100ReadyToSend(ZonedDateTime.now());
if (ready.isEmpty()) return;
// 원자적 클레임: UPDATE WHERE로 먼저 선점한 서버만 처리 🎯
int claimed = outboxEventRepository.claimEventsForSending(
ready.stream().map(OutboxEvent::getId).toList(),
ZonedDateTime.now()
);
if (claimed == 0) return; // 다른 서버가 이미 처리 중
// 안전하게 하나씩 처리 ✅
for (OutboxEvent event : ready) {
try {
sendOnce(event);
} catch (Exception ex) {
log.error("Unexpected error while sending event: {}", event.getMessageId(), ex);
}
}
}
Repository의 원자적 클레임 🔒:
@Modifying
@Query("UPDATE OutboxEvent o SET o.status = 'SENDING', o.lastModified = :now " +
"WHERE o.id IN :ids AND o.status IN ('NEW', 'FAILED')")
int claimEventsForSending(@Param("ids") List<Long> ids, @Param("now") ZonedDateTime now);
이제 먼저 클레임한 서버만 이벤트를 처리할 수 있다 🏆.
토픽 설계 고민: 언제 나누고 언제 합칠까? 🤹♂️
실제 적용한 이벤트 타입별 분리 전략 🎯
처음엔 도메인별로 토픽을 나누려고 했지만, 실제 운영하면서 이벤트 타입별 분리가 더 현실적이었습니다.
# 실제 적용한 토픽 구조
order-events.v1: 주문 생성/수정/취소 통합
catalog-events.v1: 상품 재고/좋아요 변경 통합
notification-events.v1: 알림 발송 요청 통합
왜 이벤트 타입별 통합이 현실적인가? 🤔
선택적 구독의 유연성: Consumer가 관심있는 이벤트만 필터링해서 처리
// 감사로그: 모든 이벤트 구독하되 타입별로 다르게 처리
@KafkaListener(topics = {"order-events.v1", "catalog-events.v1"}, groupId = "audit-consumer")
public void handleAllEvents(GeneralEnvelopeEvent envelope) {
switch (envelope.type()) {
case "ORDER_CREATED" -> auditService.logOrderCreation(envelope);
case "STOCK_ADJUSTED" -> auditService.logStockChange(envelope);
case "PRODUCT_LIKED" -> auditService.logUserActivity(envelope);
}
}
// 캐시 무효화: 특정 이벤트만 처리
@KafkaListener(topics = "catalog-events.v1", groupId = "cache-consumer")
public void handleCacheEviction(GeneralEnvelopeEvent envelope) {
if (CACHE_EVICTION_EVENTS.contains(envelope.type())) {
cacheService.evict(envelope);
}
// 다른 타입은 무시
}
독립 스케일링: 토픽별로 파티션/처리량 최적화
- order-events.v1: 10개 파티션 (주문량이 많음)
- catalog-events.v1: 20개 파티션 (재고 변경이 빈번함)
- notification-events.v1: 5개 파티션 (상대적으로 적음)
장애 격리: 한 토픽 문제가 다른 토픽에 영향 안 줌
// 주문 Consumer 장애가 상품 Consumer에 영향 없음
@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope) {
// 주문 처리 로직이 실패해도...
}
@KafkaListener(topics = "catalog-events.v1", groupId = "catalog-consumer")
public void handleCatalogEvents(GeneralEnvelopeEvent envelope) {
// 상품 처리는 계속 정상 동작
}
보존 정책: 토픽별로 다른 TTL/보관 정책
- 주문 이벤트: 7년 보관 (법적 요구사항)
- 상품 이벤트: 1년 보관 (분석 목적)
- 알림 이벤트: 30일 보관 (임시 추적)
안티패턴: 이벤트별 토픽 남발 ❌
❌ order-created-events.v1
❌ order-updated-events.v1
❌ order-cancelled-events.v1
❌ stock-increased-events.v1
❌ stock-decreased-events.v1
문제점들:
- 토픽 관리 복잡성 증가 (5개 → 50개)
- Consumer 코드 중복 (비슷한 처리 로직)
- 운영팀 부담 (토픽별 모니터링/알람 설정)
해결책: 도메인별 통합 + eventType 필드로 구분 ✅
// 하나의 토픽에서 여러 이벤트 타입 처리
@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope) {
switch (envelope.type()) {
case "ORDER_CREATED" -> handleOrderCreated(envelope);
case "ORDER_UPDATED" -> handleOrderUpdated(envelope);
case "ORDER_CANCELLED" -> handleOrderCancelled(envelope);
default -> log.warn("Unknown event type: {}", envelope.type());
}
}
Consumer별 처리 전략 🎭
1. 감사로그: 모든 이벤트 저장 📝
@Component
public class AuditLogHandler implements EventHandler {
private final AuditLogService auditLogService;
@Override
public boolean canHandle(String eventType) {
return true; // 모든 이벤트 로깅 📝
}
@Override
public void handle(GeneralEnvelopeEvent envelope) {
auditLogService.saveAuditLog(envelope);
}
}
2. 메트릭 집계: 특정 이벤트만 📊
@Component
public class MetricsHandler implements EventHandler {
private final MetricsService metricsService;
@Override
public boolean canHandle(String eventType) {
return EventTypes.METRIC_EVENTS.contains(eventType);
}
@Override
public void handle(GeneralEnvelopeEvent envelope) {
metricsService.recordMetric(envelope);
}
}
세 번째 삽질: 비동기의 함정 😓
// ❌ 이것도 함정이었다
@Transactional
protected void sendOnce(OutboxEvent event) {
event.markAsSending();
kafkaTemplate.send(event.getTopic(), event.getEventKey(), event.toGeneralEnvelopeEvent())
.whenComplete((result, ex) -> {
if (ex == null) {
event.markAsPublished(); // 다른 스레드에서 실행!
repository.save(event); // @Transactional 전파 안됨!
}
});
}
문제: .whenComplete() 콜백은 다른 스레드 풀에서 실행되어 트랜잭션 컨텍스트가 없다.
해결: 동기 전송으로 변경 ✅
// ✅ 안전한 동기 처리 (실제 구현)
@SuppressWarnings("unchecked")
@Transactional
protected void sendOnce(OutboxEvent event) {
event.markAsSending();
try {
// .get()으로 동기 대기 - 트랜잭션 안에서 상태 관리
SendResult<String, Object> result = (SendResult<String, Object>) kafkaTemplate
.send(event.getTopic(), event.getEventKey(), event.toGeneralEnvelopeEvent()).get();
event.markAsPublished();
outboxEventRepository.save(event);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // 인터럽트 복구
event.markAsFailed(ie.getMessage(), retryPolicy);
outboxEventRepository.save(event);
} catch (ExecutionException ee) {
log.error("Failed to send event: {}", event.getMessageId(), ee);
event.markAsFailed(ee.getMessage(), retryPolicy);
outboxEventRepository.save(event);
}
}
트레이드오프 표:
방식 장점 단점 선택
비동기 + 콜백 | TPS 높음 | 트랜잭션 경계 불명확 | ❌ |
동기 .get() | 상태관리 명확 | TPS 손실 | ✅ |
결정 근거: 초기에는 안전성이 성능보다 우선
Consumer: 멱등성이 생명 🔒
Producer에서 중복을 막아도, Consumer는 또 다른 차원의 중복을 고려해야 한다.
멱등성 테이블 설계
@Entity
@Table(name = "processed_events")
public class ProcessedEvent extends BaseEntity {
@Column(name = "message_id", length = 128, unique = true, nullable = false)
private String messageId; // 멱등성 키
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private Status status = Status.PROCESSING; // 기본은 선점 중
@Column(name = "event_type", length = 100, nullable = false)
private String eventType;
@Column(name = "correlation_id", length = 128)
private String correlationId;
@Column(name = "started_at", nullable = false)
private ZonedDateTime startedAt = ZonedDateTime.now();
@Column(name = "processed_at")
private ZonedDateTime processedAt;
public enum Status { PROCESSING, PROCESSED, FAILED }
}
Consumer 멱등성 처리 실제 구현
@KafkaListener(topics = "order-events.v1", groupId = "order-consumer")
public void handleOrderEvents(GeneralEnvelopeEvent envelope,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Order event received - messageId: {}, type: {}, offset: {}:{}",
envelope.messageId(), envelope.type(), partition, offset);
try {
// 주문 이벤트는 중요하므로 적극적인 재시도 정책 적용
retryableEventProcessor.processWithRetry(
envelope, topic, partition, offset, "order-consumer", RetryPolicy.AGGRESSIVE);
ack.acknowledge();
log.debug("Order event acknowledged - messageId: {}", envelope.messageId());
} catch (Exception e) {
log.error("Critical: Failed to process order event after all retries - messageId: {}. " +
"Event has been sent to Dead Letter Table.", envelope.messageId(), e);
// 재시도 로직에서 이미 DLT로 보냈으므로 ACK 처리
ack.acknowledge();
}
}
핵심 설정들과 이유
Producer 필수 설정
spring:
kafka:
producer:
bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 모든 ISR 확인
enable-idempotence: true # Producer 레벨 중복 방지
retries: 2147483647 # 거의 무한 재시도
properties:
max.in.flight.requests.per.connection: 5 # 성능 우선
compression.type: snappy # 네트워크 대역폭 절약
linger.ms: 10 # 배치 최적화
batch.size: 16384 # 적당한 배치 크기
spring.json.add.type.headers: false # 타입 헤더 생략
생각해볼 점: "max.in.flight=5가 너무 성능 우선적이지 않나?"
답: 순서가 중요한 토픽만 1로 설정. 순서 무관하면 5 (기본값) 사용해서 성능 확보.
Consumer 필수 설정
spring:
kafka:
consumer:
group-id: loopers-default-consumer
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.trusted.packages: "*" # 역직렬화 허용 패키지
spring.json.value.default.type: com.fasterxml.jackson.databind.JsonNode
enable.auto.commit: false # Manual ACK
auto.offset.reset: latest # 새 메시지부터
listener:
ack-mode: manual # 처리 완료 후 수동 ACK
Dead Letter Table: 실패의 완전한 추적
고민: "DLQ는 Topic으로 해야 하나요, Table로 해야 하나요?"
답: 초기엔 Table이 관리하기 쉽다. 이유:
- SQL로 쉽게 조회/분석 가능
- 상태 관리 (DEAD → INVESTIGATING → RESOLVED)
- 운영팀이 익숙한 도구 (DB Admin)
@Entity
public class DeadLetterEvent extends BaseEntity {
private String messageId;
private String originalTopic;
private Integer partitionId;
private Long offsetId;
private String consumerGroup;
@Column(columnDefinition = "JSON")
private String payload;
@Enumerated(EnumType.STRING)
private FailureReason failureReason;
private String errorMessage;
private String stackTrace;
private Integer retryAttempts;
public enum FailureReason {
DESERIALIZATION_ERROR, // JSON 파싱 실패
HANDLER_EXCEPTION, // 비즈니스 로직 오류
TIMEOUT, // 처리 시간 초과
RESOURCE_UNAVAILABLE // DB/API 접근 불가
}
}
아직 남은 과제들
1. 배치 처리 도입
현재 한계: 메시지 1개씩 처리로 DB 커넥션 낭비
// 지금: 100개 메시지 = 100번 DB 호출
for (GeneralEnvelopeEvent event : events) {
auditLogService.saveAuditLog(event); // 각각 DB 호출
}
// 목표: 100개 메시지 = 1번 배치 처리
auditLogService.saveBatch(events); // 한 번에 처리
고려사항: 일부 실패 시 전체 롤백 vs 부분 처리
2. Hot Key 대응
잠재적 문제: 인기 상품에 이벤트 몰릴 경우
productId=12345 (인기상품)에 이벤트 몰림
→ 특정 파티션에만 트래픽 집중
→ 해당 Consumer만 과부하
대응 방안: 샤딩 키 전략 재설계 or 파티션 증설
3. 스키마 진화 대응
현재: JSON 직렬화로 스키마 변화에 취약
향후: Avro + Schema Registry 고려
4. 모니터링 개선
추가 필요한 지표:
- 토픽별 처리량/지연시간 추이
- DLT 누적 패턴 분석
- Consumer Group별 성능 비교
- 파티션간 처리량 불균형 모니터링
1주간 Kafka 파이프라인 구현 여정
Week 1 Day 1-2: "Kafka 설정만 하면 되겠지" → 직접 발행 시도 → 트랜잭션 경계 문제 발견
Week 1 Day 3-4: Outbox 패턴 도입 → 동시성 지옥 경험 → 원자적 선점으로 해결
Week 1 Day 5-7: Consumer 멱등성 + DLT 구현 → 모니터링까지 완성
짧은 기간이었지만 가장 중요한 깨달음들:
- At-least-once가 현실이다: Producer든 Consumer든 중복을 전제로 설계
- 동시성은 항상 고려하라: 여러 인스턴스 환경에서는 모든 게 경합 상황
- 실패를 설계하라: DLT, 재시도, 모니터링까지 포함해야 완성
- 안전성 확보 후 성능 최적화: 빠르게 만들어서 버그 투성이가 되느니 느리더라도 안전하게
마무리
Kafka는 도구일 뿐이다. 중요한 건 안전하고 추적 가능한 이벤트 전파 체계를 만드는 것이다. 이번 1주 경험으로 분산 시스템 설계의 기본기를 탄탄히 다질 수 있었다.
"같은 주문이 두 번 결제되는 일은 이제 없다. 하지만 언제나 새로운 문제가 기다리고 있다는 것도 알고 있다."
다이어그램 모음 📊
전체 아키텍처 플로우 🏗️
'Loopers' 카테고리의 다른 글
🔥 "랭킹 좀 만들어달라는데 Redis 메모리가..." - ZSET 랭킹 시스템 구축기 (1) | 2025.09.11 |
---|---|
🚨 “그냥 @EventListener면 끝?” — 이벤트, 언제·왜·어떻게 사용할 것인가 ⚙️ (3) | 2025.08.29 |
Resilience와 보상 트랜잭션: 장애에 대응하는 방법 (1) | 2025.08.24 |
PG가 터져도 우리 서비스는 멀쩡해야 한다 🔥 (3) | 2025.08.22 |
인덱스를 걸었는데… 더 느려졌다? 🤯 (4) | 2025.08.15 |