tl; DR версия < /strong> < /p>
Следующий кусок кода вызывает огромное бремя памяти. Библиотека: paruqet.net < /p>
Код: Выделить всё
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
string tableName = GetTableName(filePath);
Table parquetTable = await reader.ReadAsTableAsync(); //Main culprit for huge RAM consumption
DataTable dataTable = new DataTable();
string sql = $"CREATE TABLE {tableName} (";
foreach(Field field in parquetTable.Schema.Fields)
{
DataField? ptField = field as DataField;
string columnName = ptField.Name;
Type columnType = ptField.ClrType;
dataTable.Columns.Add(columnName, columnType);
sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
}
Полная история [/b]
Чтобы дать обзор моего проекта, я пишу программу, и записываемы, и записываемы, и записывающие файлы, и все считывает, и считывает, что считывает, и считывает, что считывает, и выполняет документы. Соединение SQL и использование SQLBULKCOPY дает данные на SQL Server (SQL Server 2019, LocalDB, расположенный на той же машине). Большая часть кода, который я создал здесь, заключалась в использовании CHATGPT и Googling.string[] fileList = GetParquetFiles(activeDirectory[0]);
< /code>
Эти файлы будут прочитаны параллельно, и я использую Semaphoreslim для ограничения количества активных параллельных потоков. < /p>
public static async Task ProcessParquetFileAsync(string[] fileList, string databaseName)
{
int numberOfConcurrentFiles = 2;
using (SemaphoreSlim semaphore = new SemaphoreSlim(numberOfConcurrentFiles))
{
List tasks = new List();
foreach (var file in fileList)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await ReadAndDeployParquetFile(file, databaseName);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}
}
< /code>
Давайте возьмем поток из 1 такой потока.
Внутри этого потока я читаю весь файл Parquet в виде таблицы (я использую библиотеку Parquet.net для чтения). < /p>
В каждом потоке я читаю Parquettable полностью и копирую Schema в DataTable (просто Schema, не данные). Разделяйтесь, чтобы разделить и читать паркеру, подлежащие «кускам».public static async Task ReadAndDeployParquetFile(string filePath, string databasename)
{
using (ParquetReader reader = await ParquetReader.CreateAsync(filePath, null))
{
string tableName = GetTableName(filePath);
Table parquetTable = await reader.ReadAsTableAsync();
DataTable dataTable = new DataTable();
string sql = $"CREATE TABLE {tableName} (";
foreach(Field field in parquetTable.Schema.Fields)
{
DataField? ptField = field as DataField;
string columnName = ptField.Name;
Type columnType = ptField.ClrType;
dataTable.Columns.Add(columnName, columnType);
sql += $"[{columnName}] {GetSqlDataType(columnType, field)},";
}
sql = sql.Trim(',') + ')';
SQLConnection conn = new SQLConnection();
conn.ExecuteSqlCommand(sql, tableName, databasename);
int rowCount = parquetTable.Count;
int batchSize = 100000;
decimal parts = Decimal.Ceiling((decimal)rowCount / (decimal)batchSize);
SemaphoreSlim semaphore = new SemaphoreSlim(Environment.ProcessorCount);
List tasks = new List();
Console.WriteLine($"File {tableName} has total batch {(int)parts}");
for (int i= 0; i < (int)parts; i++)
{
await semaphore.WaitAsync();
int currentPart = i;
tasks.Add(Task.Run (() =>
{
try
{
ProcessBatch(parquetTable, dataTable.Clone(), currentPart, batchSize, tableName, databasename);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}
}
< /code>
Наконец, он добавляется строка за строкой в новый DataTable, называемый Parttable, который приведен каждый подраздел (схема основного данных клонируется и отправляется через). < /p>
public static void ProcessBatch(Table parquetTable, DataTable partTable, int currentPart, int batchSize, string tableName, string databaseName)
{
SQLConnection conn = new SQLConnection();
int columnCount = parquetTable.Schema.Fields.Count;
for (int i = currentPart * batchSize; (i < ((currentPart + 1) * batchSize)) && (i < parquetTable.Count); i++)
{
var row = parquetTable;
var dataRow = partTable.NewRow();
for (int j = 0; j < columnCount; j++)
{
if (row[j] != null)
{
dataRow[j] = row[j] ?? DBNull.Value;
}
}
partTable.Rows.Add(dataRow);
}
conn.InsertTable(tableName, partTable, databaseName, currentPart);
partTable.Dispose();
}
< /code>
Теперь проблема заключается в том, что существует паркетный файл, который составляет 2 миллиона строк. Размер чанка, который я дал, составляет 100 тыс., Так что теперь он будет делать 10 партий и запустить их параллельно, но сохраняет только 8 потоков активными за раз (среда. Processorcount - это 8 в моем компьютере) и запустите оставшиеся 2, когда какая -либо из 8 свобод (исправить меня, если я ошибаюсь здесь). < /P>
Сам файл - это 24mb, но Ram usage usage - до 3GB! Как?
Мое понимание того, как работает программа, это
Когда 1 подпоточная рубашка сделана, она должна освободить всю его память. Но кажется, что этого не происходит. Память не проясняется после того, как задание подпоточной нагрузки будет выполнено, а также поможет мне исправить код, чтобы уменьшить использование оперативной памяти?>
Подробнее здесь: https://stackoverflow.com/questions/786 ... o-much-ram