In a Spring project, I used Reactor's Sinks
to emit events to an SSE endpoint, and it worked well.
But when I tried to use SmallRye Mutiny's MultiEmitterProcessor
to achieve the same thing, it failed.
The example project is https://github.com/hantsy/quarkus-sandbox/tree/master/jms
    MultiEmitterProcessor<Message> emitterProcessor = MultiEmitterProcessor.create();

    void receive() {
        var consumer = jmsContext.createConsumer(helloQueue);
        consumer.setMessageListener(
            msg -> {
                try {
                    var received = jsonb.fromJson(msg.getBody(String.class), Message.class);
                    LOGGER.log(Level.INFO, "consuming message: {0}", received);
                    emitterProcessor.emit(received);
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            }
        );
    }
And in the resource class:
    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public Multi<Message> stream() {
        // see: https://github.com/quarkusio/quarkus/issues/35220
        return handler.emitterProcessor.toMulti().toHotStream();
    }
Is MultiEmitterProcessor suitable for use as a hot stream?
Or is there something like Reactor's ConnectableFlux
for connecting to a hot stream manually?
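
To be clear about what I mean by "hot stream", here is a minimal plain-Java sketch of the behaviour I am after. It uses neither Mutiny nor Reactor; `HotBroadcaster` and `HotBroadcasterDemo` are hypothetical names I made up for illustration. Every subscriber that is connected at the moment of an emit should receive the item, and a subscriber that connects later simply misses the earlier items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical illustration only: this is NOT Mutiny's or Reactor's API.
class HotBroadcaster<T> {
    // Subscribers attached right now; safe for concurrent emit/subscribe.
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Attach a subscriber and return a handle that detaches it again.
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    // Push an item to whoever is subscribed at this moment; nothing is buffered.
    void emit(T item) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}

class HotBroadcasterDemo {
    public static void main(String[] args) {
        HotBroadcaster<String> hot = new HotBroadcaster<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        hot.emit("m0");             // no subscriber yet, so the item is dropped
        hot.subscribe(early::add);  // first SSE client connects
        hot.emit("m1");
        hot.subscribe(late::add);   // second SSE client connects later
        hot.emit("m2");

        System.out.println(early);  // [m1, m2]
        System.out.println(late);   // [m2]
    }
}
```

In my case the JMS listener would play the role of the code calling `emit`, and each SSE client would be one subscriber.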