Spring

카프카 컨슈머 그룹을 추가한 시점 이후의 메시지만 받기

미스터머글 2022. 10. 7. 18:23
728x90
반응형

.

2022.10.12, 13:56

잘못 작성된 내용이 있어서 수정했습니다.

 

 

개발환경과 요구사항

본 내용은 스프링부트 + 카프카 환경에서 아래의 의존성과 카프카 세팅을 참고하여 읽기를 권한다. 개발 도구는 맥북 + 인텔리제이이다.

// build.gradle
plugins {
	id 'org.springframework.boot' version '2.7.2'
	id 'io.spring.dependency-management' version '1.0.12.RELEASE'
	id 'java'
}

dependencies {
	// 일부 생략
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:2021.0.3"
	}
}
// application.yml
spring:
  cloud:
    stream:
      defaultBinder: kafka
      kafka:
        binder:
          minPartitionCount: 2
          autoAddPartitions: true
          brokers: kafka-event-dev-0.internal.sample.io:9092,kafka-event-dev-1.internal.sample.io:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          replication-factor: 1

 

이제 다음과 같은 가정과 요구사항이 생긴다.

  • 기존에 외부에서 발행되고 있는 user-registered-event를 수신하고 싶다. 
  • consumer를 추가하려면 새로운 group도 추가해야 한다. (아래 그림의 빨간색 화살표)
  • 그런데 이렇게 추가를 해보니 이전에 이미 발행된 메시지를 전부 가지고 온다.
  • 새로운 컨슈머는 추가된 시점 이후의 메시지만 가져오고 싶다.

 

 

 

 

 

tl;dr

바쁜 시대를 사는 당신을 위해...

  • yml에 auto.offset.reset을 latest로 설정해준다.
  • 카프카는 auto.offset.reset의 default값이 latest이지만 group을 지정하지 않는 경우 default가 earliest가 된다.
# application.yml
spring:
  cloud:
    stream:
      defaultBinder: kafka
      kafka:
        binder:
          minPartitionCount: 2
          autoAddPartitions: true
          brokers: kafka-event-dev-0.internal.sample.io:9092,kafka-event-dev-1.internal.sample.io:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            auto.offset.reset: latest # 추가한 시점 이후의 메시지만 받도록 한다
          replication-factor: 1
      bindings:
        user-registered-event-in-0:
          destination: my-sample.registered-event
          content-type: application/json
          group: sample-group
    function:
      definition: user-registered-event

 

 

 

과정 : auto.offset.reset

요구사항을 해결하기 위해 많은 자료를 찾았보니 auto.offset.reset 옵션에 대한 설명이 가장 많았다. 설명은 아래와 같다.

 

 

consumer의 offset 정보가 존재하지 않는다면 auto.offset.reset의 default값(latest)이나 또는 설정한 값을 따라간다.

 

 

딱 요구사항에 필요한 내용이다. 그런데 문제는 옵션값이다. 다음 세가지 옵션을 지정할 수 있다.

  • latest : 마지막으로 구독한 다음 메세지 부터 구독한다. (가장 최신)
  • earliest : 처음부터 메세지를 구독한다. (가장 오래된)
  • none : 구독하고자 하는 topic의 offset 정보가 없으면 exception을 발생한다.

 

설명에서는 default가 latest라고 했다. 그러면 마지막으로 구독한 다음 메시지부터 구독한다는게, 새로운 그룹의 경우에는 구독한게 없으니 처음부터란 것일까? 개발문서를 살펴보았다.

 

  • latest: automatically reset the offset to the latest offset (가장 최신의 offset값으로 자동 설정)

 

ㅓㅜㅑ. 더 헷갈린다. 그런데 여기서 더 혼란스럽게 만드는 것이 있다. 개발중인 서비스를 로컬에서 띄웠을 때, 수많은 로그 속에서 Consumer 설정을 확인할 수 있는 ConsumerConfig를 확인할 수 있다. 어라? auto.offset.reset이 latest가 아닌 earliest로 되어있다.

 

 

# 애플리케이션 띄우고 초기화 과정의 Console log
# 본 포스팅에서 직접적인 관련없는 내용은 생략

