Почему моя одноэлементная служба создается несколько раз в .NET 8 при использовании размещенных служб?C#

Место общения программистов C#
Ответить
Anonymous
 Почему моя одноэлементная служба создается несколько раз в .NET 8 при использовании размещенных служб?

Сообщение Anonymous »

Несмотря на то, что он зарегистрирован как синглтон, он создается несколько раз вместо того, чтобы поддерживать один экземпляр во всем приложении. Может ли это быть связано с тем, как служба разрешается или настраивается, особенно при взаимодействии с размещенной службой или построителем соединений?"
Я создаю платформу в .NET Core 8 для регистрации потребителей для служебной шины Azure (аналогично работает Masstransit), но я столкнулся с проблемой, из-за которой мой синглтон ServiceBusManager не поддерживает регистрацию процессора.
Репозиторий Github: https://github.com/matvi/servicebusframework
1. Сначала я регистрирую потребителей в ServiceBusManager.AddConsumers и добавляю их в список _processors.
2.- Затем я запускаю размещенную службу, которая должна иметь тот же экземпляр ServiceManager и должна запускать метод ServiceManager.StartProcessingAsync, но список _processors пусто
builder.Services.AddSingleton();
builder.Services.AddConsumerServiceBusConnection(x =>
{
var topicName = builder.Configuration["ServiceBusSettings:TopicName"];
var subscriptionName = builder.Configuration["ServiceBusSettings:SubscriptionName"];
x.AddConsumer(topicName, subscriptionName);
x.AddConsumer("testtopic", "testsuscription");
});

Метод расширения:
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddConsumerServiceBusConnection(
this IServiceCollection services,
Action configure)
{
// Build the service provider to resolve the singleton instance
using (var serviceProvider = services.BuildServiceProvider())
{
var serviceBusManager = serviceProvider.GetRequiredService();
var builder = new ServiceBusConnectionBuilder(serviceBusManager);
configure(builder);
}
return services;
}
}

ServiceBusConnectionBuilder:
public class ServiceBusConnectionBuilder
{
private readonly IServiceBusManager _serviceBusManager;
public ServiceBusConnectionBuilder(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}
public ServiceBusConnectionBuilder AddConsumer(string topicName, string suscrptionName)
where TConsumer : IServiceBusConsumer
{
_serviceBusManager.AddConsumer(topicName, suscrptionName);
return this;
}
}

Реализация ServiceBusManager:
public class ServiceBusManager : IServiceBusManager
{
private readonly ServiceBusClient _serviceBusClient;
private readonly IServiceProvider _serviceProvider;
private readonly ConcurrentDictionary _processors;

public ServiceBusManager(ServiceBusClient serviceBusClient,
IServiceProvider serviceProvider,
IOptions serviceBusSettings)
{
_serviceBusClient = serviceBusClient ?? throw new ArgumentNullException(nameof(serviceBusClient));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_processors = new ConcurrentDictionary();
}

public void AddConsumer(string topicName, string subscriptionName) where TConsumer : IServiceBusConsumer
{
if (string.IsNullOrWhiteSpace(topicName))
throw new ArgumentException("Topic name cannot be null or empty.", nameof(topicName));
if (string.IsNullOrWhiteSpace(subscriptionName))
throw new ArgumentException("Subscription name cannot be null or empty.", nameof(subscriptionName));

var processor = _serviceBusClient.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 10
});

processor.ProcessMessageAsync += async args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
await consumer.ProcessMessage(args);
};

processor.ProcessErrorAsync += args =>
{
using var scope = _serviceProvider.CreateAsyncScope();
var consumer = scope.ServiceProvider.GetRequiredService();
return consumer.ProcessError(args);
};

if (!_processors.TryAdd($"{topicName}:{subscriptionName}", processor))
{
throw new InvalidOperationException($"Consumer for {topicName}:{subscriptionName} is already registered.");
}
}

public async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var startTasks = _processors.Values.Select(processor => processor.StartProcessingAsync(cancellationToken));
await Task.WhenAll(startTasks);
}

public async Task StopProcessingAsync()
{
var stopTasks = _processors.Values.Select(processor => processor.StopProcessingAsync());
await Task.WhenAll(stopTasks);
}
}

Потребительский интерфейс:
public interface IServiceBusConsumer
{
Task ProcessMessage(ProcessMessageEventArgs args);
Task ProcessError(ProcessErrorEventArgs args);
}

Наконец-то у меня есть размещенный сервис
public class ServiceBusHostedService : CronJobServiceBase
{
private readonly IServiceProvider _serviceProvider;
private AsyncServiceScope _scope;
private IHostedServiceTask _taskService;

public ServiceBusHostedService(
IOptions hostedServiceSettings
, ILogger log,
IServiceProvider serviceProvider) : base(hostedServiceSettings, log)
{
_serviceProvider = serviceProvider;
}

protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
AppInsights.TrackTrace("Starting EventHubHostedService");
_scope = _serviceProvider.CreateAsyncScope();
_taskService = _scope.ServiceProvider.GetRequiredService();
await _taskService.StartAsync(cancellationToken);
}

protected override async Task DisposeScope()
{
await _taskService.StopAsync(CancellationToken.None);
await _scope.DisposeAsync();
}
}

public class EventServiceBusServiceTask : IEventServiceBusServiceTask
{
private readonly IServiceBusManager _serviceBusManager;

public EventServiceBusServiceTask(IServiceBusManager serviceBusManager)
{
_serviceBusManager = serviceBusManager;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StartProcessingAsync(cancellationToken);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await _serviceBusManager.StopProcessingAsync();
}

}


Подробнее здесь: https://stackoverflow.com/questions/793 ... 8-when-usi
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»