Как динамически изменять размер пакета пакетного блока во время выполнения?C#

Место общения программистов C#
Ответить
Anonymous
 Как динамически изменять размер пакета пакетного блока во время выполнения?

Сообщение Anonymous »

У меня есть пакетный блок в потоке данных tpl, и с ним связано несколько целевых блоков. Однако количество целевых блоков динамически меняется и, следовательно, размер пакетов. Проблема в том, что размер пакета должен быть указан при инициализации пакетного блока, и я не вижу способа изменить его позже. Есть идеи, как это обойти? Является ли единственный способ отменить связь (удалить все ссылки на пакетный блок и из пакетного блока), повторно инициализировать пакетный блок с новым размером пакета, а затем снова связать его? Я мог бы это сделать, но как гарантировать, что старые и новые пакеты не перепутаются?

Например, если у меня было 2 блока преобразования, передаваемых в пакетный блок, а теперь у меня есть дополнительный блок преобразования и я хочу увеличить размер пакета до 3, как мне убедиться, что все предыдущие пакеты до увеличения были обработаны, чтобы обеспечить синхронизированное поведение? Дело в том, что все блоки преобразования получают один и тот же элемент, и выходные данные этих блоков преобразования должны быть объединены в пакеты таким образом, чтобы группировались только те выходные данные, которые соответствуют идентичным входным данным.

Вот пример того, как я хочу:

Постоянный поток целых чисел для преобразования блоков:
1,2,3, [точка, где размер пакета увеличивается],4,5,...

Пусть блоки преобразования выводят то, что они получили, например 1 => 1

Так что пакетный блок должен выводить следующее:
[1,1], [2,2], [3,3], [изменение размера пакета], [4,4,4], [5,5,5],...

Вот мой текущий код:

public class Test
{
private Stopwatch watch;

private BroadcastBlock tempBCB;
private BatchBlock batchBlock;
private TransformBlock transformBlock;
private ActionBlock justToFlushTransformBlock;

private CoreLogic core1;
private CoreLogic core2;

public Test()
{
tempBCB = new BroadcastBlock(input => input);

//here batch size = 2
batchBlock = new BatchBlock(2, new GroupingDataflowBlockOptions { Greedy = false });

transformBlock = new TransformBlock(array =>
{
List inputObjects = array[0].Item1;
List ret = inputObjects.ConvertAll(x => new FinalObject(x));

foreach (var tuple in array)
{
//iterate over each individual object
foreach (var dictionary in tuple.Item2)
{
ret[dictionary.Key].outputList.Add(dictionary.Value);
}
}

return ret;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

justToFlushTransformBlock = new ActionBlock(list =>
{
//just in order to accept items from the transformBlock output queue
});

//Generate 2 CoreLogic objects
core1 = new CoreLogic();
core2 = new CoreLogic();

//linking
tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

core1.transformBlock.LinkTo(batchBlock);
core2.transformBlock.LinkTo(batchBlock);

batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
const int numberChunks = 30;

watch = new Stopwatch();
watch.Start();

for (int j = 1; j
{
batchBlock.Complete();
});

transformBlock.Completion.Wait();

watch.Stop();

Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
Console.ReadLine();
}
}

public class CoreLogic
{
private Random rand;
public TransformBlock transformBlock;

public CoreLogic()
{
const int numberIntermediateObjects = 10000;

transformBlock = new TransformBlock(input =>
{
//please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return

Dictionary ret = new Dictionary();
for (int i = 0; i < numberIntermediateObjects; i++)
{
IntermediateObject value = new IntermediateObject(i);

ret.Add(i, value);
}

var tuple = new Tuple(input, ret);

return tuple;
});
}
}

public class InputObject : ICloneable
{
public int value1 { get; private set; }

public InputObject(int value)
{
this.value1 = value;
}

object ICloneable.Clone()
{
return Clone();
}

public InputObject Clone()
{
return (InputObject)this.MemberwiseClone();
}
}

public class IntermediateObject
{
public int value1 { get; private set; }

public IntermediateObject(int value)
{
this.value1 = value;
}
}

public class FinalObject
{
public InputObject input { get; private set; }
public List outputList;

public FinalObject(InputObject input)
{
this.input = input;

this.outputList = new List();
}
}

public static class Cloning
{
public static List CloneListCloneValues(List original) where TValue : ICloneable
{
List ret = new List(original.Count);

foreach (TValue entry in original)
{
ret.Add((TValue)entry.Clone());
}

return ret;
}
}


Подробнее здесь: https://stackoverflow.com/questions/137 ... ng-runtime
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»