Мы объявляем все события «Publisher» и «Listener» в Program.cs следующим образом (я добавлю только соответствующие части):
Код: Выделить всё
Program.csКод: Выделить всё
await RunServer.RunAsync(
args,
(builder, options) =>
{
options.CQRS.Enabled = true;
options.CQRS.Assemblies = [typeof(xxx.Domain.AssemblyInfo).Assembly];
options.Event.Enabled = true;
options.Event.ConnString = builder.Configuration.GetConnectionString("RabbitMQ")!;
options.Event.PublishQueues =
RabbitMQUtils.CreatePublishQueue("DataSync_Exchange",
[
(QueueIdentifier.JobManager_JobRequest, "Job_Request")
]
)
)
.ToArray();
options.Event.ListenQueues =
RabbitMQUtils.CreateListenerQueue("DataSync_Exchange",
QueueIdentifier.JobManager_JobRequest,
[
"Job_Request"
]
);
options.OtherServices = () =>
{
builder.Services.AddHostedService();
};
}
);
Код: Выделить всё
CreatePublishQueueКод: Выделить всё
public static RabbitMQQueueAlias[] CreatePublishQueue(string exchange, (QueueIdentifier Alias, string RoutingKey)[] queues)
{
string exchange2 = exchange;
return queues.Select(((QueueIdentifier Alias, string RoutingKey) q) => new RabbitMQQueueAlias
{
Alias = q.Alias,
RoutingKey = new string[1] { q.RoutingKey },
Exchange = exchange2,
Active = true,
Durable = true
}).ToArray();
}
Код: Выделить всё
CreateListenerQueueКод: Выделить всё
public static RabbitMQQueueAlias[] CreateListenerQueue(string exchange, QueueIdentifier alias, string[] routingKey, string? postfix = null)
{
string text = alias.ToString();
string text2 = text.Substring(2, text.Length - 2);
if (postfix != null)
{
text2 += postfix;
}
return new RabbitMQQueueAlias[1]
{
new RabbitMQQueueAlias
{
RoutingKey = routingKey,
Alias = alias,
Exchange = exchange,
Queue = text2,
Active = true,
Durable = true
}
};
}
Код: Выделить всё
IJobUtility.csКод: Выделить всё
public interface IJobUtility
{
Task PublishRequestJob(string name, string? notes = null, string @params = "");
}
public sealed class JobUtility : IJobUtility
{
private readonly IEventBus _eventBus;
private readonly IVdcIdGenerator _gen;
public JobUtility(IEventBus eventBus, IVdcIdGenerator gen)
{
_eventBus = eventBus;
_gen = gen;
}
public async Task PublishRequestJob(string name, string? notes = null, string @params = "")
{
var id = _gen.GenerateId();
await _eventBus.PublishAsync(new JobRequestV0Event(
id,
new Guid(1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
_gen.WorkerId,
_gen.GenerateIdAsString(),
new JobRequestV0Event.B(name, @params, notes)));
}
}
Код: Выделить всё
SendAsyncКод: Выделить всё
protected virtual Task SendAsync(object graph, Guid id, string eventName, string? correlationId, bool useEnvelop)
{
string eventName2 = eventName;
object graph2 = graph;
_channel.ConfirmSelect();
IBasicProperties basicProperties = _channel.CreateBasicProperties();
basicProperties.Persistent = true;
basicProperties.ContentType = "application/json";
basicProperties.MessageId = id.ToString();
IBasicProperties basicProperties2 = basicProperties;
if (basicProperties2.Headers == null)
{
IDictionary dictionary2 = (basicProperties2.Headers = new Dictionary());
}
basicProperties.Headers["X-Correlation-ID"] = correlationId;
basicProperties.Headers["X-Event"] = eventName2;
basicProperties.Headers["X-Has-Envelop"] = useEnvelop;
_setup?.Invoke(graph2, basicProperties);
IEnumerable enumerable = _queue;
if (_filter != null)
{
enumerable = enumerable.Where((RabbitMQQueueAlias queue) => _filter(queue.Alias, GenEventName(queue, eventName2, graph2), graph2));
}
foreach (RabbitMQQueueAlias item in enumerable)
{
object obj = _transform?.Invoke(item.Alias, eventName2, graph2) ?? graph2;
string[] routingKey = item.RoutingKey;
foreach (string text in routingKey)
{
if (useEnvelop)
{
obj = new MessageEnvelop(obj, _resolver.Resolver(obj.GetType()) ?? text);
}
byte[] bytes = Encoding.UTF8.GetBytes(_serializer.Serialize(obj));
_channel.BasicPublish(item.Exchange, text, mandatory: true, basicProperties, bytes);
_channel.WaitForConfirms();
_logger?.LogInformation("Publish Event to {Exchange}, {RoutingKey}, {@Event}", item.Exchange, item.RoutingKey, obj);
}
}
return Task.CompletedTask;
}
Код: Выделить всё
JobRequestEventHandlerКод: Выделить всё
internal class JobRequestEventHandler : IVdcEventHandler
{
private readonly IServiceProvider _provider;
public JobRequestEventHandler(IServiceProvider provider)
{
_provider = provider;
}
public async ValueTask HandleAsync(JobRequestV0Event request, CancellationToken ct = default)
{
Console.WriteLine("JobRequest is started being consumed...");
using var scope = _provider.CreateScope();
var jobUtility = scope.ServiceProvider.GetService();
...
}
}


Я не могу сказать, проблема в издателе (возможно, потому что в сообщениях в очереди я ничего не вижу, а общее количество всегда равно 0), или проблема может быть в обработчике CQRS, но дело в том, что я установил точку останова в прослушивателе (обработчике) и он никогда не попадает, Console.WriteLine ничего не делает и никаких исключений не создается.
Любая помощь?
Подробнее здесь: https://stackoverflow.com/questions/787 ... ot-working
Мобильная версия