2022-10-07 14:22:53.368  INFO 14131 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 100
	auto.offset.reset = earliest
	client.id = consumer-user-registered-event-1

2022-10-07 14:22:53.512  INFO 14131 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 100
	auto.offset.reset = earliest
	client.id = consumer-user-registered-event-2
    

2022-10-07 14:23:03.736  INFO 14131 --- [pool-2-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	client.id = consumer-user-registered-event-7

 

의문점은 3가지.

 

  1. 어째서 Config 정보를 3회 보여주는가
  2. 첫번째와 두번째 config 정보에는 auto.offset.reset이 earliest인가
  3. 세번째는 왜 첫번째와 두번째와 다른가

 

 

 

과정 : earliest? (도움안됨 주의)

주의! 본 파트는 도움이 안되는 내용입니다. 제 착각으로 작성된 내용으로 혹시나 급하신 분은 다음 파트를 읽어주세요.

먼저 두번째 의문부터 해결해보자. 처음에는 ConsumerConfig 소스 코드를 보았다.

package org.apache.kafka.clients.consumer;

public class ConsumerConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    
    // 생략
    
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
    
}

 

이제는 앞서 보여준 로그보다 앞선 로그를 확인해보았다. 아래 로그는 Kafka가 처음 바인더를 생성하는 시점을 찍은 것이다. 집중한 부분은 DefaultBinderFactory이다. 

 

# Log
2022-10-07 14:22:52.619  INFO 14131 --- [  restartedMain] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
2022-10-07 14:22:52.787  INFO 14131 --- [  restartedMain] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
2022-10-07 14:22:52.787  INFO 14131 --- [  restartedMain] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
package org.springframework.cloud.stream.binder;

// import 생략

public class DefaultBinderFactory 
    implements BinderFactory, DisposableBean, ApplicationContextAware {
    
    // 생략
    
}

 

ConsumerConfig와 DefaultBinderFactory. 몇줄 안되는 위 코드에서 의문점을 해소할 수 있을 것이라는 기대를 했다. 어느 포인트에서 그런 생각을 했을까? 힌트는 이 포스팅의 카테고리.

 

 

// auto.offset.reset 정보가 있는 ConsumerConfig 패키지
package org.apache.kafka.clients.consumer;

// Kafka binder를 생성하는 DefaultBinderFactory 패키지
package org.springframework.cloud.stream.binder;

 

바로 패키지명. 한쪽은 카프카, 다른 한쪽은 스프링 프레임워크를 가리키고 있다. 스프링 프레임워크에서는 어플리케이션에 메시지 발행자와 소비자를 쉽게 구축할 수 있는 어노테이션 기반의 프레임워크를 제공한다. 그것이 위 코드의 두번째 패키지에 보이는 Spring cloud stream이다. 뭐가 익숙하다면 앞의 내용을 잘 읽었다는 뜻. 가장 위쪽에서 개발환경쪽을 다시 살펴보면 눈에 띄는게 있다.

 

implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'

 

애초에 스프링 클라우드 스트림을 카프카에 맞게 사용하기 위해 위의 의존성을 추가했었다. 추가된 라이브러리 목록을 보면 더욱 확실해진다. 이 서비스는 날것(?)의 카프카를 그대로 사용하는게 아닌 스프링 프레임워크에 맞게 사용하고 있음을 확인할 수 있다. 

 

// build.gradle
Gradle: org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.4
Gradle: org.springframework.cloud:spring-cloud-stream:3.2.4
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka:3.2.4
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.2.4

 

게다가 DefaultBinderFactory로부터는 아파치 카프카 라이브러리쪽과 어떤 접점도 찾을 수 없었다. 어디까지나 여기서는 기본 바인더를 kafka로 세팅하는게 목적이기 때문이다. 그런데 기본 바인더 지정은 어떻게 했을까? 다시 application.yml을 보자.

 

  cloud:
    stream:
      defaultBinder: kafka
      kafka:
        binder:

 

 

