Я использую веб -API ASP.net Core 8 для чтения событий из Azure EventHub. Служба, которая читает события, находится внутри хостинганга .
Перед обработкой каждого события мне нужно проверить здоровье внешнего API с использованием Polly для повторных изданий. Если API не работает после всех попыток повторения, я пытаюсь остановить обработку EventHub в течение нескольких минут. Тем не менее, приложение висит бесконечно при остановке EventHub. Он не терпит неудачу и не бросает никаких исключений - он просто застрял. > В результате этот метод
будет выполнять сетевой ввод -вывод и может потребоваться ждать раздела
, которые были активны для завершения. < /P>
< /blockquote>
Сводка: < /p>
[*] Я читаю события из Azure EventHub < /li>
Я проверяю здоровье API, используя Polly's Retry Механизм
[*] Если Полли исчерпает все повторные данные, и API остается недоступным, я останавливаю EventHub
[*] Приложение висит бесконечно после остановки EventHub
< /ul>
Вызов stopprocessingAsync () < /code>, по -видимому, заставляет процессор EventHub повесить бесконечно. Если так, как я могу правильно прекратить обработку и возобновить позже?public class EventHubServiceTask : IEventHubServiceTask
{
private CancellationToken _startCancellationToken;
public EventHubServiceTask()
{
var storageClient = new BlobContainerClient(_blobStorageSettings.ConnectionString, _blobStorageSettings.ContainerName);
_eventProcessorClient = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, _eventHubSettings.ConnectionString, _eventHubSettings.EventHubName);
_eventProcessorClient.ProcessEventAsync += ProcessEventHandler;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_startCancellationToken = cancellationToken;
await _eventProcessorClient.StartProcessingAsync(cancellationToken);
}
private async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
MyEvent? myevent= JsonConvert.DeserializeObject(eventArgs.Data.EventBody.ToString()) ?? new MyEvent();
if (await ServiceHealthCheckNotAvailable(myevent.CorrelationId, eventArgs.CancellationToken))
{
await HandleHealthCheckFailureAsync(myevent.CorrelationId, eventArgs.CancellationToken);
}
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
private async Task ServiceHealthCheckNotAvailable(Guid correlationId, CancellationToken cancellationToken)
{
return !await CheckHealthAsync(correlationId, cancellationToken);
}
private async Task HandleHealthCheckFailureAsync(Guid correlationId, CancellationToken cancellationToken)
{
try
{
await _eventProcessorClient.StopProcessingAsync(_startCancellationToken);
await Task.Delay(TimeSpan.FromSeconds(_retryPolicySettings.EventHubSleepTimeSeconds), cancellationToken);
await _eventProcessorClient.StartProcessingAsync(cancellationToken);
}
catch (Exception e)
{
throw;
}
}
private async Task CheckHealthAsync(Guid correlationId, CancellationToken cancellationToken)
{
var policy = Policy
.HandleResult(result => !result)
.WaitAndRetryAsync(_retryPolicySettings.RetryCount, retryAttempt => TimeSpan.FromSeconds(retryAttempt));
return await policy.ExecuteAsync(async () => await _service.IsHealthyAsync(correlationId, cancellationToken));
}
}
Подробнее здесь: https://stackoverflow.com/questions/794 ... lient-stop
Почему обработка Azure EventHub видит при остановке (EventProcessorClient.stopprocessingAsync)? ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение