I get a Flowable<MyItem> from a network request and need to apply some transformations to the MyItem objects, then pass them on through another Flowable.
Currently I have something like this:
Flowable<MyItem> myStreamTransformed = myClient.getMyItemStream()
        .map(myItem -> new MyItem(myItem.data + "_transformed"))
        .onBackpressureBuffer(100);
return myStreamTransformed;
This works, but if the other end reading the transformed Flowable is slow to consume, the buffer often fills up and an error stops the stream. Rather than increase the buffer to some huge size, I would prefer to delay reading from the incoming Flowable until there is space in the buffer.
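To make the failure concrete, here is a minimal, self-contained reproduction (hypothetical names; Flowable.range stands in for the network stream, and a TestSubscriber that has only requested one item stands in for the slow reader):

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public class OverflowRepro {
    // Returns the terminal error, if any, of a fast synchronous producer
    // (stand-in for the network stream) pushed through a 100-item buffer
    // toward a subscriber that has only requested a single item.
    static Throwable overflowError() {
        TestSubscriber<String> slowConsumer = new TestSubscriber<>(1); // initial request: 1
        Flowable.range(1, 10_000)                 // fast producer
                .map(i -> i + "_transformed")
                .onBackpressureBuffer(100)        // small bounded buffer
                .subscribe(slowConsumer);
        // onBackpressureBuffer requests an unbounded amount from upstream,
        // so the buffer fills immediately and the stream errors out.
        return slowConsumer.errors().isEmpty() ? null : slowConsumer.errors().get(0);
    }
}
```

Running this terminates the stream with a MissingBackpressureException instead of pausing the producer, which is exactly the behaviour I want to avoid.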
I looked at writing my own Flowable and implementing the onNext() processing with this logic myself, but that seemed overly complex.
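For reference, the kind of manual request bookkeeping I was considering looks roughly like this (a sketch only; OneAtATimeSubscriber and process() are hypothetical names): the subscriber asks the producer for exactly one item per item it has finished handing downstream.

```java
import io.reactivex.subscribers.DisposableSubscriber;

// Hypothetical sketch: request exactly one upstream item per processed item,
// so the producer is never asked for more than downstream has consumed.
class OneAtATimeSubscriber<T> extends DisposableSubscriber<T> {
    @Override
    protected void onStart() {
        request(1); // prime the pump with a single request
    }

    @Override
    public void onNext(T item) {
        process(item); // hand the item to the slow downstream
        request(1);    // only now ask the producer for the next one
    }

    @Override
    public void onError(Throwable t) { /* propagate or log */ }

    @Override
    public void onComplete() { }

    void process(T item) { /* transform and forward (placeholder) */ }
}
```

It works, but spreading the transformation, error handling, and request bookkeeping across a hand-written subscriber felt heavier than it should be for what is essentially a map step.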
This code runs as part of a Micronaut application: the incoming Flowable comes from a Micronaut-generated client for a JSON stream, and the output Flowable is also turned into another stream (by Micronaut) for the caller of the method above.
So it sits between a client and a backend server, transforming some data. Often the backend produces data a bit faster than the client reads it; I don't want to lose data, but I also don't want to build huge buffers, since avoiding those is kind of the point of streaming.
I believe the underlying operating system and TCP stack should be able to manage this; I just can't figure out the RxJava approach to it.
Q: How to handle this in RxJava2?
question from:
https://stackoverflow.com/questions/65922261/how-to-address-flowable-backpressure-by-delaying-read-until-buffer-has-space