PublishEndpoint.Publish() Не отправляет сообщения с помощью Kafka в MassTransitC#

Место общения программистов C#
Ответить
Anonymous
 PublishEndpoint.Publish() Не отправляет сообщения с помощью Kafka в MassTransit

Сообщение Anonymous »

Я столкнулся с проблемой: метод Producer.Produce() работает и успешно отправляет сообщения в Kafka, но методPublishEndpoint.Publish() не выдаёт никаких сообщений. Я использую MassTransit с Kafka и подозреваю, что проблема связана с моей конфигурацией или тем, как MassTransit обрабатывает темы Kafka.
Соответствующая конфигурация:
Вот как я настроил Kafka в моем Startup.cs:

Код: Выделить всё

services.AddMassTransit(x =>
{
x.UsingInMemory();

x.AddRider(rider =>
{
rider.AddProducer("order-submitted");

rider.AddConsumers(Assembly.GetAssembly(typeof(Startup)));

rider.UsingKafka((context, k) =>
{
k.Host(Configuration["Kafka:Host"]);

k.TopicEndpoint("order-submitted", "consumer-group-A", e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer(context);
});
});
});

});
А это код, который я использую для публикации события:

Код: Выделить всё

public class OrderService : IOrderService
{
protected IConfiguration _configuration;
protected readonly IAsyncRepository _repository;

private readonly IPublishEndpoint _publishEndpoint;
private readonly ITopicProducer _producer;

public OrderService(IAsyncRepository repsitory,
IConfiguration configuration,
IPublishEndpoint publishEndpoint,
ITopicProducer producer)
{
_repository = repsitory;
_configuration = configuration;
_publishEndpoint = publishEndpoint;
_producer = producer;
}

public async Task SubmitOrder(long id)
{
var integrationEvent = new OrderSubmittedIntegrationEvent(
order.Id,
order.OrderDate,
order.OrderItems.Select(n => new OrderItemSubmittedIntegrationEvent(n.ItemClassId, n.Units)).ToList()
);

//await _publishEndpoint.Publish(integrationEvent);
await _producer.Produce(integrationEvent);
return validationResult;
}
Будем очень признательны за любую помощь или идеи!
Что работает:
Когда я использую Producer.Produce() с клиентом Kafka напрямую, сообщение успешно отправляется в тему.
Kafka и Zookeeper запущены и работают.
Тема Kafka существует и доступна.
Что не работает:
публикацияКонечная точка.Публикация() не отправляет никаких сообщений, не выдает никаких ошибок или исключений.
Я включил ведение журнала, но видимых ошибок, указывающих на то, что может пойти не так, нет.
Выполните Мне нужно явно запустить шину MassTransit при использовании Kafka?
Требует ли IPublishEndpoint определенной регистрации потребителей для успешной публикации сообщений?
Не хватает ли мне какой-либо конкретной конфигурации в Kafka Rider это может привести к тому, что публикация не работает как ожидалось?

Подробнее здесь: https://stackoverflow.com/questions/791 ... asstransit
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»