There are components named as “Data Flow Components” in .NET
4.5 which enables the capability of in process message passing for coarse
grained and pipelining tasks.
Data Flow components are used to build the channel or pipes,
through which messages are passed to process further.
Below are few default data flow components introduced and a
sample code follows.
1)
Action Block
2)
Buffer Block
3)
Transform Block
4)
TransformManyBlock
5)
BatchBlock
6)
JoinBlock
7)
BatchedJoinBlock
8)
WriteOnceBlock
9)
BroadCast Block
//action
block
var actionBlock = new ActionBlock<int>((a) =>
{
Console.WriteLine("Message recieved : {0}", a);
});
IObserver<Int32> observer = actionBlock.AsObserver();
observer.OnNext(10000);
Task<bool> wait = actionBlock.SendAsync(100);
actionBlock.Post(10);
//buffer
block
var bufferBlock = new BufferBlock<int>();
IObservable<int> bObservable = bufferBlock.AsObservable<int>();
IObserver<int> bObserver = bufferBlock.AsObserver<int>();
//linking
currnet buffer to an another buffer block
var bufferBlock2 = new BufferBlock<int>();
bufferBlock.LinkTo(bufferBlock2, new DataflowLinkOptions() { Append = true });
bObservable.Subscribe(bObserver);
bObserver.OnNext(20);
bufferBlock.Post<int>(200);
int a1 = 0;
bool b1 = bufferBlock.TryReceive(out a1);
bool b2 = bufferBlock.TryReceive(out a1);
bool b11 = bufferBlock2.TryReceive(out a1);
bool b21 = bufferBlock2.TryReceive(out a1);
//broad
cast block
var broadCast = new BroadcastBlock<int>((a) => a + 10);
//linking
current broadcast block to a buffer block
broadCast.LinkTo(bufferBlock2);
broadCast.Post(10);
broadCast.Post(20);
Int32 bd1 = broadCast.Receive<int>();
broadCast.Post(30);
broadCast.Post(40);
bd1 = broadCast.Receive<int>();
//write
once block
var writeOnceBlock = new WriteOnceBlock<int>((a) => a + 100);
writeOnceBlock.Post(400);
Int32 wr1 = writeOnceBlock.Receive();
writeOnceBlock.Post(200);
wr1 = writeOnceBlock.Receive();
writeOnceBlock.Post(500);
wr1 = writeOnceBlock.Receive();
//transform
block
var transformBlock = new TransformBlock<float, float>((a) => a);
transformBlock.Post(180f);
float f1 = transformBlock.Receive();
transformBlock.Post(280f);
float f2 = transformBlock.Receive();
//transform
many block
var transformmanyBlock = new TransformManyBlock<int, int>(a => Enumerable.Range(0, a).ToList());
transformmanyBlock.Post(100);
Int32 b3 = 0;
while (transformmanyBlock.TryReceive(out b3))
{
Console.WriteLine(b3);
}
//batch
block
var batchBlock = new BatchBlock<int>(1000);
Enumerable.Repeat<int>(5, 100)
.ToList()
.ForEach(i =>
batchBlock.Post(i));
batchBlock.Post(1);
Task<int[]> batchData = batchBlock.ReceiveAsync();
//join
block
var joinBlock = new JoinBlock<int, int, int>();
joinBlock.Target1.Post(1);
joinBlock.Target2.Post(2);
joinBlock.Target3.Post(3);
joinBlock.Target1.Post(4);
joinBlock.Target2.Post(5);
joinBlock.Target3.Post(6);
Tuple<int, int, int> tup1 = joinBlock.Receive();
//batched
join block
var bacthedJoinBlock = new BatchedJoinBlock<int, int, int>(2);
bacthedJoinBlock.Target1.Post(1);
bacthedJoinBlock.Target2.Post(2);
bacthedJoinBlock.Target3.Post(3);
bacthedJoinBlock.Target1.Post(4);
bacthedJoinBlock.Target2.Post(5);
bacthedJoinBlock.Target3.Post(6);
Tuple<IList<Int32>, IList<Int32>, IList<Int32>>
tup2 = bacthedJoinBlock.Receive();
Beside the provided default data flow blocks, custom data
blocks can also be implemented.
No comments:
Post a Comment