Если я обработаю рабочую нагрузку 1K документов, он завершается через 2 минуты. Если я обработаю рабочую нагрузку 5K документов, он завершается за 53 минуты. Работа распространяется по 20 мероприятиям, и я вижу, как 10 случаев вращаются, чтобы справиться с ними. Каждое действие получает файлы списка, которые представляют 1/20 от работы. Для рабочей нагрузки документа 5K, если я генерирую полный список 5K -файлов, но прохожу первую 1K для обработки (все еще распределенной по 20 действиям), теперь для его завершения требуется 5 минут. Таким образом, все, что вызывает замедление, сохраняется, даже если на самом деле меньше работы выполняется. Я пытаюсь запустить его в большем масштабе без огромных переписываний. Нет доказательств использования памяти, ползучего. Каждая строка в списке работы должна составлять менее 50 символов, а оркестратор запускает. Единственная изменяющаяся переменная - это рабочая нагрузка 5K удерживается в списке 5K имен файлов. < /Li>
Это все данные образца. Все файлы идентичны (размер 250 КБ), а список 5K - это лишь список 1K, повторяемый 5 раз (с новыми именами файлов). < /Li>
Производительность страдает как при запуске на моем компьютере, так и при развертывании в Azure. < /Li>
< /ul>
Code: < /p>
< /ul>
: < /p>
< /ul>
: < /p>
< /ul>
: < /p>
< /ul>
.[Function(nameof(ProcessDocuments))]
public async Task ProcessDocuments(
[OrchestrationTrigger] TaskOrchestrationContext context,
int uploadId)
{
bool isValid = await context.CallActivityAsync(nameof(VerifyTemplateData), uploadId);
if (isValid)
{
List documentsData =
(await context.CallActivityAsync(nameof(GetExcelTemplate), uploadId)).ToList();
if (documentsData.Count > 0)
{
List parallelTasks = new List();
try
{
int maxConcurrency = _serviceConfig.MaxProcessingConcurrency;
int batchSize = Math.Max((documentsData.Count / maxConcurrency), 1);
for (int i = 0; i < documentsData.Count; i += batchSize)
{
int cappedBatchSize = Math.Min(batchSize, documentsData.Count - i);
List processingBatch = documentsData.GetRange(i, cappedBatchSize);
HandleDocumentProcessingPayload payload = new HandleDocumentProcessingPayload()
{
UploadId = uploadId,
DocumentNames = processingBatch
};
parallelTasks.Add(context.CallActivityAsync(nameof(HandleDocumentProcessing), payload));
}
// Finish any remaining tasks
await Task.WhenAll(parallelTasks);
Upload upload = await context.CallActivityAsync(nameof(GetUpload), uploadId);
MailData mailData = new MailData() { UploadId = uploadId, UserID = upload.CreatedBy };
await context.CallActivityAsync(nameof(SendMail), mailData);
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while uploading documents for upload {uploadId}.",
uploadId);
}
}
}
return context.InstanceId;
}
[Function(nameof(HandleDocumentProcessing))]
public async Task HandleDocumentProcessing([ActivityTrigger] HandleDocumentProcessingPayload payload, FunctionContext executionContext)
{
try
{
foreach (string documentData in payload.DocumentNames)
{
await UpdateDocument(documentData, payload.UploadId);
}
}
catch (Exception ex)
{
executionContext.GetLogger(nameof(HandleDocumentProcessing)).LogError(ex, "An error occurred while uploading a document batch.");
}
}
public async Task UpdateDocument(string fileName, int uploadId)
{
DocumentResponse response = new DocumentResponse() { IsDocumentUploaded = true };
try
{
response.fileName = fileName;
var document = await _documentService.UpdateDocumentContent(fileName, uploadId);
if (document is not null)
{
Repository? targetRepo = await _uploadService.GetRepository(uploadId);
if (targetRepo == null)
{
throw new Exception(
$"Unexpected error occurred. Could not find repo associated with upload '{uploadId}'. Canceling document processing for document {fileName}.");
}
bool duplicateFileFound = await _documentService.FilenameExistsInRepo(fileName, targetRepo.RepositoryId, uploadId);
if (duplicateFileFound)
{
await _uploadService.AddUploadLogError(uploadId, "This filename is already present in the repository.", document.DocumentId);
}
}
else
{
await _uploadService.AddUploadLogError(uploadId, fileName + " file name is in the Keystone but not uploaded");
response.IsDocumentUploaded = false;
}
}
catch (Exception ex)
{
await _uploadService.AddUploadLogError(uploadId, ex.Message);
response.IsDocumentUploaded = false;
response.ErrorMessage = ex.Message;
}
return response;
}
public async Task UpdateDocumentContent(string fileName, int uploadId)
{
string documentName = string.Empty;
try
{
Document? document = _dbContext.Documents.FirstOrDefault(x => x.UploadId == uploadId && x.OriginalDocumentName == fileName);
if (document is null) return document;
documentName = document.AzureBlobDocumentName;
byte[] bytes;
using (MemoryStream stream = await _blobService.DownloadFile(documentName, _blobStorageSettings.Value.UploadFilesContainerName))
{
bytes = stream.ToArray();
}
if (document.DocumentType == 2)
{
document.Content = ExtractTextFromPdf(bytes);
}
else if (document.DocumentType == 1)
{
document.Content = ExtractTextFromExcel(bytes);
}
document.IsProcessed = true;
_dbContext.Update(document);
await _dbContext.SaveChangesAsync();
return document;
}catch(Exception ex)
{
EventId errorEvent = new EventId(1000, "Document parsing error");
_logger.Log(LogLevel.Error, errorEvent, ex, $"Error occurred while parsing document '{documentName}'.");
throw;
}
}
Подробнее здесь: https://stackoverflow.com/questions/796 ... n-a-larger
Функция azure долговечное время выполнения задачи значительно замедляется при обработке более крупной рабочей нагрузки ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение