Не могу понять, почему моя программа использует слишком много оперативной памяти.C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Не могу понять, почему моя программа использует слишком много оперативной памяти.

Сообщение Anonymous »

Чтобы дать обзор моего проекта, я пишу программу на C#, которая читает файлы Parquet, копирует их в DataTable, а затем устанавливает соединение SQL и с помощью SqlBulkCopy выгружает данные на сервер SQL.
Я использую параллельную обработку, но должен отметить, что я новичок в C#, а также в параллельных вычислениях. Большая часть кода, который я здесь создал, была создана с помощью ChatGPT и Google.
Теперь моя программа прочитает каталог, соберет все файлы с расширением «.parquet» и сохранит их в нем. массив строк.

Код: Выделить всё

string[] fileList = GetParquetFiles(activeDirectory[0]);
Эти файлы будут считываться параллельно, и я использую SemaphoreSlim, чтобы ограничить количество активных параллельных потоков.

Код: Выделить всё

public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
int numberOfConcurrentFiles = 2;
using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
{
List tasks = new List();
foreach (var file in fileList)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await ReadAndDeployParquetFile(file, databaseName);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}

}
Давайте возьмем поток из 1 такого потока.
Внутри этого потока я читаю весь файл Parquet как таблицу (для чтения я использую библиотеку Parquet.NET).
В каждом потоке я полностью читаю ParquetTable и копирую схему в DataTable (только схема, без данных).
Далее: Я рассчитываю пакетный размер для разделения и чтения ParquetTable на «куски».
Эти фрагменты данных снова обрабатываются параллельно с помощью SemaphoreSlim

Код: Выделить всё

public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
string tableName = GetTableName(filePath);
Table parquetTable = await reader.ReadAsTableAsync();
DataTable dataTable = new DataTable();

string sql = $"CREATE TABLE {tableName} (";
foreach(Field field in parquetTable.Schema.Fields)
{

DataField? ptField = field as DataField;
string columnName = ptField.Name;
Type columnType = ptField.ClrType;
dataTable.Columns.Add(columnName, columnType);
sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
}
sql = sql.Trim(',') + ')';
SQLConnection conn = new SQLConnection();
conn.ExecuteSqlCommand(sql, tableName, databasename);

int rowCount = parquetTable.Count;
int batchSize = 100000;
decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);

SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
List tasks = new List();
Console.WriteLine($"File {tableName} has total batch {(int)parts}");
for (int i= 0; i < (int)parts; i++)
{
await semaphore.WaitAsync();
int currentPart = i;
tasks.Add(Task.Run (() =>
{
try
{
ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);

}
}
Наконец, строка за строкой добавляется в новую таблицу DataTable, называемую partTable, которой задается каждый подпоток (схема основной таблицы DataTable клонируется и передается по ней).

Код: Выделить всё

public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
SQLConnection conn = new SQLConnection();
int columnCount = parquetTable.Schema.Fields.Count;
for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
{
var row = parquetTable[i];
var dataRow = partTable.NewRow();
for (int j = 0; j < columnCount; j++)
{
if (row[j] != null)
{
dataRow[j] = row[j] ?? DBNull.Value;
}
}
partTable.Rows.Add(dataRow);
}
conn.InsertTable(tableName, partTable, databaseName, currentPart);
partTable.Dispose();
}
Теперь проблема в том, что есть файл паркета, содержащий 2 миллиона строк. Размер чанка, который я указал, составляет 100 КБ, поэтому теперь он будет создавать 10 пакетов и запускать их параллельно, но одновременно поддерживать активными только 8 потоков (Environment.ProcessorCount на моем компьютере равен 8) и запускать оставшиеся 2, когда любой из 8 освобождает (поправьте меня, если я ошибаюсь).
Сам файл весит 24 МБ, но использование оперативной памяти достигает 3 ГБ! Как?
Я понимаю, как работает программа:
Когда выполняется 1 подпоток, она должна освободить всю свою память. Но похоже, что этого не происходит.
Я использовал приложение dotMemory, чтобы проверить использование памяти, и потребление оперативной памяти продолжает расти и никогда не снижается.
< img alt="введите описание изображения здесь" src="https://i.sstatic.net/1c5nt3Lo.png" />
Может ли кто-нибудь помочь мне понять, почему память не очищается? после завершения работы подпотока, а также поможете мне исправить код, чтобы уменьшить использование оперативной памяти?
Опять же, я очень новичок в C# и еще более новичок в параллельных вычислениях, поэтому, пожалуйста, будьте осторожны с объяснениями.
РЕДАКТИРОВАТЬ: Исправлен номер переменной patchSize, который ошибочно был установлен как 10 КБ вместо 100 КБ.

Подробнее здесь: https://stackoverflow.com/questions/786 ... o-much-ram
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»