MassTransit — публикация в очередь, используемую другим исполнителем.C#

Место общения программистов C#
Ответить
Anonymous
 MassTransit — публикация в очередь, используемую другим исполнителем.

Сообщение Anonymous »

Я использую MassTransit с RabbitMQ, и у меня есть рабочий процесс, который настраивает очереди и потребителей. Я хочу иметь возможность назначать ему «роли», сообщая ему, какие очереди он должен использовать.
Мой код выглядит следующим образом:

Код: Выделить всё

x.UsingRabbitMq((context, config) =>
{
config.Host(server, virtualHost, h =>
{
h.Username(username);
h.Password(password);
});

services.AddMassTransit(x =>
{
x.AddConsumer();
x.AddConsumer();
x.AddConsumer();
});

config.ReceiveEndpoint("Orders", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}

config.ReceiveEndpoint("Orders.Messages", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}

config.ReceiveEndpoint("Inventory", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}
}
Приведенный выше код работает должным образом: создаются очереди и используются сообщения. Важно отметить, что OrderConsumer как часть своей логики создает сообщение, опубликованное в Inventory, которое используется InventoryConsumer.
Чтобы сделать его обусловленным рабочими ролями, я меняю его на что-то вроде этого:

Код: Выделить всё

x.UsingRabbitMq((context, config) =>
{
config.Host(server, virtualHost, h =>
{
h.Username(username);
h.Password(password);
});

services.AddMassTransit(x =>
{
if (workerRoles.OrderProcessor)
{
x.AddConsumer();
x.AddConsumer();
}

if (workerRoles.InventoryProcessor)
{
x.AddConsumer();
}
});

if (workerRoles.OrderProcessor)
{
config.ReceiveEndpoint("Orders", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}

config.ReceiveEndpoint("Orders.Messages", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}
}

if (workerRoles.InventoryProcessor)
{
config.ReceiveEndpoint("Inventory", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer(context, cfg => {});
}
}
}
Когда я добавляю условия, очередь Inventory не создается, и поэтому сообщения, опубликованные OrderConsumer, не ставятся в очередь и поэтому не обрабатываются (другим работником, у которого включен флаг InventoryProcessor).
Я попытался изменить логику, чтобы код был точно таким же, как в первом примере, но очередь имеет ConcurrentMessageLimit и PrefetchCount будет равен 0 - но это просто помещает сообщения в очередь и немедленно перемещает их в очередь Inventory_skipped.
Что я хочу выполнить/понять, так это как мне справиться с ситуацией, когда работник должен публиковать сообщения в очереди, не используемой этим работником. Должны ли все мои работники в таком случае определять все очереди, с которыми работают все остальные работники? (представьте себе ситуацию, когда работник не определяет очередь и не начинает работать, но работник, определяющий очередь, еще не запущен и не работает для ее создания).

Подробнее здесь: https://stackoverflow.com/questions/723 ... ent-worker
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»