Однако раз в день, обычно в 06:00, отправка сообщений занимает очень много времени. Примерно 1-2 минуты. Тогда из-за этого и конечный автомат дает сбой) по моему срок действия какого-то сообщения истекает. В противном случае сообщения обрабатываются за секунды. Какое-то время я думал, что это из-за холодного запуска. Но сообщение медленно доходит до Worker и WebApi. Я в отчаянии по этому поводу, потому что не могу найти больше информации. Эти журналы представляют собой информацию из потребительского метода. Если перед журналом нет другого кода.
Я не вижу ошибок в журнале. Единственное, что есть в журнале, это периодические сообщения. Это известная и весьма неприятная проблема.
Результат: Отменено
Исключение: Grpc.Core.RpcException: Status(StatusCode="Cancelled", Detail ="Вызов отменен клиентом.", DebugException="System.OperationCanceledException: операция была отменена.")
---> System.OperationCanceledException: операция была отменена.
--- Конец трассировка внутреннего стека исключений ---
в Microsoft.Azure.Functions.Worker.ServiceBusMessageActions.CompleteMessageAsync(сообщение ServiceBusReceivedMessage, CancellationToken cancelToken) в D:\a_work\1\s\extensions\Worker.Extensions.ServiceBus\src\ ServiceBusMessageActions.cs:строка 78
at Worker.FunctionApp.DefaultServiceBusFunction.AsTopicResultWithCompleteMessage[TConsumer](сообщение ServiceBusReceivedMessage, ServiceBusMessageActions messageActions, String theme, String subs, CancellationToken cancelToken) в D:\a\1\s\Src\Worker .FunctionApp\DefaultServiceBusFunction.cs:строка 50
в Worker.FunctionApp.Started.Run(сообщение ServiceBusReceivedMessage, ServiceBusMessageActions messageActions, CancellationToken cancelToken) в D:\a\1\s\Src\Worker.FunctionApp\Started.cs :строка 24
в Worker.FunctionApp.DirectFunctionExecutor.ExecuteAsync(контекст FunctionContext) в D:\a\1\s\Src\Worker.FunctionApp\obj\Release\net8.0\Microsoft.Azure.Functions.Worker .Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:строка 39
в Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate следующий) в D:\a_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:строка 13
в Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(контекст FunctionContext) в D:\ a_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:строка 91
в Microsoft.Azure.Functions.Worker.Handlers.InvokeHandler.InvokeAsync(запрос InvokeRequest) в D:\a_work\1\s\ src\DotNetWorker.Grpc\Handlers\IndictionHandler.cs:строка 88
Стек: в Microsoft.Azure.Functions.Worker.ServiceBusMessageActions.CompleteMessageAsync(сообщение ServiceBusReceivedMessage, CancellationToken cancelToken) в D:\a_work\1\s\extensions \Worker.Extensions.ServiceBus\src\ServiceBusMessageActions.cs:строка 78
at Worker.FunctionApp.DefaultServiceBusFunction.AsTopicResultWithCompleteMessage[TConsumer](сообщение ServiceBusReceivedMessage, ServiceBusMessageActions messageActions, строка темы, строковые подпрограммы, CancellationToken cancelToken) в D:\ a\1\s\Src\Worker.FunctionApp\DefaultServiceBusFunction.cs:строка 50
в Worker.FunctionApp.Started.Run(сообщение ServiceBusReceivedMessage, ServiceBusMessageActions messageActions, CancellationToken cancelToken) в D:\a\1\s\ Src\Worker.FunctionApp\Started.cs:строка 24
в Worker.FunctionApp.DirectFunctionExecutor.ExecuteAsync(контекст FunctionContext) в D:\a\1\s\Src\Worker.FunctionApp\obj\Release\net8. 0\Microsoft.Azure.Functions.Worker.Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:строка 39
в Microsoft.Azure.Functions.Worker.OutputBindings .OutputBindingsMiddleware.Invoke(контекст FunctionContext, FunctionExecutionDelegate далее) в D:\a_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:строка 13
в Microsoft.Azure.Functions.Worker.FunctionsApplication. InvokeFunctionAsync(контекст FunctionContext) в D:\a_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:строка 91
в Microsoft.Azure.Functions.Worker.Handlers.IndictionHandler.InvokeAsync(запрос InvokeRequest) в D:\a_work\1\s\src\DotNetWorker.Grpc\Handlers\IndictionHandler.cs:строка 88
Спасибо
РЕДАКТИРОВАТЬ:
Я также использую MassTransit в WebApi и Worker.
WebApi:
services.AddMassTransit(o =>
{
o.AddConsumers(Assembly.Load("Messaging"));
o.AddSagaStateMachine()
.InMemoryRepository();
o.UsingAzureServiceBus(async (context, cfg) =>
{
cfg.Host(new HostSettings
{
ServiceUri = new Uri("sb://´.."),
TokenCredential = new DefaultAzureCredential()
});
cfg.ReceiveEndpoint(formatter.Saga(), e =>
{
e.ConfigureSaga(context);
});
cfg.SubscriptionEndpoint(formatter.Consumer(), e =>
{
e.ConfigureConsumer(context);
});
// Calculate report
await CreateOrUpdateTopicWithSubs(adminClient, formatter.MessageName(), formatterWorker.ConsumerFromMessageName());
});
});
Рабочий:
services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumers(Assembly.Load("Messaging"));
}, "ServiceBusConnection", (context, configuration) =>
{
var settings = new HostSettings
{
ServiceUri = new Uri("sb://...."),
TokenCredential = new DefaultAzureCredential(),
};
configuration.Host(settings);
});
host.json
{
"version": "2.0",
"logging": {
"logLevel": {
"Default": "Trace",
"System": "Information",
"Microsoft": "Information"
},
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request;Exception",
"maxTelemetryItemsPerSecond": 20
},
"enableLiveMetricsFilters": true,
"logLevel": {
"Default": "Information"
}
}
},
"extensions": {
"serviceBus": {
"prefetchCount": 32,
"messageHandlerOptions": {
"autoComplete": false,
"maxConcurrentCalls": 32,
"maxAutoRenewDuration": "00:30:00"
}
},
"eventHub": {
"maxBatchSize": 64,
"prefetchCount": 256,
"batchCheckpointFrequency": 1
}
}
}
StartedFunction:
public class StartedFunction : DefaultServiceBusFunction
{
public StartedFunction(IMessageReceiver receiver) : base(receiver)
{
}
[Microsoft.Azure.Functions.Worker.Function(nameof(StartedFunction))]
public async Task Run([Microsoft.Azure.Functions.Worker.ServiceBusTrigger(_topicName, _subscriptionName, Connection = _connection)] ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
await AsTopicResultWithCompleteMessage(message, messageActions, _topicName, _subscriptionName, cancellationToken);
}
}
DefaultServiceBusFunction:
public abstract class DefaultServiceBusFunction
{
private readonly IMessageReceiver _receiver;
public bool IsComplete { get; private set; } = false;
public DefaultServiceBusFunction(IMessageReceiver receiver)
{
_receiver = receiver;
}
public async Task AsTopicResultWithCompleteMessage(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, string topic, string subs, CancellationToken cancellationToken) where TConsumer : class, IConsumer
{
try
{
await _receiver.HandleConsumer(topic, subs, message, cancellationToken);
if (!IsComplete)
{
await messageActions.CompleteMessageAsync(message, cancellationToken);
IsComplete = true;
}
}
catch
{
if (!IsComplete)
{
await messageActions.CompleteMessageAsync(message, cancellationToken);
IsComplete = true;
}
throw;
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... to-recieve