PublishEndpoint.Publish() Не отправляет сообщения с помощью Kafka в MassTransitC#

Место общения программистов C#
Ответить
Anonymous
 PublishEndpoint.Publish() Не отправляет сообщения с помощью Kafka в MassTransit

Сообщение Anonymous »

Я столкнулся с проблемой: метод Producer.Produce() работает и успешно отправляет сообщения в Kafka, но методPublishEndpoint.Publish() не выдаёт никаких сообщений. Я использую MassTransit с Kafka и подозреваю, что проблема связана с моей конфигурацией или тем, как MassTransit обрабатывает темы Kafka.
Соответствующая конфигурация:
Вот как я настроил Kafka в моем Startup.cs:

Код: Выделить всё

services.AddMassTransit(x =>
{
x.UsingInMemory();

x.AddRider(rider =>
{
rider.AddProducer("order-submitted");

rider.AddConsumers(Assembly.GetAssembly(typeof(Startup)));

rider.UsingKafka((context, k) =>
{
k.Host(Configuration["Kafka:Host"]);

k.TopicEndpoint("order-submitted", "consumer-group-A", e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer(context);
});
});
});

});
А это код, который я использую для публикации события:

Код: Выделить всё

public class OrderService : IOrderService
{
protected IConfiguration _configuration;
protected readonly IAsyncRepository _repository;

private readonly IPublishEndpoint _publishEndpoint;
private readonly ITopicProducer _producer;

public OrderService(IAsyncRepository repsitory,
IConfiguration configuration,
IPublishEndpoint publishEndpoint,
ITopicProducer producer)
{
_repository = repsitory;
_configuration = configuration;
_publishEndpoint = publishEndpoint;
_producer = producer;
}

public async Task SubmitOrder(long id)
{
var integrationEvent = new OrderSubmittedIntegrationEvent(
order.Id,
order.OrderDate,
order.OrderItems.Select(n => new OrderItemSubmittedIntegrationEvent(n.ItemClassId, n.Units)).ToList()
);

//await _publishEndpoint.Publish(integrationEvent);
await _producer.Produce(integrationEvent);
return validationResult;
}
Будем очень признательны за любую помощь или идеи!
Что работает:
Когда я использую Producer.Produce() с клиентом Kafka напрямую, сообщение успешно отправляется в тему.
Kafka и Zookeeper запущены и работают.
Тема Kafka существует и доступна.
Что не работает:
PublishEndpoint.Publish() не отправляет никаких сообщений, не генерирует никаких ошибок или исключений.
Я включил ведение журнала, но видимых ошибок, указывающих на то, что может пойти не так, нет.
Нужно ли мне явно запускать шину MassTransit при использовании Kafka?
Требует ли IPublishEndpoint определенной регистрации потребителей для успешной публикации сообщений?
Я упустил что-то конкретное конфигурация в Kafka Rider, из-за которой Publish может работать не так, как ожидалось?

Подробнее здесь: https://stackoverflow.com/questions/791 ... asstransit
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»