있다. 기본 바인더 지정하는 부분이. 만약 여러분이 인텔리J를 사용하고 있다면 3번째 줄의 kafka의 출처(cmd + 클릭)를 확인할 수 있다. 카프카 바인더를 설정하는 위치를 알려준다. KafkaBinderConfiguration 코드를 살펴보자.

 

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
package org.springframework.boot.autoconfigure.kafka;

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@Import({ KafkaAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
    // 생략 
}

 

Import에 보면 KafkaAutoConfiguration이라는 것이 보인다. 서비스를 올리면 자동으로 설정하는 부분일 것이라는 유추를 해보았다. 해당 소스를 열어보면 KafkaStreamsAnnotationDrivenConfiguration이 보인다. 

 

package org.springframework.boot.autoconfigure.kafka;

@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
    // 생략
}

 

카프카 자체만으로 스트림 프로세싱을 만들기는 쉽지 않을 것이다. 그래서 아파치(카프카를 만든 재단)에서는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리인 Kafka Streams를 제공하고 있다. 스프링 프레임워크가 카프카를 세팅할 때 스트림 프로세싱을 쉽게 처리하기 위해 KafkaStreamsAnnotationDrivenConfiguration을 import한게 아닐까라는 추측해볼 수 있다. 소스를 보자.

 

package org.springframework.boot.autoconfigure.kafka;

// 일부 생략
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamsBuilder.class)
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
class KafkaStreamsAnnotationDrivenConfiguration {
    // 생략
}

 

웰컴. 실마리가 보인다. 카프카 스트림즈가 import되어 있다. 스트림즈 API를 세팅하는 부분을 보자.

 

package org.apache.kafka.streams;

public class StreamsConfig extends AbstractConfig {
    // 생략

    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
    static {
        final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
        tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
        CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
    }
    
    // 생략
}

 

이때 좋아할게 아니었다


자축하자. 드디어 찾았다. CONSUMER_DEFAULT_OVERRIDES 상수에 세팅된 값에는 AUTO_OFFSET_RESET_CONFIG가 earliest로 되어있다. 즉, 카프카 스트림즈를 이용하게 되었을 때 auto.offset.reset의 기본값은 latest가 아닌 earliest라는 것을 알게되었다. Kafka streams를 바탕으로 구글에 검색을 해보면 개발문서에서도 아래 내용을 확인할 수 있다.

 

 

Parameter Name  Corresponding Client Streams Default
auto.offset.reset Consumer earliest

 

 

뿐만아니라 Kafka streams 개발문서에는 APPLICATION RESET TOOL이라는 가이드도 있는데 여기에 아래 내용을 볼 수 있다.

// The tool accepts the following parameters:
--to-earliest    Reset offsets to earliest offset.

 

 

Only one of these scenarios can be defined. If not, 'to-earliest' will be executed by default
이러한 시나리오(입력 토픽에 대한 오프셋 재설정 시나리오) 중 하나만 정의할 수 있다. 그렇지 않은 경우 기본적으로 'to-earliest'가 실행됩니다.

 

 

두번째 의문이었던 earliest로 세팅된 이유는 바로 Kafka streams를 사용했기 때문이다.

 

... 그리고 10월 12일, "default값이 earliest?" 파트는 제 착각임을 확인했다. 증명방식이 꺼림찍하여 카프카 스트림즈를 사용하는 부분을 확인해보았다. 개발중인 일부 서비스에서는 카프카 스트림즈를 사용하고 있지도 라이브러리가 추가되어 있지도 않았다. 그럼에도 동일한 현상이 나타났다. 다시 처음부터 원인 파악을 해야한다.

 

 

 

 

과정 : consumer-properties

다시 처음부터 살펴보자. auto.offset.reset를 설정하는 부분은 application.yml이다. 이 파일을 다시 들여다보았다.

  cloud:
    stream:
      kafka:
        binder:
          minPartitionCount: 2
          autoAddPartitions: true
          brokers: kafka-event-dev-0.internal.smartfoodnet.io:9092,kafka-event-dev-1.internal.smartfoodnet.io:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          replication-factor: 1

 

