Разделение большого файла CSV и многопоточная загрузка с минимальными затратамиC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Разделение большого файла CSV и многопоточная загрузка с минимальными затратами

Сообщение Anonymous »

Я пытаюсь разделить большие файлы CSV на файлы меньшего размера и одновременно отправить их в учетную запись хранения Azure.
Я в значительной степени читаю запись большого файла (исходного кода). по записи и записываю его во временный локальный файл.
Когда количество записей достигает определенного предела, я принудительно сбрасываю модуль записи, закрываю его, удаляю, чтобы продолжить чтение. источник и записать в новый.
При переходе к новому файлу я хочу начать отправку сгенерированного временного файла через Интернет.
Цель состоит в том, чтобы я мог читать/анализировать исходный код и загружать пакетные файлы как можно быстрее, занимая при этом минимально возможное место в памяти и на диске.
Мой текущий код выглядит так

Код: Выделить всё

public async Task HandleEventAsync(EntityEventMessage message)
{
var executionGuid = Guid.NewGuid();
var uri = new Uri(message.MessageLocation);
var containerName = uri.Segments[1].TrimEnd('/');
var blobName = string.Join("", uri.Segments[2..]);
var entityName = message.EntityName;
var blobFinalFileName = uri.Segments[^1].TrimEnd('/');

var blobServiceClient = new BlobServiceClient(uri, _tokenCredential);
var blobContainerClient = blobServiceClient.GetBlobContainerClient(containerName);
var sourceBlobClient = blobContainerClient.GetBlobClient(blobName);

var blobExists = await sourceBlobClient.ExistsAsync();
if (!blobExists)
{
_logger.LogWarning("The message of the event does not exist.  It might have been processed already.");
throw new NoRetryException();
}

var fileCounter = 1;
var totalRecords = 0;
SetUpBatchNames(executionGuid, blobFinalFileName, fileCounter, out var batchFileName, out var tempFilePath);
var tasks = new Queue();
var processTimer = new Stopwatch();
var options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
// Set the maximum number of workers that
// may be used in a parallel transfer.
MaximumConcurrency = 2
}
};

processTimer.Start();

using var sourceBlobStream = await sourceBlobClient.OpenReadAsync();
var writer = new StreamWriter(tempFilePath, true, Encoding.UTF8);
try
{
using var streamReader = new StreamReader(sourceBlobStream, Encoding.UTF8);
using var csvParser = new CsvParser(streamReader, _csvConfiguration);

var batchTimer = new Stopwatch();
batchTimer.Start();

while (await csvParser.ReadAsync())
{
await writer.WriteLineAsync(csvParser.RawRecord);
totalRecords++;

if (totalRecords % _options.Value.RecordsPerFile == 0)
{
fileCounter = await PrepareBatchAndUpload(blobName, blobFinalFileName, blobContainerClient, options, fileCounter, batchFileName, tempFilePath, tasks, writer, batchTimer);

// Prepare for new batch
SetUpBatchNames(executionGuid, blobFinalFileName, fileCounter, out batchFileName, out tempFilePath);
writer = new StreamWriter(tempFilePath, true, Encoding.UTF8);
batchTimer.Restart();
}
}

if (totalRecords % _options.Value.RecordsPerFile != 0)
await PrepareBatchAndUpload(blobName, blobFinalFileName, blobContainerClient, options, fileCounter, batchFileName, tempFilePath, tasks, writer, batchTimer);

await Task.WhenAll(tasks);

processTimer.Stop();
_logger.LogDebug("Took '{TotalSplitTimeMs}' ms to split the original file with '{OriginalTotalRecords}' records.", processTimer.ElapsedMilliseconds, totalRecords);

}
finally
{
writer?.Dispose();

// Ensure no temp file is left behind
var dir = new DirectoryInfo(Path.GetTempPath());
var fileNamesToDelete = blobFinalFileName.Replace(".csv", $"_{executionGuid}_*.csv");
foreach (var file in dir.EnumerateFiles(fileNamesToDelete))
{
file.Delete();
}
}

GC.Collect();

static void SetUpBatchNames(Guid executionGuid, string blobFinalFileName, int fileCounter, out string batchFileName, out string tempFilePath)
{
batchFileName = blobFinalFileName.Replace(".csv", $"_{executionGuid}_{fileCounter}.csv");
tempFilePath = $"{Path.GetTempPath()}{batchFileName}";
}

async Task PrepareBatchAndUpload(string blobName, string blobFinalFileName, BlobContainerClient blobContainerClient, BlobUploadOptions options, int fileCounter, string? batchFileName, string? tempFilePath, Queue tasks, StreamWriter writer, Stopwatch batchTimer)
{
batchTimer.Stop();
_logger.LogDebug($"Took '{batchTimer.Elapsed}' to parse the batch.");
_logger.LogDebug($"Creating temp file '{tempFilePath}'.");
await writer.FlushAsync();
writer.Close();
await writer.DisposeAsync();

// Ensure batch file is in same (virtual)folder as the source
var batchBlobName = blobName.Replace(blobFinalFileName, batchFileName);
tasks.Enqueue(UploadBatchFileAsync(batchBlobName, tempFilePath, blobContainerClient, options));
fileCounter++;
return fileCounter;
}
}

private async Task UploadBatchFileAsync(string blobName, string filePath, BlobContainerClient containerClient, BlobUploadOptions options)
{
var blobClient = containerClient.GetBlobClient(blobName);

_logger.LogDebug($"Uploading blob: '{blobClient.Uri}'.");

using (var fileStream = File.OpenRead(filePath))
{
await blobClient.UploadAsync(fileStream, options);
}

_logger.LogDebug($"Blob uploaded: '{blobClient.Uri}'.");

//If I do not put Task.Run, delete does not happen after the upload.
await Task.Run(() =>  File.Delete(filePath));
}
Я пробовал несколько вещей, таких как
  • plain List().Add(UploadBatchFileAsync(...)) + await Task.WhenAll
  • выполните синхронизацию UploadBatchFile и создайте пул потоков для запуска нескольких (ThreadPool.QueueUserWorkItem)
  • следуйте https://learn.microsoft. com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-8.0&redirectedfrom=MSDN
  • Task.Factory.StartNew
  • Tinkle с BlobUploadOptions.TransferOptions.MaximumConcurrency
В общем, я либо не получу желаемого «параллельного» поведения, либо не увижу производительность CSV это повлияет на чтение/запись.
Мое текущее «решение» оказывает некоторое влияние на чтение/запись, где обычно требуется 45 секунд на каждые 100 тысяч строк, но оно увеличивается до ~ 60 секунд.
Я понимаю, что в какой-то момент, и поскольку я выполняю локальную отладку, мои локальные процессы на компьютере могут оказать некоторое влияние, но я не могу понять, какой способ действительно лучший, и что-то подсказывает мне, что мне следует это сделать. можно добиться более высокой общей производительности или, по крайней мере, загрузить файлы, не влияя на чтение/запись.

Подробнее здесь: https://stackoverflow.com/questions/788 ... -footprint
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»