Соответствующая конфигурация:
Вот как я настроил Kafka в моем Startup.cs:
Код: Выделить всё
services.AddMassTransit(x =>
{
x.UsingInMemory();
x.AddRider(rider =>
{
rider.AddProducer("order-submitted");
rider.AddConsumers(Assembly.GetAssembly(typeof(Startup)));
rider.UsingKafka((context, k) =>
{
k.Host(Configuration["Kafka:Host"]);
k.TopicEndpoint("order-submitted", "consumer-group-A", e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer(context);
});
});
});
});
Код: Выделить всё
public class OrderService : IOrderService
{
protected IConfiguration _configuration;
protected readonly IAsyncRepository _repository;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ITopicProducer _producer;
public OrderService(IAsyncRepository repsitory,
IConfiguration configuration,
IPublishEndpoint publishEndpoint,
ITopicProducer producer)
{
_repository = repsitory;
_configuration = configuration;
_publishEndpoint = publishEndpoint;
_producer = producer;
}
public async Task SubmitOrder(long id)
{
var integrationEvent = new OrderSubmittedIntegrationEvent(
order.Id,
order.OrderDate,
order.OrderItems.Select(n => new OrderItemSubmittedIntegrationEvent(n.ItemClassId, n.Units)).ToList()
);
//await _publishEndpoint.Publish(integrationEvent);
await _producer.Produce(integrationEvent);
return validationResult;
}
Что работает:
Когда я использую Producer.Produce() с клиентом Kafka напрямую, сообщение успешно отправляется в тему.
Kafka и Zookeeper запущены и работают.
Тема Kafka существует и доступна.
Что не работает:
PublishEndpoint.Publish() не отправляет никаких сообщений, не генерирует никаких ошибок или исключений.
Я включил ведение журнала, но видимых ошибок, указывающих на то, что может пойти не так, нет.
Нужно ли мне явно запускать шину MassTransit при использовании Kafka?
Требует ли IPublishEndpoint определенной регистрации потребителей для успешной публикации сообщений?
Я упустил что-то конкретное конфигурация в Kafka Rider, из-за которой Publish может работать не так, как ожидалось?
Подробнее здесь: https://stackoverflow.com/questions/791 ... asstransit
Мобильная версия