auto.offset.reset는 consumer-properties 아래에 두어야 한다. 그럼 consumer-properties에서 default가 아닌 값을 세팅하는게 아닐까? consumer-properties가 정의된 부분을 (CMD + click으로) 찾아보았다. 

 

@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {

	/**
	 * Arbitrary kafka consumer properties.
	 */
	private Map<String, String> consumerProperties = new HashMap<>();

	public void setConsumerProperties(Map<String, String> consumerProperties) {
		Assert.notNull(consumerProperties, "'consumerProperties' cannot be null");
		this.consumerProperties = consumerProperties;
	}

	/**
	 * Merge boot consumer properties, general properties from
	 * {@link #setConfiguration(Map)} that apply to consumers, properties from
	 * {@link #setConsumerProperties(Map)}, in that order.
	 * @return the merged properties.
	 */
	public Map<String, Object> mergedConsumerConfiguration() {
		// 일부 생략
        
		consumerConfiguration.putAll(this.consumerProperties);
		filterStreamManagedConfiguration(consumerConfiguration);
		// Override Spring Boot bootstrap server setting if left to default with the value
		// configured in the binder
		return getConfigurationWithBootstrapServer(consumerConfiguration,
				ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
	}

    // 이하 생략

 

KafkaBinderConfigurationProperties 클래스에서는 Consumer 뿐만아니라 Producer의 속성을 임의로 바꿀 수 있도록 기능을 제공하고 있다. 앞에서 언급한 consumer-properties를 추적하면 setConsumerProperties 메소드가 나온다. consumerProperties를 갱신하는 메소드로, consumerProperties는 사용자가 지정한 속성을 가지게 된다. consumerProperties는 mergedConsumerConfiguration 메소드에서 consumerConfiguration에 저장된다. mergedConsumerConfiguration 메소드는 스프링부트가 부팅, 즉 빌드될 때 Consumer 설정을 재정의하는 값을 반환한다. 이 때 반환되는 consumerConfiguration은 다음 순서로 병합(merged)된다. consumerConfiguration이 Map이기 때문에 마지막에 병합되는 속성이 최종값이된다.

 

  1. boot consumer properties
  2. general properties from setConfiguration(Map) that apply to consumers
  3. properties from setConsumerProperties(Map)

 

application.yml에 세팅된 값은 setConsumerProperties로 반영이 되기에 최종값으로 결정된다. 이제 consumerConfiguration을 얻기 위해 mergedConsumerConfiguration를 사용하는 곳을 (역시나 CMD + click로) 찾아보자. 그곳에서 설정값을 결정한다고 예상할 수 있다. 

 

 

 

과정 : KafkaMessageChannelBinder

mergedConsumerConfiguration를 추적해보면 KafkaMessageChannelBinder 클래스에서 사용함을 알 수 있다. 아래 코드의 상단을 보면 카프카를 기본 미들웨어로 사용하는 바인더라는 주석의 설명이 보인다. 바인더에서 설정할 수 있는 것 중에서 Consumer를 설정하는 부분이 createKafkaConsumerFactory 메소드이며 여기서 mergedConsumerConfiguration을 사용한다.

/**
 * A {@link org.springframework.cloud.stream.binder.Binder} that uses Kafka as the
 * underlying middleware.
 */
public class KafkaMessageChannelBinder extends
		// @checkstyle:off
		AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
		// @checkstyle:on
		implements
		ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {

	protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous,
			String consumerGroup, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties,
			String beanName, String destination) {
		Map<String, Object> props = new HashMap<>();
		// 일부 생략
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
				anonymous ? "latest" : "earliest");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);

		Map<String, Object> mergedConfig = this.configurationProperties
				.mergedConsumerConfiguration();
		if (!ObjectUtils.isEmpty(mergedConfig)) {
			props.putAll(mergedConfig);
		}
        
        // 이하 생략

 

만약에 application.yml에 auto.offset.reset 옵션값이 있었다면 위 코드의 가장 하단 로직에 의해 속성이 바뀌었을 것이다. 지금은 별도의 auto.offset.reset을 설정하지 않았다고 가정해보자. 그럼 auto.offset.reset은 미리 설정된 값이 지정될 것이다. 위 로직을 보면 auto.offset.reset은 anonymous의 유무에 의해 결정된다. anonymous는 무엇일까? 디버깅을 해보면 anonymous에는 application.yml의 bindings 속성인 group명이 anonymous에 입력됨을 알 수 있다. 또한 createKafkaConsumerFactory 메소드 호출부를 찾아보면 아래와 같은 메소드를 찾을 수 있다.

 

public class KafkaMessageChannelBinder extends
		// @checkstyle:off
		AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
		// @checkstyle:on
		implements
		ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {

// 일부 생략

	@Override
	@SuppressWarnings("unchecked")
	protected MessageProducer createConsumerEndpoint(
			final ConsumerDestination destination, final String group,
			final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {

		boolean anonymous = !StringUtils.hasText(group);
		// 일부 생략
		final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
				anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".consumer", destination.getName());

 

createKafkaConsumerFactory를 사용하는 쪽에서 group String이 있으면 false를, 없으면 true를 반환하는 것을 볼 수 있다. 여기서 의문이 생긴다. group이 없을 수 있을까?

 

 

 

과정 : 컨슈머 그룹의 존재 여부

그래서 application.yml에 group을 제거해서 동작시켜 보았다.

 

# application.yml
spring:
  cloud:
    stream:
      ### 일부 생략 ###
      bindings:
        user-registered-event-in-0:
          destination: my-sample.registered-event
          content-type: application/json
          # group: sample-group # 주석처리해서 제거
    function:
      definition: user-registered-event
// Log
2022-10-12 11:53:53.501  INFO 99503 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	client.id = consumer-anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc-1
	group.id = anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc

2022-10-12 11:53:53.633  INFO 99503 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 100
	auto.offset.reset = latest
	client.id = consumer-anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc-2
	group.id = anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc

2022-10-12 11:54:03.820  INFO 99503 --- [pool-2-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	client.id = consumer-anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc-7
	group.id = anonymous.b25325a4-1a25-4dac-aed5-c23cacea2acc

 

임의의 그룹이 생성된다. 그리고 auto.offset.reset이 latest가 되었다. 그럼 디폴트값이라고 했던 건 그룹이 지정되어 있지 않은 상태를 얘기했던 걸까? KafkaMessageChannelBinder 클래스 코드를 보면 ConsumerConfig에서 설정에 필요한 상수값을 사용하고 있었다. 여기에  ConsumerConfig 클래스의 AUTO_OFFSET_RESET_DOC를 보자.

 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset (default)
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.

 

가장 상단을 번역해보면 아래와 같다.

 

Kafka에 초기 오프셋이 없거나 현재 오프셋이 서버에 더 이상 존재하지 않는 경우(예: 해당 데이터가 삭제되었기 때문에) 수행할 작업:

 

초기 오프셋이 없다는게 컨슈머 그룹을 새로 만들었을 때가 아니라 그룹이 없어서 오프셋이 없을 수 밖에 없는 경우를 얘기한게 아닐까 싶다. 새로운 그룹이 생성되더라도 오프셋은 0으로 세팅되기에 이 경우에 해당하지 않는다. 즉 그룹의 존재 여부로 auto.offset.reset이 결정되고 그룹이 없더라도 auto.offset.reset을 강제하고 싶으면 값을 지정해주면 되는 것이다.

 

 

 

해결 : default값을 latest로

기본값이 earliest이라는 것을 알았으니 이제 원하는 형태로 변경을 해보자. 아래처럼 application.yml을 수정해보자. 앞의 tl;dr과 동일한 내용이다.

 

# application.yml
spring:
  cloud:
    stream:
      defaultBinder: kafka
      kafka:
        binder:
          minPartitionCount: 2
          autoAddPartitions: true
          brokers: kafka-event-dev-0.internal.sample.io:9092,kafka-event-dev-1.internal.sample.io:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            auto.offset.reset: latest # 추가한 시점 이후의 메시지만 받도록 한다
          replication-factor: 1
      bindings:
        user-registered-event-in-0:
          destination: my-sample.registered-event
          content-type: application/json
          group: sample-group
    function:
      definition: user-registered-event

 

 

테스트를 하려면 새로운 그룹을 지정하거나 사용했던 그룹을 삭제해야 한다. 테스트니깐 새로운 그룹(user-registered-event2)을 지정하는 방식으로 진행했다. 애플리케이션을 재가동하면 아래와 같은 로그를 볼 수 있다.

 

2022-10-07 18:17:13.139  INFO 65671 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.offset.reset = latest
	client.id = consumer-user-registered-event2-1

2022-10-07 18:17:13.327  INFO 65671 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.offset.reset = latest
	client.id = consumer-user-registered-event2-2

2022-10-07 18:17:23.575  INFO 65671 --- [pool-2-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.offset.reset = latest
	client.id = consumer-user-registered-event2-7

 

이전 메시지를 수신하지 않는 것도 확인되며, 새로운 메시지를 발행하면 정상적으로 수신도 하는 것을 확인할 수 있다.

 

 

해결되지 않은 의문

아쉽게도 첫번째 의문과 세번째 의문은 해결되지 않았다. 그냥 추측만 하는 형태로 의견을 남겨보겠다. 아래는 앞에서 보여줬던 ConsumerConfig 로그 중 일부이다.

 

2022-10-07 14:22:53.368  INFO 14131 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.offset.reset = earliest
    auto.commit.interval.ms = 100
	client.id = consumer-user-registered-event-1

2022-10-07 18:17:14.451  INFO 65671 --- [binder-health-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	client.id = consumer-null-5

2022-10-07 14:23:03.736  INFO 14131 --- [pool-2-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.offset.reset = latest
    auto.commit.interval.ms = 5000
	client.id = consumer-user-registered-event-7

 

앞에서 언급하지 않은 내용인데, 정체 불명의 컨슈머가 발견이 된다. consumer-null-5가 보이는데 이 이후로 새로운 쓰레드로 처리되는 컨슈머들이 보인다. 헬스체크하는 것으로 보이는데 이게 어떤 의미를 가지는지도 의문이다.

 

첫번째와 세번째 로그를 보면 쓰레드가 다르다. 하나는 restartedMain, 다른 하나는 pool-2-thread-1. 혹시 Kafka streams를 사용하지 않는, Kafka-clients를 사용한 비동기 호출이 있는게 아닐까?  (다시 언급하지만 추측이다.) 그렇게 추측하는 이유는 ConsumerConfig의 CONFIG 상수에 기본 정보가 들어있는 것을 확인했기 때문이다. 상당히 어거지로 찾은건데, ConsumerConfig 소스 코드를 보자.

 

public class ConsumerConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;
    // 생략
    static {
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                        Type.LIST,
                                        Collections.emptyList(),
                                        new ConfigDef.NonNullValidator(),
                                        Importance.HIGH,
                                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                // 생략
                                .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
                                        Type.INT,
                                        5000,
                                        atLeast(0),
                                        Importance.LOW,
                                        AUTO_COMMIT_INTERVAL_MS_DOC)
                                // 생략
                                .define(AUTO_OFFSET_RESET_CONFIG,
                                        Type.STRING,
                                        "latest",
                                        in("latest", "earliest", "none"),
                                        Importance.MEDIUM,
                                        AUTO_OFFSET_RESET_DOC)
                                        
                                        // 후략

 

CONFIG 상수를 정의하는 과정에 auto.offset.reset 뿐만 아니라 auto.commit.interval.ms도 정의한다. 정의된 기본값이 위의 로그에 출력된 값(5000)과 일치한다. ConsumerConfig는 스트림즈 API가 아닌 카프카 클라이언트쪽에 포함되어 있다. 어디선가 KafkaStreams를 거치지 않는 비동기 처리가 있을거라고 추측이 되는 이유이다. 문제는 여기서부터는 어떻게 증명해야할지를 못찾았다는 점. 아쉽지만 여기에만 매달릴수가 없어서 요구사항을 잘 처리한 것에 만족하고 마무리를 지으려고 한다.

 

.

728x90
반응형