I have a module that accepts entity IDs and a "resolution type" as parameters, and then gathers data (primarily) asynchronously via multiple operations that return Fluxes. The resolution is broken into multiple (primarily, again) asynchronous operations that each work on gathering different data types that contribute to the resolution. I say "primarily" asynchronously because some of the resolution types require some preliminary operation(s) that must happen synchronously to provide information for the remaining asynchronous Flux operations of the resolution. Now, while this synchronous operation is taking place, at least a portion of the overall asynchronous resolution operation can begin. I would like to start these Flux operations while the synchronous operations are taking place. Then, once the synchronous data has been resolved, I can get each Flux for the remaining operations underway. Some resolution types will have all Flux operations returning data, while others gather less information, and some of the Flux operations will remain empty. The resolution operations are time-expensive, and I would like to be able to start some Flux operations earlier so that I can compress the time a bit -- that is quite important for what I am accomplishing. So eager subscription is ideal, as long as I can guarantee that I will not miss any item emission.
With that in mind, how can I:
- Create a "holder" or a "container" for each of the Flux operations that will be needed to resolve everything, and initialize them as empty (like
Flux.empty()
)
- Add items to whatever I can create in item 1 above -- it was initialized as empty, but I might want the data from one or multiple finite and asynchronous Flux operations, but I do not care to keep them separate, and they can appear as one stream when I will use
collectList()
on them to produce a Mono
.
- When some of these
Flux
operations should start before some of the others, how can I start them, and ensure that I do not miss any data? And if I start a name resolution Flux, for example, can I add to it, as in item 2 above? Let's say that I want to start retrieving some data, then perform a synchronous operation, and then I create another name resolution Flux from the result of the synchronous operation, can I append this new Flux to the original name resolution Flux, since it will be returning the same data type? I am aware of Flux.merge()
, but it would be convenient to work with a single Flux reference that I can keep adding to, if possible.
Will I need a collection object, like a list, and then use a merge operation? Initially, I thought about using a ConnectableFlux
, until I realized that it is for connecting multiple subscribers, rather than for connecting multiple publishers. Connecting multiple publishers is what I think would be a good answer for my need, unless this is a common pattern that can be handled in a better way.
I have only been doing reactive programming for a short time, so please be patient with the way I am trying to describe what I want to do. If I can better clarify my intentions, please let me know where I have been unclear, and I will gladly attempt to clear it up. Thanks in advance for your time and help!
EDIT:
Here is the final Kotlin version, nice and concise:
private val log = KotlinLogging.logger {}
class ReactiveDataService {
private val createMono: () -> Mono<List<Int>> = {
Flux.just(9, 8, 7)
.flatMap {
Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
.parallel()
.runOn(Schedulers.boundedElastic())
}
.collectList()
.cache()
}
private val processResults: (List<String>, List<String>) -> String =
{ d1, d2 -> "
downstream 1: $d1
downstream 2: $d2" }
private val convert: (List<Int>, Int) -> Flux<String> =
{ data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }
fun doQuery(): String? {
val mono = createMono()
val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
return Mono.zip(downstream1, downstream2, processResults).block()
}
}
fun main() {
val service = ReactiveDataService()
val start = System.currentTimeMillis()
val result = service.doQuery()
log.info("{}
Total time: {}ms", result, System.currentTimeMillis() - start)
}
And the output:
downstream 1: [ 66, 39, 40, 88, 97, 35, 70, 91, 27, 12, 84, 37, 35, 15, 45, 27, 85, 22, 55, 89, 81, 21, 43, 62]
downstream 2: [132, 78, 80, 176, 194, 70, 140, 182, 54, 24, 168, 74, 70, 30, 90, 54, 170, 44, 110, 178, 162, 42, 86, 124]
Total time: 209ms