Однако, если есть несколько элементов на входной очереди с одной и той же идентичностью, только самый последний человек нуждается в обработке. Промежуточные можно отказаться. Это та часть, у меня возникают проблемы с выяснением.
Код: Выделить всё
public record Bid(int Id, int Value);
async Task Main()
{
// This block is just here to log that an input is received.
var startBlock = new TransformBlock(d =>
{
Console.WriteLine("Input: {0} ({1})", d.Id, d.Value);
return d;
});
//TODO: Check for duplicate identity (Bid.Id) and replace the
// current element with the most recent one.
var updateWithMostRecentBlock = new TransformBlock(d => d);
var processBlock = new TransformBlock(async d =>
{
Console.WriteLine("Processing: {0} ({1})", d.Id, d.Value);
await Task.Delay(1000);
return d;
});
var finishBlock = new ActionBlock(d =>
{
Console.WriteLine("Done: {0} ({1})", d.Id, d.Value);
});
var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };
startBlock.LinkTo(updateWithMostRecentBlock, propagateCompletion);
updateWithMostRecentBlock.LinkTo(processBlock, propagateCompletion);
processBlock.LinkTo(finishBlock, propagateCompletion);
var data = new[]
{
new Bid(1, 0), // Processed immediately
new Bid(1, 1), // Replaced with (1,2)
new Bid(2, 0), // Replaced with (2,1)
new Bid(1, 2), // Queued
new Bid(2, 1) // Queued
};
foreach (var d in data)
startBlock.Post(d);
startBlock.Complete();
await finishBlock.Completion;
}
фактический выход:
Код: Выделить всё
Input: 1 (0)
Input: 1 (1)
Input: 2 (0)
Input: 1 (2)
Input: 2 (1)
Processing: 1 (0)
Processing: 1 (1)
Done: 1 (0)
Processing: 2 (0)
Done: 1 (1)
Processing: 1 (2)
Done: 2 (0)
Processing: 2 (1)
Done: 1 (2)
Done: 2 (1)
Input: 1 (0) // Immediately processed
Input: 1 (1) // Replaced by (1,2)
Input: 2 (0) // Replaced by (2,1)
Input: 1 (2) // Queued
Input: 2 (1) // Queued
Processing: 1 (0)
Done: 1 (0)
Processing: 1 (2)
Done: 1 (2)
Processing: 2 (1)
Done: 2 (1)
< /code>
Подсказка: < /strong>
Stephen Toub имеет элегантное решение для полной противоположности того, чего я пытаюсь достичь. Его решение отвергает все входящие элементы и сохраняет самые старые.
Подробнее здесь: https://stackoverflow.com/questions/683 ... l-dataflow
Мобильная версия