Kafka

Kafka Consumer DLT(Dead Letter Topic) 전략

PI.314 2023. 12. 11. 19:18
분산 시스템과 메시지 큐를 다루는 과정에서 메시지 처리 실패는 흔한 일입니다. 이러한 실패를 효과적으로 관리하기 위해 Dead Letter Topic(DLT)과 재시도 전략이 필수적입니다. 해당 포스팅에서는 DLT의 개념을 설명하고, Kafka를 사용하는 구체적인 예시를 통해 재시도 전략을 설계 및 구현하는 방법을 알아보겠습니다.

 

Dead Letter Topic(DLT)이란?

DLT는 메시지 처리에 실패했을 때 해당 메시지를 저장하는 Kafka 토픽입니다. 재시도 과정에서 여전히 실패하는 메시지를 위한 '마지막 피난처'로 사용됩니다. DLT의 주요 목적은 다음과 같습니다.

  • 처리 실패 메시지의 안전한 저장
  • 실패 원인 분석 및 디버깅 용이성 향상

재시도 전략의 필요성

메시지 처리 중 잠깐의 네트워크 지연이나 일시적인 오류로 인해 실패할 수 있습니다. 이러한 경우, 메시지를 바로 DLT로 보내기보다는 몇 차례 재시도를 하는 것이 바람직합니다. 재시도 전략은 다음과 같은 이점을 제공합니다

  • 일시적 오류의 경우 자동 복구
  • 데이터 손실 최소화

 

그러면 이제 DLT 구현 샘플 예제를 한번 살펴보겠습니다.


Retry 전략 설계 및 구현 샘플 예제

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RetryableTopic(
        attempts = "5",
        backoff = @Backoff(delay = 5000, multiplier = 2.0),
        dltStrategy = DltStrategy.FAIL_ON_ERROR,
        dltTopicSuffix = ".dlt",
        retryTopicSuffix = ".retry",
        exclude = {
                NonRetryableException.class
        })
@KafkaListener(
        topics = "sample.topic",
        groupId = "sample.topic.v1",
        containerFactory = "concurrentFactory"
)
 
public void consume(ConsumerRecord<StringString> record) {
    try {
        log.info("Received message: {}", record.value());
    } catch (RetryableException e) {
        log.error("Retryable error processing message: {}", e.getMessage());
        throw e;
    } catch (NonRetryableException e) {
        log.error("Non-retryable error processing message: {}", e.getMessage());
        throw e;
    }
}
cs

 

@RetryableTopic을 통해 재시도 전략을 구성합니다. 이 설정에는 최대 시도 횟수(attempts), 지연 시간(delay), 지연 시간 증가율(multiplier), DLT 전략(dltStrategy), DLT 및 재시도 토픽 접미사(dltTopicSuffix, retryTopicSuffix)가 포함됩니다.

 

consume 메서드는 Kafka로부터 메시지를 수신하여 처리합니다. 이 과정에서 재시도 가능한 예외(RetryableException)와 재시도 불가능한 예외(NonRetryableException)를 구분하여 처리합니다.


 

DLT 구현 샘플 예제

 

1
2
3
4
5
6
7
8
9
10
11
12
13
@KafkaListener(
        topics = "sample.topic.dlt",
        groupId = "sample.topic.dlt.v1"
)
public void processDltMessages(
        ConsumerRecord<StringString> record,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
        @Header(KafkaHeaders.OFFSET) Long offset,
        @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage
) {
    log.info("key: {}, value: {}, topic : {}, partition : {}, offset : {}, errorMessage : {}", record.key(), record.value(), topic, partitionId, offset, errorMessage);
}
cs

 

샘플코드를 보면, processDltMessages 메서드는 DLT로 전송된 메시지를 처리하게 됩니다.

 

Kafka의 DLT는 메시지 처리 과정에서 발생하는 오류를 효과적으로 관리하기 위한 메커니즘으로 작용합니다. 처리 과정에서 재시도 전략에도 불구하고 실패한 메시지들은 DLT로 이동되어, 시스템의 주요 메시지 흐름에서 분리됩니다. 이는 전체 시스템의 안정성을 유지하고, 메인 데이터 스트림의 방해를 최소화하는 데 중요합니다.


DLT로 이동된 메시지들은 오류의 원인을 분석하고, 필요한 코드 수정 및 문제를 해결하는 데 사용됩니다.

저는 이 과정에서 개발자가 Kafka 리스너를 동적으로 제어할 수 있도록 구현했고, 개발자나 운영자는 DLT에 적재된 메시지를 분석한 후, 문제를 해결하고 Kafka 리스너의 Start/Stop을 제어함으로써, 수정 사항이 시스템에 올바르게 반영되었는지 확인할 수 있습니다. 

 

이 부분에 대해서 예제를 한번 살펴보겠습니다.

 

