Я в значительной степени читаю запись большого файла (исходного кода). по записи и записываю его во временный локальный файл.
Когда количество записей достигает определенного предела, я принудительно сбрасываю модуль записи, закрываю его, удаляю, чтобы продолжить чтение. источник и записать в новый.
При переходе к новому файлу я хочу начать отправку сгенерированного временного файла через Интернет.
Цель состоит в том, чтобы я мог читать/анализировать исходный код и загружать пакетные файлы как можно быстрее, занимая при этом минимально возможное место в памяти и на диске.
Мой текущий код выглядит так
Код: Выделить всё
public async Task HandleEventAsync(EntityEventMessage message)
{
var executionGuid = Guid.NewGuid();
var uri = new Uri(message.MessageLocation);
var containerName = uri.Segments[1].TrimEnd('/');
var blobName = string.Join("", uri.Segments[2..]);
var entityName = message.EntityName;
var blobFinalFileName = uri.Segments[^1].TrimEnd('/');
var blobServiceClient = new BlobServiceClient(uri, _tokenCredential);
var blobContainerClient = blobServiceClient.GetBlobContainerClient(containerName);
var sourceBlobClient = blobContainerClient.GetBlobClient(blobName);
var blobExists = await sourceBlobClient.ExistsAsync();
if (!blobExists)
{
_logger.LogWarning("The message of the event does not exist. It might have been processed already.");
throw new NoRetryException();
}
var fileCounter = 1;
var totalRecords = 0;
SetUpBatchNames(executionGuid, blobFinalFileName, fileCounter, out var batchFileName, out var tempFilePath);
var tasks = new Queue();
var processTimer = new Stopwatch();
var options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
// Set the maximum number of workers that
// may be used in a parallel transfer.
MaximumConcurrency = 2
}
};
processTimer.Start();
using var sourceBlobStream = await sourceBlobClient.OpenReadAsync();
var writer = new StreamWriter(tempFilePath, true, Encoding.UTF8);
try
{
using var streamReader = new StreamReader(sourceBlobStream, Encoding.UTF8);
using var csvParser = new CsvParser(streamReader, _csvConfiguration);
var batchTimer = new Stopwatch();
batchTimer.Start();
while (await csvParser.ReadAsync())
{
await writer.WriteLineAsync(csvParser.RawRecord);
totalRecords++;
if (totalRecords % _options.Value.RecordsPerFile == 0)
{
fileCounter = await PrepareBatchAndUpload(blobName, blobFinalFileName, blobContainerClient, options, fileCounter, batchFileName, tempFilePath, tasks, writer, batchTimer);
// Prepare for new batch
SetUpBatchNames(executionGuid, blobFinalFileName, fileCounter, out batchFileName, out tempFilePath);
writer = new StreamWriter(tempFilePath, true, Encoding.UTF8);
batchTimer.Restart();
}
}
if (totalRecords % _options.Value.RecordsPerFile != 0)
await PrepareBatchAndUpload(blobName, blobFinalFileName, blobContainerClient, options, fileCounter, batchFileName, tempFilePath, tasks, writer, batchTimer);
await Task.WhenAll(tasks);
processTimer.Stop();
_logger.LogDebug("Took '{TotalSplitTimeMs}' ms to split the original file with '{OriginalTotalRecords}' records.", processTimer.ElapsedMilliseconds, totalRecords);
}
finally
{
writer?.Dispose();
// Ensure no temp file is left behind
var dir = new DirectoryInfo(Path.GetTempPath());
var fileNamesToDelete = blobFinalFileName.Replace(".csv", $"_{executionGuid}_*.csv");
foreach (var file in dir.EnumerateFiles(fileNamesToDelete))
{
file.Delete();
}
}
GC.Collect();
static void SetUpBatchNames(Guid executionGuid, string blobFinalFileName, int fileCounter, out string batchFileName, out string tempFilePath)
{
batchFileName = blobFinalFileName.Replace(".csv", $"_{executionGuid}_{fileCounter}.csv");
tempFilePath = $"{Path.GetTempPath()}{batchFileName}";
}
async Task PrepareBatchAndUpload(string blobName, string blobFinalFileName, BlobContainerClient blobContainerClient, BlobUploadOptions options, int fileCounter, string? batchFileName, string? tempFilePath, Queue tasks, StreamWriter writer, Stopwatch batchTimer)
{
batchTimer.Stop();
_logger.LogDebug($"Took '{batchTimer.Elapsed}' to parse the batch.");
_logger.LogDebug($"Creating temp file '{tempFilePath}'.");
await writer.FlushAsync();
writer.Close();
await writer.DisposeAsync();
// Ensure batch file is in same (virtual)folder as the source
var batchBlobName = blobName.Replace(blobFinalFileName, batchFileName);
tasks.Enqueue(UploadBatchFileAsync(batchBlobName, tempFilePath, blobContainerClient, options));
fileCounter++;
return fileCounter;
}
}
private async Task UploadBatchFileAsync(string blobName, string filePath, BlobContainerClient containerClient, BlobUploadOptions options)
{
var blobClient = containerClient.GetBlobClient(blobName);
_logger.LogDebug($"Uploading blob: '{blobClient.Uri}'.");
using (var fileStream = File.OpenRead(filePath))
{
await blobClient.UploadAsync(fileStream, options);
}
_logger.LogDebug($"Blob uploaded: '{blobClient.Uri}'.");
//If I do not put Task.Run, delete does not happen after the upload.
await Task.Run(() => File.Delete(filePath));
}
- plain List().Add(UploadBatchFileAsync(...)) + await Task.WhenAll
- выполните синхронизацию UploadBatchFile и создайте пул потоков для запуска нескольких (ThreadPool.QueueUserWorkItem)
- следуйте https://learn.microsoft. com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-8.0&redirectedfrom=MSDN
- Task.Factory.StartNew
- Tinkle с BlobUploadOptions.TransferOptions.MaximumConcurrency
Мое текущее «решение» оказывает некоторое влияние на чтение/запись, где обычно требуется 45 секунд на каждые 100 тысяч строк, но оно увеличивается до ~ 60 секунд.
Я понимаю, что в какой-то момент, и поскольку я выполняю локальную отладку, мои локальные процессы на компьютере могут оказать некоторое влияние, но я не могу понять, какой способ действительно лучший, и что-то подсказывает мне, что мне следует это сделать. можно добиться более высокой общей производительности или, по крайней мере, загрузить файлы, не влияя на чтение/запись.
Подробнее здесь: https://stackoverflow.com/questions/788 ... -footprint