Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c# - Reactive pipeline - how to control parallelism?

I'm building a straightforward processing pipeline: an item is fetched as input, passed through multiple processors sequentially, and finally emitted as output. The image below describes the overall architecture:

[Image: rx-pipe]

How it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item has been processed, the output is notified. Each individual item is processed sequentially, but multiple items may be processed in parallel (depending on how fast they are fetched from the provider).

The IObservable created and returned from the pipeline looks like this:

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }                
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
            .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};

The missing part: I need a mechanism that limits how many items (at most) can be in the pipeline at any given time.

For example, if the maximum number of items processed in parallel is 3, the workflow would be:

  1. Item 1 is fetched and passed to the processors.
  2. Item 2 is fetched and passed to the processors.
  3. Item 3 is fetched and passed to the processors.
  4. Item 1 completed processing.
  5. Item 4 is fetched and passed to the processors.
  6. Item 3 completed processing.
  7. Item 5 is fetched and passed to the processors.
  8. Etc...

1 Answer

Merge provides an overload which takes a max concurrency.

Its signature looks like: IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> sources, int maxConcurrent);
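To see the overload in isolation, here is a minimal sketch, assuming the System.Reactive package is referenced. The `Gauge` helper, the `PeakConcurrency` method and the 50 ms delay are hypothetical stand-ins for the real processors; they only exist to measure how many items are in flight at once.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class MergeLimitDemo
{
    // Hypothetical helper: counts items currently being processed
    // and remembers the highest count seen.
    private sealed class Gauge
    {
        private readonly object _lock = new object();
        private int _current;
        public int Peak { get; private set; }

        public void Enter() { lock (_lock) { _current++; if (_current > Peak) Peak = _current; } }
        public void Exit() { lock (_lock) { _current--; } }
    }

    // Pushes itemCount items through Merge(maxConcurrent) and returns
    // the largest number of items that were in flight at the same time.
    public static int PeakConcurrency(int itemCount, int maxConcurrent)
    {
        var gauge = new Gauge();

        Observable.Range(1, itemCount)
            // FromAsync is lazy: the task only starts when Merge subscribes.
            .Select(item => Observable.FromAsync(async () =>
            {
                gauge.Enter();
                try
                {
                    await Task.Delay(50); // assumed stand-in for the processor chain
                    return item;
                }
                finally
                {
                    gauge.Exit();
                }
            }))
            .Merge(maxConcurrent)
            .ToList()
            .Wait(); // block until every item has been processed

        return gauge.Peak;
    }

    public static void Main()
    {
        // The printed peak should never exceed the limit passed to Merge.
        Console.WriteLine(PeakConcurrency(itemCount: 10, maxConcurrent: 3));
    }
}
```

The gauge is decremented inside the task itself, before the inner observable completes, so the measured peak stays at or below the limit given to Merge.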

Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):

return Observable
    // The reactive while loop also takes care of OnCompleted for you.
    .While(() => _provider.HasNext,
           Observable.FromAsync(_provider.GetNextAsync))
    // Wrap each item in a deferred stream that only executes once it is subscribed to.
    .Select(item => Observable.Defer(() =>
        _processors.Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync))))
    // Only allow 3 streams to execute in parallel.
    .Merge(3);

To break down what this does:

  1. While checks _provider.HasNext on each iteration; if it is true, it resubscribes to fetch the next value from _provider, otherwise it emits OnCompleted.
  2. Inside Select, a new observable stream is created for each item; because it is wrapped in Defer, it is not evaluated yet.
  3. The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to at most 3 inner observables simultaneously.
  4. Each inner observable is finally evaluated when Merge subscribes to it.
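The deferred processor chain from the breakdown above can be tried on its own. This is a rough sketch, assuming the System.Reactive package is referenced; the two lambda processors and the `Started` counter are hypothetical and stand in for the real `_processors` and their `ProcessAsync` methods.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ProcessorChainDemo
{
    // Hypothetical processors, applied in order: add one, then double.
    private static readonly List<Func<int, Task<int>>> Processors =
        new List<Func<int, Task<int>>>
        {
            async x => { await Task.Delay(10); return x + 1; },
            async x => { await Task.Delay(10); return x * 2; },
        };

    // Counts how many chains have actually started executing.
    public static int Started;

    // Builds the sequential chain for one item. Because of Defer,
    // nothing runs until something subscribes to the result.
    public static IObservable<int> BuildChain(int item) =>
        Observable.Defer(() =>
        {
            Started++;
            return Processors.Aggregate(
                seed: Observable.Return(item),
                func: (current, processor) => current.SelectMany(x => processor(x)));
        });

    public static void Main()
    {
        IObservable<int> chain = BuildChain(5);
        Console.WriteLine(Started); // 0: the chain exists but has not run

        int result = chain.Wait();  // subscribing triggers the work
        Console.WriteLine(Started); // 1
        Console.WriteLine(result);  // (5 + 1) * 2 = 12
    }
}
```

This laziness is what lets Merge decide when each item starts: an inner stream that ran eagerly would begin processing before Merge had a chance to hold it back.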

Alternative 1

If you also need to control the number of parallel requests to the provider, it gets a little trickier, since you need to signal when your Observable is ready for new values:

return Observable.Create<T>(observer => 
{
  var subject = new Subject<Unit>();
  var disposable = new CompositeDisposable(subject);

  disposable.Add(subject
    //This will complete when provider has run out of values
    .TakeWhile(_ => _provider.HasNext)
    .SelectMany(
      _ => _provider.GetNextAsync(),
     (_, item) => 
     {
       return _processors
        .Aggregate(
         seed: Observable.Return(item),
         func: (current, processor) => current.SelectMany(processor.ProcessAsync))
        //Could also use `Finally` here, this signals the chain
        //to start on the next item.
        .Do(dontCare => {}, () => subject.OnNext(Unit.Default));
     }
    )
    .Merge(3)
    .Subscribe(observer));

  //Queue up 3 requests for the initial kickoff
  disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

  return disposable;
});
