I'm building a framework in .NET 8 for registering consumers for Azure Service Bus (similar to how MassTransit works), but I've run into a problem: my singleton ServiceBusManager does not retain the processors I register.
GitHub repository: https://github.com/matvi/servicebusframework
- First, I register the consumers through ServiceBusManager.AddConsumer, which adds a processor for each one to the _processors dictionary.
- Then I start a hosted service, which should receive the same ServiceBusManager instance and call ServiceBusManager.StartProcessingAsync, but at that point _processors is empty.
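Registration:
builder.Services.AddSingleton<IServiceBusManager, ServiceBusManager>();
builder.Services.AddConsumerServiceBusConnection(x =>
{
    var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
    var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
    // Each call also takes a consumer type argument, AddConsumer<TConsumer>(...);
    // the concrete consumer types are not shown in this listing.
    x.AddConsumer(topicName, subscriptionName);
    x.AddConsumer("testtopic", "testsuscription");
});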
Extension method:
public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddConsumerServiceBusConnection(
        this IServiceCollection services,
        Action<ServiceBusConnectionBuilder> configure)
    {
        // Build the service provider to resolve the singleton instance
        using (var serviceProvider = services.BuildServiceProvider())
        {
            var serviceBusManager = serviceProvider.GetRequiredService<IServiceBusManager>();
            var builder = new ServiceBusConnectionBuilder(serviceBusManager);
            configure(builder);
        }
        return services;
    }
}
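To isolate the behaviour, here is a minimal sketch of how singleton identity behaves when two providers are built from the same IServiceCollection. It assumes only the Microsoft.Extensions.DependencyInjection package; `Registry` and `SingletonIdentityDemo` are hypothetical stand-ins, not types from my repository. A singleton is unique per built provider, so an instance resolved from a temporary provider is not the one the host resolves later.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for ServiceBusManager: it only remembers what was added.
public sealed class Registry
{
    public List<string> Items { get; } = new();
}

public static class SingletonIdentityDemo
{
    // Returns whether both providers handed out the same instance, and how many
    // items the second provider's instance contains.
    public static (bool SameInstance, int ItemsSeenByHost) Run()
    {
        var services = new ServiceCollection();
        services.AddSingleton<Registry>();

        Registry fromTemporaryProvider;

        // Same pattern as the extension method: build a throwaway provider,
        // resolve the singleton from it, and mutate that instance.
        using (var temporaryProvider = services.BuildServiceProvider())
        {
            fromTemporaryProvider = temporaryProvider.GetRequiredService<Registry>();
            fromTemporaryProvider.Items.Add("testtopic:testsuscription");
        }

        // The host later builds its own provider from the same IServiceCollection.
        using var hostProvider = services.BuildServiceProvider();
        var fromHostProvider = hostProvider.GetRequiredService<Registry>();

        return (ReferenceEquals(fromTemporaryProvider, fromHostProvider),
                fromHostProvider.Items.Count);
    }
}
```

Calling `SingletonIdentityDemo.Run()` yields `(False, 0)`: the second provider creates a fresh `Registry`, which matches the empty _processors I see in the hosted service.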
ServiceBusConnectionBuilder:
public class ServiceBusConnectionBuilder
{
    private readonly IServiceBusManager _serviceBusManager;

    public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public ServiceBusConnectionBuilder AddConsumer<TConsumer>(string topicName, string subscriptionName)
        where TConsumer : IServiceBusConsumer
    {
        _serviceBusManager.AddConsumer<TConsumer>(topicName, subscriptionName);
        return this;
    }
}
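One direction that might avoid resolving the manager at registration time is to let the builder only record the registrations and replay them inside a singleton factory. This is a rough sketch under stated assumptions, not the code from my repository: `IConsumer`, `SampleConsumer`, `ConsumerRegistry`, `DeferredConnectionBuilder` and `AddDeferredConsumers` are all hypothetical names, and no Azure SDK types are involved.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical, simplified stand-ins for IServiceBusConsumer and ServiceBusManager.
public interface IConsumer { }
public sealed class SampleConsumer : IConsumer { }

public sealed class ConsumerRegistry
{
    private readonly List<string> _keys = new();
    public IReadOnlyList<string> Keys => _keys;

    public void AddConsumer<TConsumer>(string topicName, string subscriptionName)
        where TConsumer : IConsumer
        => _keys.Add($"{typeof(TConsumer).Name}@{topicName}:{subscriptionName}");
}

// The builder only records what to do; it never touches a service provider.
public sealed class DeferredConnectionBuilder
{
    private readonly List<Action<ConsumerRegistry>> _registrations = new();

    public DeferredConnectionBuilder AddConsumer<TConsumer>(string topicName, string subscriptionName)
        where TConsumer : IConsumer
    {
        _registrations.Add(registry => registry.AddConsumer<TConsumer>(topicName, subscriptionName));
        return this;
    }

    // Replays every recorded registration against the given registry.
    public void ApplyTo(ConsumerRegistry registry)
    {
        foreach (var registration in _registrations)
        {
            registration(registry);
        }
    }
}

public static class DeferredRegistrationExtensions
{
    public static IServiceCollection AddDeferredConsumers(
        this IServiceCollection services,
        Action<DeferredConnectionBuilder> configure)
    {
        var builder = new DeferredConnectionBuilder();
        configure(builder);

        // The factory runs once per built provider, on first resolution of the
        // singleton, so the instance that gets configured belongs to that provider.
        services.AddSingleton<ConsumerRegistry>(_ =>
        {
            var registry = new ConsumerRegistry();
            builder.ApplyTo(registry);
            return registry;
        });
        return services;
    }
}
```

With this shape, `services.AddDeferredConsumers(x => x.AddConsumer<SampleConsumer>("testtopic", "testsuscription"))` followed by resolving `ConsumerRegistry` from the final provider gives a registry that already contains the recorded consumer. Whether the same idea carries over cleanly to the real ServiceBusManager, which also needs ServiceBusClient, is something I have not verified.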
ServiceBusManager implementation:
public class ServiceBusManager : IServiceBusManager
{
    private readonly ServiceBusClient _serviceBusClient;
    private readonly IServiceProvider _serviceProvider;
    // Keyed by "topicName:subscriptionName".
    private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors;

    public ServiceBusManager(ServiceBusClient serviceBusClient,
        IServiceProvider serviceProvider,
        IOptions serviceBusSettings) // IOptions type argument not shown in this listing
    {
        _serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _processors = new ConcurrentDictionary<string, ServiceBusProcessor>();
    }
    public void AddConsumer<TConsumer>(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
    {
        if (string.IsNullOrWhiteSpace(topicName))
            throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
        if (string.IsNullOrWhiteSpace(subscriptionName))
            throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));

        var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1,
            PrefetchCount = 10
        });

        // Resolve a fresh consumer from its own scope for every message.
        processor.ProcessMessageAsync += async args =>
        {
            await using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ProcessMessage(args);
        };

        processor.ProcessErrorAsync += async args =>
        {
            // Await inside the scope so it is not disposed while the consumer is still running.
            await using var scope = _serviceProvider.CreateAsyncScope();
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ProcessError(args);
        };

        if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
        {
            throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
        }
    }
    public async Task StartProcessingAsync(CancellationToken cancellationToken)
    {
        var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
        await Task.WhenAll(startTasks);
    }

    public async Task StopProcessingAsync()
    {
        var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
        await Task.WhenAll(stopTasks);
    }
}
Consumer interface:
public interface IServiceBusConsumer
{
    Task ProcessMessage(ProcessMessageEventArgs args);
    Task ProcessError(ProcessErrorEventArgs args);
}
Finally, I have the hosted service:
public class ServiceBusHostedService : CronJobServiceBase
{
    private readonly IServiceProvider _serviceProvider;
    private AsyncServiceScope _scope;
    private IHostedServiceTask _taskService;

    public ServiceBusHostedService(
        IOptions hostedServiceSettings, // IOptions type argument not shown in this listing
        ILogger log,
        IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        AppInsights.TrackTrace("Starting EventHubHostedService");
        _scope = _serviceProvider.CreateAsyncScope();
        // The type argument of GetRequiredService is not shown in this listing.
        _taskService = _scope.ServiceProvider.GetRequiredService();
        await _taskService.StartAsync(cancellationToken);
    }

    protected override async Task DisposeScope()
    {
        await _taskService.StopAsync(CancellationToken.None);
        await _scope.DisposeAsync();
    }
}
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
    private readonly IServiceBusManager _serviceBusManager;

    public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
    {
        _serviceBusManager = serviceBusManager;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StartProcessingAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _serviceBusManager.StopProcessingAsync();
    }
}
More details here: https://stackoverflow.com/questions/793 ... 8-when-usi