Kafka

Kafka Batch Listener를 활용하여 성능 개선하기

PI.314 2023. 6. 18. 20:58

Kafka Consumer를 운영하는 서비스에서 메시지를 개별적으로 처리하면, 각 메시지를 가져오고 처리하는데 필요한 네트워크 오버헤드와 CPU 사용량이 늘어난다.

 

특히, 각 메시지에 대해 별도의 트랜잭션을 시작하고 커밋하는 것은 비용이 많이 든다. DB와의 네트워크 통신, 트랜잭션 로그의 쓰기, 디스크 I/O 등에 의한 오버헤드가 각 메시지마다 발생하므로 메시지 처리 성능이 저하될 수 있다. 뿐 만 아니라 각 메시지 처리에 대해 별도 DB Connection Pool이 고갈되어, 새로운 트랜잭션을 생성할 수 없게된다.

 

이러한 경우 배치 처리를 사용하면 여러 메시지를 하나의 트랜잭션으로 쉽게 묶어 처리할 수 있다.

 

그럼 Batch Listener를 구현하는 방법에 대해 알아보자.

 

1.  Kafka Listener 구현

    @KafkaListener(topics = {"topic"}, containerFactory = "batchKafkaListenerContainerFactory")
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {	
        
        // 로직 수행
        handler.process();
        
        // Commit
        ack.acknowledge();
    }

자동/수동 커밋은 상황에 맞게 사용하면 된다.

 

2. Container Factory 설정 

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        var containerProperties = factory.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Auto Commit일 경우
        // containerProperties.setAckMode(ContainerProperties.AckMode.BATCH);
        factory.getContainerProperties().setIdleBetweenPolls(60000);
        factory.setConsumerFactory(batchConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
  • setIdleBetweenPolls: 두 poll 사이의 최대 대기 시간을 설정
  • setBatchListener: Listener가 배치 모드로 동작 (여러 메시지를 한 번에 가져와 처리)

setIdleBetweenPolls를 설정하지 않으면, Consumer가 바로 poll을 수행하기 때문에 상황에 따라 batch가 의미가 없어질 수 도 있다. setIdleBetweenPolls를 설정하면, poll을 수행하기 전에 idle time 동안 메시지를 쌓고 난후에 batch 처리할 수 있다.

    @Bean
    public ConsumerFactory<String, String> batchConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
        // Auto Commit일 경우
        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);

        return new DefaultKafkaConsumerFactory<>(props);
    }
  • MAX_POLL_RECORDS_CONFIG: 한 번의 poll에 의해 반환될 수 있는 최대 레코드 수를 지정
  • MAX_POLL_INTERVAL_MS_CONFIG: Consumer가 Broker에게 정상적으로 응답하기 위한 최대 시간을 지정
  • FETCH_MAX_WAIT_MS_CONFIG: Broker가 요청에 응답하기 전에 대기하는 최대 시간을 지정

batch 처리의 효과를 높이기 위해서는 MAX_POLL_RECORDS_CONFIG를 통해 반환될 수 있는 최대 레코드 수를 적절하게 지정해야 한다. 각 비즈니스 환경에 따라 다르기 때문에 상황을 고려해서 적절하게 설정하면 된다. 

FETCH_MAX_WAIT_MS_CONFIG는 메세지가 없을 때 대기하는 최대시간을 설정할 수 있다. 이 때 주의해야할 것은 FETCH_MAX_WAIT_MS_CONFIG 값은 MAX_POLL_INTERVAL_MS_CONFIG 값보다 작아야 된다. MAX_POLL_INTERVAL_MS_CONFIG은 Consumer가 Brokerd에게 정상적으로 응답하기 위한 최대 시간을 설정하는 값이기 때문에 FETCH_MAX_WAIT_MS_CONFIG값이 더 크면 Timeout이 발생하여 Broker가 비정상으로 인지하여 Rebalancing이 일어날 수 있다.

 

3. 수행 결과

Kafka Batch Listener를 통해 한 번에 여러 메시지를 batch 처리하고,  bulk Insert까지 잘 수행 된 것을 볼 수 있다.

'Kafka' 카테고리의 다른 글

Kafka Consumer DLT(Dead Letter Topic) 전략  (0) 2023.12.11
Kafka Consumer 관련 Trouble Shooting 정리  (0) 2022.08.19