.NET Framework 4.0 introduced the TPL (Task Parallel Library), which offers
several advantages for achieving concurrency. It also added thread-safe
collections such as “ConcurrentQueue”, “BlockingCollection”, “ConcurrentDictionary”,
and “ConcurrentBag”.
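As a small illustration of what "thread safe" means for these collections, the sketch below fills a “ConcurrentQueue” from parallel iterations without any explicit locking. The class and method names (ConcurrentQueueSketch, FillInParallel) are made up for this example and are not part of the sample that follows later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
static class ConcurrentQueueSketch
{
    // Enqueues the numbers 0..count-1 from parallel iterations
    // and returns how many items ended up in the queue.
    public static int FillInParallel(int count)
    {
        var queue = new ConcurrentQueue<int>();

        // Parallel.For may run iterations on several threads at once;
        // ConcurrentQueue<T>.Enqueue is safe to call concurrently.
        Parallel.For(0, count, i => queue.Enqueue(i));

        return queue.Count;
    }

    static void Main()
    {
        Console.WriteLine(FillInParallel(1000)); // prints 1000
    }
}
```

No item is lost even though many threads enqueue at the same time. The order of the items inside the queue is not predictable, only the total count is.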
A producer publishes data. When multiple producers post data at the same
time, publishing must be thread safe.
A consumer reads the published data, typically through a queue.
At some point, the consumers need to be signaled that the producers (the
source) have finished posting data.
The example below has 3 producers (3 tasks), each adding the numbers
from 1 to 100 to a queue in parallel, so 300 items are added in total.
At the same time, 2 consumers (2 tasks) read numbers
from the queue.
So adding and reading happen in parallel across 5 tasks (3 for
adding, 2 for reading). When adding is complete, the reading tasks must
be signaled so that the consumers can stop reading and exit.
For this we use “BlockingCollection”, a thread-safe
collection class introduced in .NET 4.0.
The enumerator returned by the “GetConsumingEnumerable” method of
BlockingCollection pops items from the collection. That means iterating with it
not only reads each item but also removes it from the collection. (A plain
foreach over the collection itself does not remove anything.) The consuming
enumerator blocks while the collection is empty and keeps taking items until
the collection has been marked complete (by calling the “CompleteAdding” method)
and all remaining items have been read.
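The behavior described above can be checked in isolation with a minimal single-threaded sketch. The class and method names (ConsumingEnumerableSketch, Drain) are hypothetical and exist only for this illustration. It relies on the fact that a BlockingCollection created with the default constructor is backed by a ConcurrentQueue, so items come out in the order they were added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
static class ConsumingEnumerableSketch
{
    // Adds three items, marks the collection complete, then drains it.
    // Returns the items that were read and the number of items left afterwards.
    public static Tuple<List<int>, int> Drain()
    {
        var queue = new BlockingCollection<int>();
        queue.Add(1);
        queue.Add(2);
        queue.Add(3);

        // Without this call the loop below would block forever
        // once the three items have been taken.
        queue.CompleteAdding();

        var seen = new List<int>();
        foreach (var item in queue.GetConsumingEnumerable())
        {
            seen.Add(item); // each item is removed from the collection as it is read
        }

        return Tuple.Create(seen, queue.Count);
    }

    static void Main()
    {
        var result = Drain();
        Console.WriteLine(string.Join(",", result.Item1)); // prints 1,2,3
        Console.WriteLine(result.Item2);                   // prints 0
    }
}
```

The loop ends on its own because “CompleteAdding” was called before the collection was drained. In the full sample, the same call is what lets the consumer tasks finish.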
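using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            var queue = new BlockingCollection<int>();

            // 3 producers: each task adds the numbers 1 to 100 to the shared queue.
            var producers = Enumerable.Range(1, 3)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (var i in Enumerable.Range(1, 100))
                    {
                        queue.Add(i);
                        Thread.Sleep(100);
                    }
                }))
                .ToArray();

            // 2 consumers: each task takes items until the queue is
            // marked complete and has been emptied.
            var consumers = Enumerable.Range(1, 2)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        Console.WriteLine(item);
                    }
                }))
                .ToArray();

            // Wait for all producers, then signal the consumers that no more items will arrive.
            Task.WaitAll(producers);
            queue.CompleteAdding();

            // The consumers exit their loops once the remaining items are read.
            Task.WaitAll(consumers);
            Console.WriteLine("Done!");
        }
        catch (Exception exp)
        {
            Console.WriteLine("Error : " + exp.Message);
        }
        Console.Read();
    }
}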