Потребительская библиотека RabbitMQ для .Net в качестве фоновой службы ⇐ C#
-
Гость
Потребительская библиотека RabbitMQ для .Net в качестве фоновой службы
Пытаемся реализовать потребительскую библиотеку RabbitMQ для .Net с использованием фоновой службы. При выполнении потребительской части невозможно AddConsumer и использовать метод Consume для получения сообщения и его возврата.
До сих пор я делал следующее --
ConsumerService: BackgroundService, затем в ExecuteAsync
частная статическая асинхронная задача CreateAndStartBus(CancellationToken cancelToken) { var busControl = Bus.Factory.CreateUsingRabbitMq(x => { x.Host(new Uri("rabbitmq://unknownhost:32787"), h => { h.Username("кого это волнует"); h.Password("Охруд"); h.RequestedConnectionTimeout(2000); }); х.Автостарт = правда; x.ReceiveEndpoint(x.Host, «Очередь», конечная точка => { ** endpoint.Consumer(() => новый TestConsumer());** }); }); Ручка BusHandle; используя (var timeout = новый CancellationTokenSource(TimeSpan.FromSeconds(5000))) { handle = ждут busControl.StartAsync(timeout.Token).OrCanceled(cancellationToken); } } Предположим, что TestConsumer — это общее сообщение/TMessage, иначе пользователь также может его не предоставить. И в этом сервисе только я могу извлечь сообщение и вернуть его с помощью Consume.
Пожалуйста, подскажите, если у кого-то есть идеи по этому поводу.
Спасибо. СМ
Попробовал встроенную сборку TConsumer Message, она работает, но такой подход не нужен, это должен быть TMEssage, о чем библиотека, возможно, понятия не имеет.
обновление 1 Рабочий пример --> Обработчик в клиенте --
публичный класс EventDHandler : IEventDtoHandler { общедоступная асинхронная задача HandleAsync (EventMessageDto eventDto) { Console.ForegroundColor = ConsoleColor.Green; вар waitTime = новый Random().Next(100, 500); ждать Task.Delay(waitTime); Console.WriteLine($"Guid - {eventDto.Id}, - Полученное сообщение - {eventDto.Message}"); Консоль.ResetColor(); } } в фоновой службе — ConBGService: BackgroundService
x.ReceiveEndpoint(_rabbitMqSettings.ConsumerQueue, конечная точка => { foreach (var ConsumerEvent в ConsumerEventDtoHandlers)//Это для обработчика { **endpoint.Consumer(() => new EventDtoHandlerAdapter(consumerEvent));** } общедоступный класс **EventDtoHandlerAdapter** : IConsumer где TConsumer: IEventDtoHandler где TEventDto: класс, IEventDto { частный только для чтения TConsumer _consumer; общедоступный EventDtoHandlerAdapter (потребитель TConsumer) { _consumer = потребитель; } общедоступная асинхронная задача **Consume** (контекст ConsumeContext) { ждут _consumer.**HandleAsync**(context.Message); } общедоступный интерфейс **IEventDto** { } общедоступный интерфейс **IEventDtoHandler** { } общедоступный интерфейс **IEventDtoHandler** : IEventDtoHandler где TEventDto: IEventDto { Задача HandleAsync (TEventDto eventDto); } Есть ли способ сделать его очень общим? endpoint.Consumer(() => new EventDtoHandlerAdapter(consumerEvent)) Или сделать определение настолько общим, чтобы я мог передать любой обработчик с помощью DTO? нравиться IEventDtoHandler, который будет обрабатывать строковое сообщение от потребителя?
Заранее спасибо.
Пытаемся реализовать потребительскую библиотеку RabbitMQ для .Net с использованием фоновой службы. При выполнении потребительской части невозможно AddConsumer и использовать метод Consume для получения сообщения и его возврата.
До сих пор я делал следующее --
ConsumerService: BackgroundService, затем в ExecuteAsync
частная статическая асинхронная задача CreateAndStartBus(CancellationToken cancelToken) { var busControl = Bus.Factory.CreateUsingRabbitMq(x => { x.Host(new Uri("rabbitmq://unknownhost:32787"), h => { h.Username("кого это волнует"); h.Password("Охруд"); h.RequestedConnectionTimeout(2000); }); х.Автостарт = правда; x.ReceiveEndpoint(x.Host, «Очередь», конечная точка => { ** endpoint.Consumer(() => новый TestConsumer());** }); }); Ручка BusHandle; используя (var timeout = новый CancellationTokenSource(TimeSpan.FromSeconds(5000))) { handle = ждут busControl.StartAsync(timeout.Token).OrCanceled(cancellationToken); } } Предположим, что TestConsumer — это общее сообщение/TMessage, иначе пользователь также может его не предоставить. И в этом сервисе только я могу извлечь сообщение и вернуть его с помощью Consume.
Пожалуйста, подскажите, если у кого-то есть идеи по этому поводу.
Спасибо. СМ
Попробовал встроенную сборку TConsumer Message, она работает, но такой подход не нужен, это должен быть TMEssage, о чем библиотека, возможно, понятия не имеет.
обновление 1 Рабочий пример --> Обработчик в клиенте --
публичный класс EventDHandler : IEventDtoHandler { общедоступная асинхронная задача HandleAsync (EventMessageDto eventDto) { Console.ForegroundColor = ConsoleColor.Green; вар waitTime = новый Random().Next(100, 500); ждать Task.Delay(waitTime); Console.WriteLine($"Guid - {eventDto.Id}, - Полученное сообщение - {eventDto.Message}"); Консоль.ResetColor(); } } в фоновой службе — ConBGService: BackgroundService
x.ReceiveEndpoint(_rabbitMqSettings.ConsumerQueue, конечная точка => { foreach (var ConsumerEvent в ConsumerEventDtoHandlers)//Это для обработчика { **endpoint.Consumer(() => new EventDtoHandlerAdapter(consumerEvent));** } общедоступный класс **EventDtoHandlerAdapter** : IConsumer где TConsumer: IEventDtoHandler где TEventDto: класс, IEventDto { частный только для чтения TConsumer _consumer; общедоступный EventDtoHandlerAdapter (потребитель TConsumer) { _consumer = потребитель; } общедоступная асинхронная задача **Consume** (контекст ConsumeContext) { ждут _consumer.**HandleAsync**(context.Message); } общедоступный интерфейс **IEventDto** { } общедоступный интерфейс **IEventDtoHandler** { } общедоступный интерфейс **IEventDtoHandler** : IEventDtoHandler где TEventDto: IEventDto { Задача HandleAsync (TEventDto eventDto); } Есть ли способ сделать его очень общим? endpoint.Consumer(() => new EventDtoHandlerAdapter(consumerEvent)) Или сделать определение настолько общим, чтобы я мог передать любой обработчик с помощью DTO? нравиться IEventDtoHandler, который будет обрабатывать строковое сообщение от потребителя?
Заранее спасибо.
Мобильная версия