Несмотря на то, что он зарегистрирован как синглтон, он создается несколько раз вместо того, чтобы поддерживать один экземпляр во всем приложении. Может ли это быть связано с тем, как служба разрешается или настраивается, особенно при взаимодействии с размещенной службой или построителем соединений?"
Я создаю платформу в .NET Core 8 для регистрации потребителей для служебной шины Azure (аналогично работает Masstransit), но я столкнулся с проблемой, из-за которой мой синглтон ServiceBusManager не поддерживает регистрацию процессора.
Репозиторий Github: https://github.com/matvi/servicebusframework
1. Сначала я регистрирую потребителей в ServiceBusManager.AddConsumers и добавляю их в список _processors.
2.- Затем я запускаю размещенную службу, которая должна иметь тот же экземпляр ServiceManager и должна запускать метод ServiceManager.StartProcessingAsync, но список _processors пусто
builder.Services.AddSingleton();
builder.Services.AddConsumerServiceBusConnection(x =>
{
var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
x.AddConsumer(topicName, subscriptionName);
x.AddConsumer("testtopic", "testsuscription");
});
Метод расширения:
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
Action configure)
{
// Build the service provider to resolve the singleton instance
using (var serviceProvider = services.BuildServiceProvider())
{
var serviceBusManager = serviceProvider.GetRequiredService();
var builder = new ServiceBusConnectionBuilder(serviceBusManager);
configure(builder);
}
return services;
}
}
ServiceBusConnectionBuilder:
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public ServiceBusConnectionBuilder AddConsumer(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
_serviceBusManager.AddConsumer(topicName, suscrptionName);
return this;
}
}
Реализация ServiceBusManager:
public class ServiceBusManager : IServiceBusManager
{
private readonly ServiceBusClient _serviceBusClient;
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary _processors;
public ServiceBusManager(ServiceBusClient serviceBusClient,
IServiceProvider serviceProvider,
IOptions serviceBusSettings)
{
_serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_processors = new ConcurrentDictionary();
}
public void AddConsumer(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
{
if (string.IsNullOrWhiteSpace(topicName))
throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
if (string.IsNullOrWhiteSpace(subscriptionName))
throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));
var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 10
});
processor.ProcessMessageAsync += async args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
await consumer.ProcessMessage(args);
};
processor.ProcessErrorAsync += args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
return consumer.ProcessError(args);
};
if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
{
throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
}
}
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
await Task.WhenAll(startTasks);
}
public async Task StopProcessingAsync()
{
var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
await Task.WhenAll(stopTasks);
}
}
Потребительский интерфейс:
public interface IServiceBusConsumer
{
Task ProcessMessage(ProcessMessageEventArgs args);
Task ProcessError(ProcessErrorEventArgs args);
}
Наконец-то у меня есть размещенный сервис
public class ServiceBusHostedService : CronJobServiceBase
{
private readonly IServiceProvider _serviceProvider;
private AsyncServiceScope _scope;
private IHostedServiceTask _taskService;
public ServiceBusHostedService(
IOptions hostedServiceSettings
, ILogger log,
IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
{
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
AppInsights.TrackTrace("Starting EventHubHostedService");
_scope = _serviceProvider.CreateAsyncScope();
_taskService = _scope.ServiceProvider.GetRequiredService();
await _taskService.StartAsync(cancellationToken);
}
protected override async Task DisposeScope()
{
await _taskService.StopAsync(CancellationToken.None);
await _scope.DisposeAsync();
}
}
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;
public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StartProcessingAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StopProcessingAsync();
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... 8-when-usi
Почему моя одноэлементная служба создается несколько раз в .NET 8 при использовании размещенных служб? ⇐ C#
Место общения программистов C#
1737167754
Anonymous
Несмотря на то, что он зарегистрирован как синглтон, он создается несколько раз вместо того, чтобы поддерживать один экземпляр во всем приложении. Может ли это быть связано с тем, как служба разрешается или настраивается, особенно при взаимодействии с размещенной службой или построителем соединений?"
Я создаю платформу в .NET Core 8 для регистрации потребителей для служебной шины Azure (аналогично работает Masstransit), но я столкнулся с проблемой, из-за которой мой синглтон ServiceBusManager не поддерживает регистрацию процессора.
Репозиторий Github: https://github.com/matvi/servicebusframework
[b]1. Сначала я регистрирую потребителей в ServiceBusManager.AddConsumers и добавляю их в список _processors.[/b]
[b]2.- Затем я запускаю размещенную службу, которая должна иметь тот же экземпляр ServiceManager и должна запускать метод ServiceManager.StartProcessingAsync, но список _processors пусто[/b]
builder.Services.AddSingleton();
builder.Services.AddConsumerServiceBusConnection(x =>
{
var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
x.AddConsumer(topicName, subscriptionName);
x.AddConsumer("testtopic", "testsuscription");
});
Метод расширения:
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
Action configure)
{
// Build the service provider to resolve the singleton instance
using (var serviceProvider = services.BuildServiceProvider())
{
var serviceBusManager = serviceProvider.GetRequiredService();
var builder = new ServiceBusConnectionBuilder(serviceBusManager);
configure(builder);
}
return services;
}
}
ServiceBusConnectionBuilder:
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public ServiceBusConnectionBuilder AddConsumer(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
_serviceBusManager.AddConsumer(topicName, suscrptionName);
return this;
}
}
Реализация ServiceBusManager:
public class ServiceBusManager : IServiceBusManager
{
private readonly ServiceBusClient _serviceBusClient;
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary _processors;
public ServiceBusManager(ServiceBusClient serviceBusClient,
IServiceProvider serviceProvider,
IOptions serviceBusSettings)
{
_serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_processors = new ConcurrentDictionary();
}
public void AddConsumer(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
{
if (string.IsNullOrWhiteSpace(topicName))
throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
if (string.IsNullOrWhiteSpace(subscriptionName))
throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));
var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 10
});
processor.ProcessMessageAsync += async args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
await consumer.ProcessMessage(args);
};
processor.ProcessErrorAsync += args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
return consumer.ProcessError(args);
};
if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
{
throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
}
}
public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
await Task.WhenAll(startTasks);
}
public async Task StopProcessingAsync()
{
var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
await Task.WhenAll(stopTasks);
}
}
Потребительский интерфейс:
public interface IServiceBusConsumer
{
Task ProcessMessage(ProcessMessageEventArgs args);
Task ProcessError(ProcessErrorEventArgs args);
}
Наконец-то у меня есть размещенный сервис
public class ServiceBusHostedService : CronJobServiceBase
{
private readonly IServiceProvider _serviceProvider;
private AsyncServiceScope _scope;
private IHostedServiceTask _taskService;
public ServiceBusHostedService(
IOptions hostedServiceSettings
, ILogger log,
IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
{
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
AppInsights.TrackTrace("Starting EventHubHostedService");
_scope = _serviceProvider.CreateAsyncScope();
_taskService = _scope.ServiceProvider.GetRequiredService();
await _taskService.StartAsync(cancellationToken);
}
protected override async Task DisposeScope()
{
await _taskService.StopAsync(CancellationToken.None);
await _scope.DisposeAsync();
}
}
public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;
public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StartProcessingAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StopProcessingAsync();
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79366242/why-is-my-singleton-service-being-instanciated-multiple-times-in-net-8-when-usi[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия