I need the consumer to read messages from the topic not immediately, but after a fixed delay. In my case the delay is 10 seconds.
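To make the requirement concrete, here is a minimal sketch of the timing I am after. It assumes the delay is measured from the moment a message was published (for example, the record timestamp); the class `ReadDelay` and the method `remaining` are hypothetical names used only for illustration and are not part of Kafka or Spring.

```java
import java.time.Duration;
import java.time.Instant;

class ReadDelay {
    // Assumption: the fixed 10-second delay described in the question.
    static final Duration DELAY = Duration.ofSeconds(10);

    // Returns how long a message published at publishedAt still has to wait
    // at time now before it may be processed; zero if it is already due.
    static Duration remaining(Instant publishedAt, Instant now) {
        Duration left = DELAY.minus(Duration.between(publishedAt, now));
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        Instant published = Instant.parse("2024-01-01T00:00:00Z");
        // 3 seconds after publishing: 7 seconds still to wait.
        System.out.println(remaining(published, published.plusSeconds(3)));
        // 12 seconds after publishing: the message is already due.
        System.out.println(remaining(published, published.plusSeconds(12)));
    }
}
```

In other words, a message that arrived 3 seconds ago should be held back for another 7 seconds, and one that arrived 12 seconds ago should be handed to the listener right away.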
I created a KafkaConfig class in which I set the consumer properties and passed them to a DefaultKafkaConsumerFactory.
First I tried:
factory.getContainerProperties().setIdleBetweenPolls(10000);
Then I tried:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
And finally:
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
But none of the above works :(
I am sad.
I found similar questions here; the answers are very close to the solution I found on the internet, but they don't work either.
My code is below:
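import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Slf4j
@EnableKafka
@Configuration
@ConditionalOnProperty(
        value = "exclude.kafka",
        havingValue = "false",
        matchIfMissing = true
)
@RequiredArgsConstructor
public class MyKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Attempt 1: idle for 10 seconds between polls
        factory.getContainerProperties().setIdleBetweenPolls(10000);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchId");
        // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        // Attempt 2 (currently disabled)
        // props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        // Attempt 3 (max wait currently disabled)
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999");
        // props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}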
Does anyone have any idea how to make the Kafka consumer read messages 10 seconds after they arrive, rather than as soon as they hit the topic?