English 中文(简体)
How to read messages from topic by Kafka after a my custom time interval?
原标题:

I need the consumer to read messages from the topic not immediately, but after a certain time interval. In my case, this time = 10 seconds.

I made a KafkaConfig in which I specified the properties and added them to the DefaultKafkaConsumerFactory:

  1. I tried through

    factory.getContainerProperties().setIdleBetweenPolls(10000);

  2. I also tried via

    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);

  3. I also tried via

    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999"); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000")

But none of the above works :(

I am sad.

I found similar cases here, they are very similar to my solution from the internet, but they don t work either.

Example of my code below:

@Slf4j
@EnableKafka
@Configuration
@ConditionalOnProperty(
    value = "exclude.kafka",
    havingValue = "false",
    matchIfMissing = true
)
@RequiredArgsConstructor
public class MyKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.getContainerProperties().setIdleBetweenPolls(10000);

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchId");
//        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
//        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "999999999");
//        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

Does anyone have any ideas how to make Kafka work (read messages after 10 seconds, not as soon as they hit the topic) ?

问题回答

There is no built in message delay - see more about this here.

You can however, leverage the pause and resume methods from the KafkaConsumer API

Which states the following

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.

This seem to me as the go to solution after reading this

So Java code would be as following, more or less, this is a sketch:

Set<TopicPartition> partitions = consumer.assignment();
consumer.pause(partitions);
Thread.sleep(10000); // 10 Seconds in millis
...
consumer.resume(partitions)

Disclaimer: I haven t tested it.





相关问题
Spring Properties File

Hi have this j2ee web application developed using spring framework. I have a problem with rendering mnessages in nihongo characters from the properties file. I tried converting the file to ascii using ...

Logging a global ID in multiple components

I have a system which contains multiple applications connected together using JMS and Spring Integration. Messages get sent along a chain of applications. [App A] -> [App B] -> [App C] We set a ...

Java Library Size

If I m given two Java Libraries in Jar format, 1 having no bells and whistles, and the other having lots of them that will mostly go unused.... my question is: How will the larger, mostly unused ...

How to get the Array Class for a given Class in Java?

I have a Class variable that holds a certain type and I need to get a variable that holds the corresponding array class. The best I could come up with is this: Class arrayOfFooClass = java.lang....

SQLite , Derby vs file system

I m working on a Java desktop application that reads and writes from/to different files. I think a better solution would be to replace the file system by a SQLite database. How hard is it to migrate ...

热门标签