I have a Flux emitting items every 1 seconds, on repeat. How can I change the window size dynamically?
public static void main(String[] args) throws InterruptedException {
Duration interval = Duration.ofSeconds(1);
Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
flux
.repeat()
// on each emit from the publisher,
// a batch of a dynamic size is read
// could be just 1 item, 3 items, or even more
.window(calcWindowSize()))
.concatMap(Flux::collectList)
// since we only emit every 1 second
// the window size changes over time
.delayElements(interval)
.doOnNext(batch -> {
log.info("Processed batch: " + batch + " at " + System.currentTimeMillis());
})
.subscribe();
Thread.sleep(10_000);
}
private static int calcWindowSize() {
// this calculation is a little more complex in reality
// so let s just use some random int here
return RandomUtils.nextInt(1, 10);
}
The code above is NOT working but hopefully illustrates what I want to achieve. I tried using windowWhen
but don t understand its syntax. If I cannot dynamically change the window size, can I recreate the Flux maybe and continue from where I left on the second before?