Kafka Consumer를 운영하는 서비스에서 메시지를 개별적으로 처리하면, 각 메시지를 가져오고 처리하는데 필요한 네트워크 오버헤드와 CPU 사용량이 늘어난다.
특히, 각 메시지에 대해 별도의 트랜잭션을 시작하고 커밋하는 것은 비용이 많이 든다. DB와의 네트워크 통신, 트랜잭션 로그의 쓰기, 디스크 I/O 등에 의한 오버헤드가 각 메시지마다 발생하므로 메시지 처리 성능이 저하될 수 있다. 뿐 만 아니라 각 메시지 처리에 대해 별도 DB Connection Pool이 고갈되어, 새로운 트랜잭션을 생성할 수 없게된다.
이러한 경우 배치 처리를 사용하면 여러 메시지를 하나의 트랜잭션으로 쉽게 묶어 처리할 수 있다.
그럼 Batch Listener를 구현하는 방법에 대해 알아보자.
1. Kafka Listener 구현
@KafkaListener(topics = {"topic"}, containerFactory = "batchKafkaListenerContainerFactory")
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
// 로직 수행
handler.process();
// Commit
ack.acknowledge();
}
자동/수동 커밋은 상황에 맞게 사용하면 된다.
2. Container Factory 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
var containerProperties = factory.getContainerProperties();
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Auto Commit일 경우
// containerProperties.setAckMode(ContainerProperties.AckMode.BATCH);
factory.getContainerProperties().setIdleBetweenPolls(60000);
factory.setConsumerFactory(batchConsumerFactory());
factory.setBatchListener(true);
return factory;
}
- setIdleBetweenPolls: 두 poll 사이의 최대 대기 시간을 설정
- setBatchListener: Listener가 배치 모드로 동작 (여러 메시지를 한 번에 가져와 처리)
setIdleBetweenPolls를 설정하지 않으면, Consumer가 바로 poll을 수행하기 때문에 상황에 따라 batch가 의미가 없어질 수 도 있다. setIdleBetweenPolls를 설정하면, poll을 수행하기 전에 idle time 동안 메시지를 쌓고 난후에 batch 처리할 수 있다.
@Bean
public ConsumerFactory<String, String> batchConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
// Auto Commit일 경우
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
return new DefaultKafkaConsumerFactory<>(props);
}
- MAX_POLL_RECORDS_CONFIG: 한 번의 poll에 의해 반환될 수 있는 최대 레코드 수를 지정
- MAX_POLL_INTERVAL_MS_CONFIG: Consumer가 Broker에게 정상적으로 응답하기 위한 최대 시간을 지정
- FETCH_MAX_WAIT_MS_CONFIG: Broker가 요청에 응답하기 전에 대기하는 최대 시간을 지정
batch 처리의 효과를 높이기 위해서는 MAX_POLL_RECORDS_CONFIG를 통해 반환될 수 있는 최대 레코드 수를 적절하게 지정해야 한다. 각 비즈니스 환경에 따라 다르기 때문에 상황을 고려해서 적절하게 설정하면 된다.
FETCH_MAX_WAIT_MS_CONFIG는 메세지가 없을 때 대기하는 최대시간을 설정할 수 있다. 이 때 주의해야할 것은 FETCH_MAX_WAIT_MS_CONFIG 값은 MAX_POLL_INTERVAL_MS_CONFIG 값보다 작아야 된다. MAX_POLL_INTERVAL_MS_CONFIG은 Consumer가 Brokerd에게 정상적으로 응답하기 위한 최대 시간을 설정하는 값이기 때문에 FETCH_MAX_WAIT_MS_CONFIG값이 더 크면 Timeout이 발생하여 Broker가 비정상으로 인지하여 Rebalancing이 일어날 수 있다.
3. 수행 결과
Kafka Batch Listener를 통해 한 번에 여러 메시지를 batch 처리하고, bulk Insert까지 잘 수행 된 것을 볼 수 있다.
'Kafka' 카테고리의 다른 글
Kafka Consumer DLT(Dead Letter Topic) 전략 (0) | 2023.12.11 |
---|---|
Kafka Consumer 관련 Trouble Shooting 정리 (0) | 2022.08.19 |