Kafka Listener (DLT) 동적 제어 관련 샘플 예제

 

1. Kafka Listener 제어 관련 API 구현

 

먼저 외부에서 HTTP 요청을 통해 Kafka 리스너를 제어하도록 하는 API 구현이 필요합니다.

다음은 Kafka 토픽에 메시지를 보내는 요청을 처리하여 리스너의 상태(시작/중지)를 변경하는 명령을 Kafka Topic에 전달하는 과정입니다. (분산 시스템 환경에서는 여러 서버 또는 인스턴스가 동시에 운영되기 때문에 Kafka 토픽을 통해 리스너를 제어하는 방식으로 구현)

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@RestController
@RequestMapping("/v1/kafka")
@RequiredArgsConstructor
public class KafkaControlController {
 
    private final KafkaControlService kafkaControlService;
 
    @PostMapping("/control")
    public ResponseEntity<?> control(@RequestBody Map<String, Object> request) {
        try {
            kafkaControlService.control("kafka.control", request);
            return ResponseEntity.ok().body("Message sent successfully");
        } catch (Exception e) {
            log.error("Error sending Kafka message: ", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error sending message");
        }
    }
}
cs

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaControlService {
 
    private final KafkaProducerService kafkaProducerService;
    private final ObjectMapper objectMapper;
 
    public void control(String topic, Map<String, Object> event) {
        List<PartitionInfo> partitions = kafkaProducerService.partitions(topic);
        for (PartitionInfo partitionInfo : partitions) {
            try {
                String eventString = objectMapper.writeValueAsString(event);
                kafkaProducerService.send(new ProducerRecord<>(topic, partitionInfo.partition(), null, eventString));
                log.info("Message sent to partition {}: {}", partitionInfo.partition(), eventString);
            } catch (JsonProcessingException e) {
                log.error("Error processing JSON: ", e);
                throw new RuntimeException("Error processing JSON", e);
            }
        }
    }
}
 
cs

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
 
    private final KafkaTemplate<StringString> kafkaTemplate;
 
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
            result -> log.info("Message sent to topic {}: {}", topic, message),
            ex -> log.error("Error sending message to topic {}: {}", topic, ex.getMessage())
        );
    }
 
    public void send(ProducerRecord<StringString> record) {
        kafkaTemplate.send(record).addCallback(
            result -> log.info("Message sent to topic-partition {}:{}", record.topic(), record.partition()),
            ex -> log.error("Error sending message to topic-partition {}:{}", record.topic(), ex.getMessage())
        );
    }
 
    public List<PartitionInfo> partitions(String topic) {
        return kafkaTemplate.partitionsFor(topic);
    }
}
 
cs

 

2. KafkaListenerEndpointRegistry를 활용한 Kafka Listener 제어 기능 구현

 

이번에는 kafka.control 토픽로부터 소비한 메시지를 통해 특정 리스너의 상태(시작 또는 중지)를 변경하는 예제를 구현하겠습니다. (분산 환경에서 Kafka 리스너를 동적으로 제어하기 위함)

 

참고로 KafkaListenerEndpointRegistry는 Spring Kafka에서 Kafka 리스너 컨테이너들을 관리하는 역할을 하는 클래스입니다. 이 클래스를 사용함으로써, 개발자는 프로그래밍 방식으로 Kafka 메시지 리스너의 등록, 조회, 그리고 실행 상태를 제어할 수 있습니다.

 

1
2
3
4
5
6
7
    @KafkaListener(topics = "kafka.control", groupId = "control-group")
    public void listen(ConsumerRecord<StringString> record) {
        String listenerId = record.key(); 
        boolean active = Boolean.parseBoolean(record.value()); 
 
        kafkaListenerEndpoint.startOrStopListener(listenerId, active);
    }
cs

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaListenerEndpoint {
 
    private final KafkaListenerEndpointRegistry registry;
 
 
    public void startOrStopListener(String listenerId, boolean active) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (active) {
            listenerContainer.start();
            return;
        }
        listenerContainer.stop();
    }
}
 
cs

 

이렇게 DLT & 동적 제어 메카니즘을 구축하게 되면 시스템의 전반적인 관리와 유지보수를 용이하게 하며, 복잡한 분산 환경에서의 운영상의 편의성을 제공합니다. 오류 발생 시 적절한 대응과 빠른 문제 해결을 통해, 시스템의 안정성과 신뢰성을 보장하는 데 필수적인 기능입니다.

 


DLT와 재시도 전략은 Kafka를 사용하는 분산 시스템에서 메시지 처리의 안정성을 보장하는 중요한 요소입니다. 예시를 통해 이러한 전략을 어떻게 구현할 수 있는지 알아보았습니다. 이러한 접근 방식은 메시지 처리 실패를 효과적으로 관리하고 시스템의 전반적인 신뢰성을 높이는 데 도움이 됩니다.