У меня есть пакетный блок в потоке данных tpl, и с ним связано несколько целевых блоков. Однако количество целевых блоков динамически меняется и, следовательно, размер пакетов. Проблема в том, что размер пакета должен быть указан при инициализации пакетного блока, и я не вижу способа изменить его позже. Есть идеи, как это обойти? Является ли единственный способ отменить связь (удалить все ссылки на пакетный блок и из пакетного блока), повторно инициализировать пакетный блок с новым размером пакета, а затем снова связать его? Я мог бы это сделать, но как гарантировать, что старые и новые пакеты не перепутаются?
Например, если у меня было 2 блока преобразования, передаваемых в пакетный блок, а теперь у меня есть дополнительный блок преобразования и я хочу увеличить размер пакета до 3, как мне убедиться, что все предыдущие пакеты до увеличения были обработаны, чтобы обеспечить синхронизированное поведение? Дело в том, что все блоки преобразования получают один и тот же элемент, и выходные данные этих блоков преобразования должны быть объединены в пакеты таким образом, чтобы группировались только те выходные данные, которые соответствуют идентичным входным данным.
Вот пример того, как я хочу:
Постоянный поток целых чисел для преобразования блоков:
1,2,3, [точка, где размер пакета увеличивается],4,5,...
Пусть блоки преобразования выводят то, что они получили, например 1 => 1
Так что пакетный блок должен выводить следующее:
[1,1], [2,2], [3,3], [изменение размера пакета], [4,4,4], [5,5,5],...
Вот мой текущий код:
public class Test
{
private Stopwatch watch;
private BroadcastBlock tempBCB;
private BatchBlock batchBlock;
private TransformBlock transformBlock;
private ActionBlock justToFlushTransformBlock;
private CoreLogic core1;
private CoreLogic core2;
public Test()
{
tempBCB = new BroadcastBlock(input => input);
//here batch size = 2
batchBlock = new BatchBlock(2, new GroupingDataflowBlockOptions { Greedy = false });
transformBlock = new TransformBlock(array =>
{
List inputObjects = array[0].Item1;
List ret = inputObjects.ConvertAll(x => new FinalObject(x));
foreach (var tuple in array)
{
//iterate over each individual object
foreach (var dictionary in tuple.Item2)
{
ret[dictionary.Key].outputList.Add(dictionary.Value);
}
}
return ret;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
justToFlushTransformBlock = new ActionBlock(list =>
{
//just in order to accept items from the transformBlock output queue
});
//Generate 2 CoreLogic objects
core1 = new CoreLogic();
core2 = new CoreLogic();
//linking
tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
core1.transformBlock.LinkTo(batchBlock);
core2.transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numberChunks = 30;
watch = new Stopwatch();
watch.Start();
for (int j = 1; j
{
batchBlock.Complete();
});
transformBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
Console.ReadLine();
}
}
public class CoreLogic
{
private Random rand;
public TransformBlock transformBlock;
public CoreLogic()
{
const int numberIntermediateObjects = 10000;
transformBlock = new TransformBlock(input =>
{
//please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return
Dictionary ret = new Dictionary();
for (int i = 0; i < numberIntermediateObjects; i++)
{
IntermediateObject value = new IntermediateObject(i);
ret.Add(i, value);
}
var tuple = new Tuple(input, ret);
return tuple;
});
}
}
public class InputObject : ICloneable
{
public int value1 { get; private set; }
public InputObject(int value)
{
this.value1 = value;
}
object ICloneable.Clone()
{
return Clone();
}
public InputObject Clone()
{
return (InputObject)this.MemberwiseClone();
}
}
public class IntermediateObject
{
public int value1 { get; private set; }
public IntermediateObject(int value)
{
this.value1 = value;
}
}
public class FinalObject
{
public InputObject input { get; private set; }
public List outputList;
public FinalObject(InputObject input)
{
this.input = input;
this.outputList = new List();
}
}
public static class Cloning
{
public static List CloneListCloneValues(List original) where TValue : ICloneable
{
List ret = new List(original.Count);
foreach (TValue entry in original)
{
ret.Add((TValue)entry.Clone());
}
return ret;
}
}
Подробнее здесь: https://stackoverflow.com/questions/137 ... ng-runtime
Как динамически изменять размер пакета пакетного блока во время выполнения? ⇐ C#
Место общения программистов C#
1764359166
Anonymous
У меня есть пакетный блок в потоке данных tpl, и с ним связано несколько целевых блоков. Однако количество целевых блоков динамически меняется и, следовательно, размер пакетов. Проблема в том, что размер пакета должен быть указан при инициализации пакетного блока, и я не вижу способа изменить его позже. Есть идеи, как это обойти? Является ли единственный способ отменить связь (удалить все ссылки на пакетный блок и из пакетного блока), повторно инициализировать пакетный блок с новым размером пакета, а затем снова связать его? Я мог бы это сделать, но как гарантировать, что старые и новые пакеты не перепутаются?
Например, если у меня было 2 блока преобразования, передаваемых в пакетный блок, а теперь у меня есть дополнительный блок преобразования и я хочу увеличить размер пакета до 3, как мне убедиться, что все предыдущие пакеты до увеличения были обработаны, чтобы обеспечить синхронизированное поведение? Дело в том, что все блоки преобразования получают один и тот же элемент, и выходные данные этих блоков преобразования должны быть объединены в пакеты таким образом, чтобы группировались только те выходные данные, которые соответствуют идентичным входным данным.
Вот пример того, как я хочу:
Постоянный поток целых чисел для преобразования блоков:
1,2,3, [точка, где размер пакета увеличивается],4,5,...
Пусть блоки преобразования выводят то, что они получили, например 1 => 1
Так что пакетный блок должен выводить следующее:
[1,1], [2,2], [3,3], [изменение размера пакета], [4,4,4], [5,5,5],...
Вот мой текущий код:
public class Test
{
private Stopwatch watch;
private BroadcastBlock tempBCB;
private BatchBlock batchBlock;
private TransformBlock transformBlock;
private ActionBlock justToFlushTransformBlock;
private CoreLogic core1;
private CoreLogic core2;
public Test()
{
tempBCB = new BroadcastBlock(input => input);
//here batch size = 2
batchBlock = new BatchBlock(2, new GroupingDataflowBlockOptions { Greedy = false });
transformBlock = new TransformBlock(array =>
{
List inputObjects = array[0].Item1;
List ret = inputObjects.ConvertAll(x => new FinalObject(x));
foreach (var tuple in array)
{
//iterate over each individual object
foreach (var dictionary in tuple.Item2)
{
ret[dictionary.Key].outputList.Add(dictionary.Value);
}
}
return ret;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
justToFlushTransformBlock = new ActionBlock(list =>
{
//just in order to accept items from the transformBlock output queue
});
//Generate 2 CoreLogic objects
core1 = new CoreLogic();
core2 = new CoreLogic();
//linking
tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
core1.transformBlock.LinkTo(batchBlock);
core2.transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numberChunks = 30;
watch = new Stopwatch();
watch.Start();
for (int j = 1; j
{
batchBlock.Complete();
});
transformBlock.Completion.Wait();
watch.Stop();
Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds);
Console.ReadLine();
}
}
public class CoreLogic
{
private Random rand;
public TransformBlock transformBlock;
public CoreLogic()
{
const int numberIntermediateObjects = 10000;
transformBlock = new TransformBlock(input =>
{
//please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return
Dictionary ret = new Dictionary();
for (int i = 0; i < numberIntermediateObjects; i++)
{
IntermediateObject value = new IntermediateObject(i);
ret.Add(i, value);
}
var tuple = new Tuple(input, ret);
return tuple;
});
}
}
public class InputObject : ICloneable
{
public int value1 { get; private set; }
public InputObject(int value)
{
this.value1 = value;
}
object ICloneable.Clone()
{
return Clone();
}
public InputObject Clone()
{
return (InputObject)this.MemberwiseClone();
}
}
public class IntermediateObject
{
public int value1 { get; private set; }
public IntermediateObject(int value)
{
this.value1 = value;
}
}
public class FinalObject
{
public InputObject input { get; private set; }
public List outputList;
public FinalObject(InputObject input)
{
this.input = input;
this.outputList = new List();
}
}
public static class Cloning
{
public static List CloneListCloneValues(List original) where TValue : ICloneable
{
List ret = new List(original.Count);
foreach (TValue entry in original)
{
ret.Add((TValue)entry.Clone());
}
return ret;
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/13755790/how-to-change-batch-size-of-batchblock-dynamically-during-runtime[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия