You can solve this relatively easily using a BlockingCollection<T>
.
You can use one as a queue, and pass a reference to it to the producer()
and each of the consumers()
.
You'll be calling GetConsumingEnumerable()
from each consumer thread, and using it with foreach
.
The producer thread will add items to the collection, and will call CompleteAdding()
when it has finished producing stuff. This will automatically make all the consumer threads exit their foreach loops.
Here's a basic example (with no error handling). The calls to Thread.Sleep()
are to simulate load, and should not be used in real code.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
var plant = new ProcessingPlant();
plant.Process();
Console.WriteLine("Work complete.");
}
}
public sealed class ProcessingPlant
{
private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
public void Process()
{
Parallel.Invoke(producer, consumers);
}
private void producer()
{
for (int i = 0; i < 100; ++i)
{
string item = i.ToString();
Console.WriteLine("Producer is queueing {0}", item);
_queue.Add(item); // <- Here's where we add an item to the queue.
Thread.Sleep(0);
}
_queue.CompleteAdding(); // <- Here's where we make all the consumers
} // exit their foreach loops.
private void consumers()
{
Parallel.Invoke(
() => consumer(1),
() => consumer(2),
() => consumer(3),
() => consumer(4),
() => consumer(5)
);
}
private void consumer(int id)
{
Console.WriteLine("Consumer {0} is starting.", id);
foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
{
Console.WriteLine("Consumer {0} read {1}", id, item);
Thread.Sleep(0);
}
Console.WriteLine("Consumer {0} is stopping.", id);
}
}
}
(I know this is using an extra thread just to start the consumers, but I did it this way to avoid obscuring the real point - which is to demonstrate the use of BlockingCollection.)
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…