Я использую параллельную обработку, но должен отметить, что я новичок в C#, а также в параллельных вычислениях. Большая часть кода, который я здесь создал, была создана с помощью ChatGPT и Google.
Теперь моя программа прочитает каталог, соберет все файлы с расширением «.parquet» и сохранит их в нем. массив строк.
Код: Выделить всё
string[] fileList = GetParquetFiles(activeDirectory[0]);
Код: Выделить всё
public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
int numberOfConcurrentFiles = 2;
using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
{
List tasks = new List();
foreach (var file in fileList)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await ReadAndDeployParquetFile(file, databaseName);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}
}
Внутри этого потока я читаю весь файл Parquet как таблицу (для чтения я использую библиотеку Parquet.NET).
В каждом потоке я полностью читаю ParquetTable и копирую схему в DataTable (только схема, без данных).
Далее: Я рассчитываю пакетный размер для разделения и чтения ParquetTable на «куски».
Эти фрагменты данных снова обрабатываются параллельно с помощью SemaphoreSlim
Код: Выделить всё
public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
string tableName = GetTableName(filePath);
Table parquetTable = await reader.ReadAsTableAsync();
DataTable dataTable = new DataTable();
string sql = $"CREATE TABLE {tableName} (";
foreach(Field field in parquetTable.Schema.Fields)
{
DataField? ptField = field as DataField;
string columnName = ptField.Name;
Type columnType = ptField.ClrType;
dataTable.Columns.Add(columnName, columnType);
sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
}
sql = sql.Trim(',') + ')';
SQLConnection conn = new SQLConnection();
conn.ExecuteSqlCommand(sql, tableName, databasename);
int rowCount = parquetTable.Count;
int batchSize = 100000;
decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);
SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
List tasks = new List();
Console.WriteLine($"File {tableName} has total batch {(int)parts}");
for (int i= 0; i < (int)parts; i++)
{
await semaphore.WaitAsync();
int currentPart = i;
tasks.Add(Task.Run (() =>
{
try
{
ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}
}
Код: Выделить всё
public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
SQLConnection conn = new SQLConnection();
int columnCount = parquetTable.Schema.Fields.Count;
for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
{
var row = parquetTable[i];
var dataRow = partTable.NewRow();
for (int j = 0; j < columnCount; j++)
{
if (row[j] != null)
{
dataRow[j] = row[j] ?? DBNull.Value;
}
}
partTable.Rows.Add(dataRow);
}
conn.InsertTable(tableName, partTable, databaseName, currentPart);
partTable.Dispose();
}
Сам файл весит 24 МБ, но использование оперативной памяти достигает 3 ГБ! Как?
Я понимаю, как работает программа:
Когда выполняется 1 подпоток, она должна освободить всю свою память. Но похоже, что этого не происходит.
Я использовал приложение dotMemory, чтобы проверить использование памяти, и потребление оперативной памяти продолжает расти и никогда не снижается.
< img alt="введите описание изображения здесь" src="https://i.sstatic.net/1c5nt3Lo.png" />
Может ли кто-нибудь помочь мне понять, почему память не очищается? после завершения работы подпотока, а также поможете мне исправить код, чтобы уменьшить использование оперативной памяти?
Опять же, я очень новичок в C# и еще более новичок в параллельных вычислениях, поэтому, пожалуйста, будьте осторожны с объяснениями.
РЕДАКТИРОВАТЬ: Исправлен номер переменной пакетного размера, который ошибочно был установлен как 10 КБ вместо 100 КБ.
Подробнее здесь: https://stackoverflow.com/questions/786 ... o-much-ram