API .Net 8: отправка/прослушивание RabbitMQ не работаетC#

Место общения программистов C#
Ответить
Anonymous
 API .Net 8: отправка/прослушивание RabbitMQ не работает

Сообщение Anonymous »

Я работаю над API .Net 8, где мы используем множество пользовательских (самостоятельных) библиотек Nuget, например, ту, которая включает в себя некоторые функции RabbitMQ, и для добавления дополнительной информации API также использует шаблон CQRS (Mediator). для обработки событий RabbitMQ.
Мы объявляем все события «Publisher» и «Listener» в Program.cs следующим образом (я добавлю только соответствующие части):

Код: Выделить всё

Program.cs

Код: Выделить всё

await RunServer.RunAsync(
args,
(builder, options) =>
{
options.CQRS.Enabled = true;
options.CQRS.Assemblies = [typeof(xxx.Domain.AssemblyInfo).Assembly];

options.Event.Enabled = true;
options.Event.ConnString = builder.Configuration.GetConnectionString("RabbitMQ")!;

options.Event.PublishQueues =
RabbitMQUtils.CreatePublishQueue("DataSync_Exchange",
[
(QueueIdentifier.JobManager_JobRequest, "Job_Request")
]
)
)
.ToArray();

options.Event.ListenQueues =
RabbitMQUtils.CreateListenerQueue("DataSync_Exchange",
QueueIdentifier.JobManager_JobRequest,
[
"Job_Request"
]
);

options.OtherServices = () =>
{
builder.Services.AddHostedService();
};
}
);
(Для простоты я использую здесь только «JobRequestV0Event»).

Код: Выделить всё

CreatePublishQueue

Код: Выделить всё

public static RabbitMQQueueAlias[] CreatePublishQueue(string exchange, (QueueIdentifier Alias, string RoutingKey)[] queues)
{
string exchange2 = exchange;
return queues.Select(((QueueIdentifier Alias, string RoutingKey) q) => new RabbitMQQueueAlias
{
Alias = q.Alias,
RoutingKey = new string[1] { q.RoutingKey },
Exchange = exchange2,
Active = true,
Durable = true
}).ToArray();
}

Код: Выделить всё

CreateListenerQueue

Код: Выделить всё

public static RabbitMQQueueAlias[] CreateListenerQueue(string exchange, QueueIdentifier alias, string[] routingKey, string? postfix = null)
{
string text = alias.ToString();
string text2 = text.Substring(2, text.Length - 2);
if (postfix != null)
{
text2 += postfix;
}

return new RabbitMQQueueAlias[1]
{
new RabbitMQQueueAlias
{
RoutingKey = routingKey,
Alias = alias,
Exchange = exchange,
Queue = text2,
Active = true,
Durable = true
}
};
}
В «HypervisorTrackingBackgroundService», который можно увидеть в Program.cs, публикация запускается следующим образом:

Код: Выделить всё

IJobUtility.cs

Код: Выделить всё

public interface IJobUtility
{
Task PublishRequestJob(string name, string? notes = null, string @params = "");
}
public sealed class JobUtility : IJobUtility
{
private readonly IEventBus _eventBus;
private readonly IVdcIdGenerator _gen;

public JobUtility(IEventBus eventBus, IVdcIdGenerator gen)
{
_eventBus = eventBus;
_gen = gen;
}

public async Task PublishRequestJob(string name, string? notes = null, string @params = "")
{
var id = _gen.GenerateId();

await _eventBus.PublishAsync(new JobRequestV0Event(
id,
new Guid(1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
_gen.WorkerId,
_gen.GenerateIdAsString(),
new JobRequestV0Event.B(name, @params, notes)));
}

}
Где «PublishAsync» — это оболочка для «SendAsync» (находится в EventBus)

Код: Выделить всё

SendAsync

Код: Выделить всё

protected virtual Task SendAsync(object graph, Guid id, string eventName, string? correlationId, bool useEnvelop)
{
string eventName2 = eventName;
object graph2 = graph;
_channel.ConfirmSelect();
IBasicProperties basicProperties = _channel.CreateBasicProperties();
basicProperties.Persistent = true;
basicProperties.ContentType = "application/json";
basicProperties.MessageId = id.ToString();
IBasicProperties basicProperties2 = basicProperties;
if (basicProperties2.Headers == null)
{
IDictionary  dictionary2 = (basicProperties2.Headers = new Dictionary());
}
basicProperties.Headers["X-Correlation-ID"] = correlationId;
basicProperties.Headers["X-Event"] = eventName2;
basicProperties.Headers["X-Has-Envelop"] = useEnvelop;
_setup?.Invoke(graph2, basicProperties);
IEnumerable enumerable = _queue;
if (_filter != null)
{
enumerable = enumerable.Where((RabbitMQQueueAlias queue) => _filter(queue.Alias, GenEventName(queue, eventName2, graph2), graph2));
}
foreach (RabbitMQQueueAlias item in enumerable)
{
object obj = _transform?.Invoke(item.Alias, eventName2, graph2) ?? graph2;
string[] routingKey = item.RoutingKey;
foreach (string text in routingKey)
{
if (useEnvelop)
{
obj = new MessageEnvelop(obj, _resolver.Resolver(obj.GetType()) ?? text);
}
byte[] bytes = Encoding.UTF8.GetBytes(_serializer.Serialize(obj));
_channel.BasicPublish(item.Exchange, text, mandatory: true, basicProperties, bytes);
_channel.WaitForConfirms();
_logger?.LogInformation("Publish Event to {Exchange}, {RoutingKey}, {@Event}", item.Exchange, item.RoutingKey, obj);
}
}
return Task.CompletedTask;
}
Фоновая служба выполняется без проблем, поэтому «Опубликовать» (по крайней мере, она не генерирует никаких исключений), но обработчик CQRS, который должен прослушивать эту публикацию, никогда не запускается ( точка останова никогда не достигается).

Код: Выделить всё

JobRequestEventHandler

Код: Выделить всё

internal class JobRequestEventHandler : IVdcEventHandler
{
private readonly IServiceProvider _provider;

public JobRequestEventHandler(IServiceProvider provider)
{
_provider = provider;
}

public async ValueTask HandleAsync(JobRequestV0Event request, CancellationToken ct = default)
{
Console.WriteLine("JobRequest is started being consumed...");

using var scope = _provider.CreateScope();
var jobUtility = scope.ServiceProvider.GetService();

...
}
}
После запуска «PublishAsync» я проверяю консоль управления RabbitMQ и вижу, что некоторые данные проходят через «Скорость сообщений», но на графиках «Сообщения в очереди» данные вообще не отображаются, и в разделе «Подробности» общее количество всегда равно 0.
Изображение

Изображение

Я не могу сказать, проблема в издателе (возможно, потому что в сообщениях в очереди я ничего не вижу, а общее количество всегда равно 0), или проблема может быть в обработчике CQRS, но дело в том, что я установил точку останова в прослушивателе (обработчике) и он никогда не попадает, Console.WriteLine ничего не делает и никаких исключений не создается.
Любая помощь?

Подробнее здесь: https://stackoverflow.com/questions/787 ... ot-working
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»