Tuesday, January 1, 2013

Data Flow – TPL


There are components named as “Data Flow Components” in .NET 4.5 which enables the capability of in process message passing for coarse grained and pipelining tasks.
Data Flow components are used to build the channel or pipes, through which messages are passed to process further.
Below are few default data flow components introduced and a sample code follows.
1)      Action Block
2)      Buffer Block
3)      Transform Block
4)      TransformManyBlock
5)      BatchBlock
6)      JoinBlock
7)      BatchedJoinBlock
8)      WriteOnceBlock
9)      BroadCast Block

//action block
                var actionBlock = new ActionBlock<int>((a) =>
                    {
                        Console.WriteLine("Message recieved : {0}", a);
                    });
                IObserver<Int32> observer = actionBlock.AsObserver();
                observer.OnNext(10000);
                Task<bool> wait = actionBlock.SendAsync(100);
                actionBlock.Post(10);


                //buffer block
                var bufferBlock = new BufferBlock<int>();
                IObservable<int> bObservable = bufferBlock.AsObservable<int>();
                IObserver<int> bObserver = bufferBlock.AsObserver<int>();

                //linking currnet buffer to an another buffer block
                var bufferBlock2 = new BufferBlock<int>();
                bufferBlock.LinkTo(bufferBlock2, new DataflowLinkOptions() { Append = true });
                bObservable.Subscribe(bObserver);
                bObserver.OnNext(20);
                bufferBlock.Post<int>(200);
                int a1 = 0;
                bool b1 = bufferBlock.TryReceive(out a1);
                bool b2 = bufferBlock.TryReceive(out a1);

                bool b11 = bufferBlock2.TryReceive(out a1);
                bool b21 = bufferBlock2.TryReceive(out a1);


                //broad cast block
                var broadCast = new BroadcastBlock<int>((a) => a + 10);
                //linking current broadcast block to a buffer block
                broadCast.LinkTo(bufferBlock2);
                broadCast.Post(10);
                broadCast.Post(20);
                Int32 bd1 = broadCast.Receive<int>();
                broadCast.Post(30);
                broadCast.Post(40);
                bd1 = broadCast.Receive<int>();

                //write once block
                var writeOnceBlock = new WriteOnceBlock<int>((a) => a + 100);
                writeOnceBlock.Post(400);
                Int32 wr1 = writeOnceBlock.Receive();
                writeOnceBlock.Post(200);
                wr1 = writeOnceBlock.Receive();
                writeOnceBlock.Post(500);
                wr1 = writeOnceBlock.Receive();

                //transform block
                var transformBlock = new TransformBlock<float, float>((a) => a);
                transformBlock.Post(180f);
                float f1 = transformBlock.Receive();
                transformBlock.Post(280f);
                float f2 = transformBlock.Receive();

                //transform many block
                var transformmanyBlock = new TransformManyBlock<int, int>(a => Enumerable.Range(0, a).ToList());
                transformmanyBlock.Post(100);
                Int32 b3 = 0;
                while (transformmanyBlock.TryReceive(out b3))
                {
                    Console.WriteLine(b3);
                }

                //batch block
                var batchBlock = new BatchBlock<int>(1000);
                Enumerable.Repeat<int>(5, 100)
                    .ToList()
                    .ForEach(i => batchBlock.Post(i));
                batchBlock.Post(1);
                Task<int[]> batchData = batchBlock.ReceiveAsync();

                //join block
                var joinBlock = new JoinBlock<int, int, int>();
                joinBlock.Target1.Post(1);
                joinBlock.Target2.Post(2);
                joinBlock.Target3.Post(3);
                joinBlock.Target1.Post(4);
                joinBlock.Target2.Post(5);
                joinBlock.Target3.Post(6);
                Tuple<int, int, int> tup1 = joinBlock.Receive();

                //batched join block
                var bacthedJoinBlock = new BatchedJoinBlock<int, int, int>(2);
                bacthedJoinBlock.Target1.Post(1);
                bacthedJoinBlock.Target2.Post(2);
                bacthedJoinBlock.Target3.Post(3);
                bacthedJoinBlock.Target1.Post(4);
                bacthedJoinBlock.Target2.Post(5);
                bacthedJoinBlock.Target3.Post(6);
                Tuple<IList<Int32>, IList<Int32>, IList<Int32>> tup2 = bacthedJoinBlock.Receive();
Beside the provided default data flow blocks, custom data blocks can also be implemented.

No comments: