Общий класс:
Код: Выделить всё
public class CalculationResult
{
public int Count { get; set; }
public decimal[] RunningTotals { get; set; }
public CalculationResult(decimal[] profits)
{
this.Count = 1;
this.RunningTotals = new decimal[12];
profits.CopyTo(this.RunningTotals, 0);
}
public void Update(decimal[] newData)
{
this.Count++;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
}
public void Update(CalculationResult otherResult)
{
this.Count += otherResult.Count;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
}
}
Код: Выделить всё
Dictionary combinations = new Dictionary();
foreach (var i in itterations)
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination))
combinations[combination].Update(newData);
else
combinations.Add(combination, new CalculationResult(newData));
}
Код: Выделить всё
ConcurrentBag results = new ConcurrentBag();
Parallel.ForEach(itterations, (i, state) =>
{
Dictionary combinations = new Dictionary();
// do the processing
// ..
// add combination to combinations -> same logic as in single core implementation
results.Add(combinations);
});
Dictionary combinationsReal = new Dictionary();
foreach (var item in results)
{
foreach (var pair in item)
{
if (combinationsReal.ContainsKey(pair.Key))
combinationsReal[pair.Key].Update(pair.Value);
else
combinationsReal.Add(pair.Key, pair.Value);
}
}
Код: Выделить всё
930k
Код: Выделить всё
400 [MB]
Теперь в одноядерной реализации существует только один такой словарь. Все проверки выполняются по одному словарю. Но это медленный подход, и я хочу использовать многоядерную оптимизацию.
В многоядерной реализации создается экземпляр ConcurrentBag, который содержит все комбинации< /код> словари. Как только многопоточная работа завершена — все словари объединяются в один. Этот подход хорошо работает для небольшого количества одновременных итераций. Например, за 4 итерации мое использование ОЗУ составило
Код: Выделить всё
~ 1.5 [GB]. Проблема возникает, когда я устанавливаю полное количество параллельных итераций, а именно 200! Никакого объема оперативной памяти ПК
Я думал об использовании ConcurrentDictioanary, пока не нашел Выяснилось, что метод «TryAdd» не гарантирует целостность добавленных данных в моей ситуации, так как мне также необходимо запускать обновления текущих итогов.
Единственный настоящий многопоточный метод вариант — вместо добавления всех комбинаций в словарь - это сохранить их в какой-нибудь БД. Тогда агрегирование данных будет выполняться с помощью одного оператора SQL select с предложением group by... но мне не нравится идея создания временной таблицы и запуска экземпляра БД только для этого.
Есть ли способ одновременной обработки данных без нехватки оперативной памяти?[/b]
РЕДАКТИРОВАТЬ:
Может быть, настоящий вопрос должно было быть - как сделать обновление RunningTotals потокобезопасным при использовании ConcurrentDictionary? Я только что столкнулся с этой темой с аналогичной проблемой с ConcurrentDictionary, но моя ситуация кажется более сложной, поскольку у меня есть массив, который необходимо обновить. Я все еще изучаю этот вопрос.
EDIT2: Вот рабочее решение с ConcurrentDictionary. Все, что мне нужно было сделать, это добавить блокировку для ключа словаря.
Код: Выделить всё
ConcurrentDictionary combinations = new ConcurrentDictionary();
Parallel.ForEach(itterations, (i, state) =>
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination)) {
lock(combinations[combination])
combinations[combination].Update(newData);
}
else
combinations.TryAdd(combination, new CalculationResult(newData));
});
Редактировать 3: Для тех из вас, кому интересно, что не так с обновлениями ConcurrentDictionary значение — запустите этот код с блокировкой и без нее.
Код: Выделить всё
public class Result
{
public int Count { get; set; }
}
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
List keys = new List();
for (int i = 0; i < 100; i++)
keys.Add(i);
ConcurrentDictionary dict = new ConcurrentDictionary();
Parallel.For(0, 8, i =>
{
foreach(var key in keys)
{
if (dict.ContainsKey(key))
{
//lock (dict[key]) // uncomment this
dict[key].Count++;
}
else
dict.TryAdd(key, new Result());
}
});
// any output here is incorrect behavior. best result = no lines
foreach (var item in dict)
if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }
Console.WriteLine($"Finish");
Console.ReadKey();
}
}

Подробнее здесь: https://stackoverflow.com/questions/479 ... roccessing