Соответствующая конфигурация:
Вот как я настроил Kafka в моем Startup.cs:
Код: Выделить всё
services.AddMassTransit(x =>
{
x.UsingInMemory();
x.AddRider(rider =>
{
rider.AddProducer("order-submitted");
rider.AddConsumers(Assembly.GetAssembly(typeof(Startup)));
rider.UsingKafka((context, k) =>
{
k.Host(Configuration["Kafka:Host"]);
k.TopicEndpoint("order-submitted", "consumer-group-A", e =>
{
e.AutoOffsetReset = AutoOffsetReset.Earliest;
e.ConfigureConsumer(context);
});
});
});
});
Код: Выделить всё
public class OrderService : IOrderService
{
protected IConfiguration _configuration;
protected readonly IAsyncRepository _repository;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ITopicProducer _producer;
public OrderService(IAsyncRepository repsitory,
IConfiguration configuration,
IPublishEndpoint publishEndpoint,
ITopicProducer producer)
{
_repository = repsitory;
_configuration = configuration;
_publishEndpoint = publishEndpoint;
_producer = producer;
}
public async Task SubmitOrder(long id)
{
var integrationEvent = new OrderSubmittedIntegrationEvent(
order.Id,
order.OrderDate,
order.OrderItems.Select(n => new OrderItemSubmittedIntegrationEvent(n.ItemClassId, n.Units)).ToList()
);
//await _publishEndpoint.Publish(integrationEvent);
await _producer.Produce(integrationEvent);
return validationResult;
}
Что работает:
Когда я использую Producer.Produce() с клиентом Kafka напрямую, сообщение успешно отправляется в тему.
Kafka и Zookeeper запущены и работают.
Тема Kafka существует и доступна.
Что не работает:
публикацияКонечная точка.Публикация() не отправляет никаких сообщений, не выдает никаких ошибок или исключений.
Я включил ведение журнала, но видимых ошибок, указывающих на то, что может пойти не так, нет.
Выполните Мне нужно явно запустить шину MassTransit при использовании Kafka?
Требует ли IPublishEndpoint определенной регистрации потребителей для успешной публикации сообщений?
Не хватает ли мне какой-либо конкретной конфигурации в Kafka Rider это может привести к тому, что публикация не работает как ожидалось?
Подробнее здесь: https://stackoverflow.com/questions/791 ... asstransit
Мобильная версия