I have a thread pushing events to a PublishProcessor, and a piece of code that consumes those messages with backpressure:
final PublishProcessor<String> publishProcessor = PublishProcessor.create();
// a separate thread calls publishProcessor.onNext every 1s
publishProcessor
    .onBackpressureBuffer(
        1,
        () -> System.out.println("Buffer full!"),
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(Schedulers.newThread(), false, 1)
    .map(this::parseMessage)
    .subscribe(this::consume); // consume has a fixed delay of 5s
This works fine. However, I would like to expose a function that returns a Flowable of already-parsed messages, like this:
final Flowable<String> parsedMessages = publishProcessor.map(this::parseMessage);
// and then somewhere else:
parsedMessages
    .onBackpressureBuffer(
        1,
        () -> System.out.println("Buffer full!"),
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe(this::consume);
The thing is, I don't want messages to be parsed if there is no space left in the buffer.
This is what gets logged in the first case when the buffer is full:
> Publish msg
> Buffer full!
And this is what gets logged in the second case:
> Publish msg
> Parse msg
> Buffer full!
Is there a way to make backpressure apply to the whole flow, including stages (such as the map above) that are added before the backpressure buffer is set up?
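To make the behaviour I am after concrete, here is a minimal plain-Java sketch. It does not use RxJava and is not a proposed solution; the class DropOldestLazyParser and its methods publish, next, parseCount and dropCount are hypothetical names made up for illustration. It only shows the semantics I want: raw messages sit in a bounded drop-oldest buffer, and a message is parsed only when the consumer actually takes it, so dropped messages are never parsed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical illustration (not RxJava): a bounded DROP_OLDEST buffer
// that defers parsing until a message is actually consumed.
final class DropOldestLazyParser<R, P> {
    private final int capacity;
    private final Function<R, P> parser;
    private final Deque<R> rawBuffer = new ArrayDeque<>();
    private int parseCount = 0;
    private int dropCount = 0;

    DropOldestLazyParser(int capacity, Function<R, P> parser) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
        this.parser = parser;
    }

    // Producer side: store the raw message; if the buffer is full,
    // evict the oldest raw message without ever parsing it.
    synchronized void publish(R raw) {
        if (rawBuffer.size() == capacity) {
            rawBuffer.pollFirst();
            dropCount++;
        }
        rawBuffer.addLast(raw);
    }

    // Consumer side: parse only the message that is handed out.
    // Returns null when the buffer is empty.
    synchronized P next() {
        R raw = rawBuffer.pollFirst();
        if (raw == null) {
            return null;
        }
        parseCount++;
        return parser.apply(raw);
    }

    synchronized int parseCount() { return parseCount; }

    synchronized int dropCount() { return dropCount; }

    public static void main(String[] args) {
        DropOldestLazyParser<String, String> flow =
                new DropOldestLazyParser<>(1, String::toUpperCase);
        flow.publish("a");
        flow.publish("b"); // evicts "a" unparsed
        flow.publish("c"); // evicts "b" unparsed
        System.out.println(flow.next());       // C
        System.out.println(flow.parseCount()); // 1
        System.out.println(flow.dropCount());  // 2
    }
}
```

With a capacity of 1 and three published messages, only the last message is parsed, which corresponds to the log of the first case above (no "Parse msg" line before "Buffer full!").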