Я пытаюсь настроить запланированную переоборудование в MassTransit при использовании kafka в качестве транспорта. REDELIVERY с помощью Hangfire , как это предлагается в документации. /> Я следовал инструкциям и заменил usedelayedredelivery с помощью userschedudredelivery .
Я также настроил Hangfire , как показано в документации.
Вот моя конфигурация:
services.AddMassTransit(configurator =>
{
configurator.AddPublishMessageScheduler();
configurator.AddHangfireConsumers();
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingInMemory((context, cfg) =>
{
cfg.UsePublishMessageScheduler();
cfg.ConfigureEndpoints(context);
});
configurator.AddRider(rider =>
{
rider.AddConsumer();
rider.UsingKafka((context, factoryConfigurator) =>
{
var kafkaSettings = context.GetRequiredService().Value;
factoryConfigurator.Host(kafkaSettings.Host);
factoryConfigurator.TopicEndpoint(
kafkaSettings.ReputationTopic,
kafkaSettings.ReputationConsumerGroup,
cfg =>
{
cfg.ConfigureConsumer(context);
cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals));
cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals));
cfg.UseInMemoryOutbox(context);
});
});
});
});
< /code>
services.AddHangfire(x => x.UsePostgreSqlStorage(options =>
{
var connectionString = configuration.GetConnectionString(postgresSqlConnectionName);
options.UseNpgsqlConnection(connectionString);
})
.SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseSerilogLogProvider()
.UseFilter(new AutomaticRetryAttribute
{
Attempts = 10,
//5sec, 10 sec, 15sec, 30sec, 1min, 5min, 10min, 1h, 12h, 24h
DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400]
}));
services.AddHangfireServer();
< /code>
What happens:
[*]Immediate retries work as expected.
[*]Scheduled redelivery Создает задание Hangfire (я вижу его в Dashfire панели панели).
Задача успешно выполняется, но впоследствии ничего не происходит-сообщение не переполнено Kafka , а мой потребитель никогда не запускается снова. Панель инструментов: < /p>
// Id: #324
using MassTransit.HangfireIntegration;
var scheduleJob = Activate();
await scheduleJob.SendMessage(
FromJson("{
\"HashId\":\"f053000015a25d25b0ec08dddd701bff\",
\"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\",
\"ContentType\":\"application/vnd.masstransit+json\",
\"ResponseAddress\":\"\",
\"FaultAddress\":\"\",
\"Body\":\"{\r\n \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n \\\"requestId\\\": null,\r\n \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"initiatorId\\\": null,\r\n \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"destinationAddress\\\": null,\r\n \\\"responseAddress\\\": null,\r\n \\\"faultAddress\\\": null,\r\n \\\"messageType\\\": [\r\n \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n ],\r\n \\\"message\\\": {\r\n \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n \\\"userId\\\": 12\r\n },\r\n \\\"expirationTime\\\": null,\r\n \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n \\\"headers\\\": {\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n },\r\n \\\"host\\\": {\r\n \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n \\\"processName\\\": \\\"UserService.Api\\\",\r\n \\\"processId\\\": 21488,\r\n \\\"assembly\\\": \\\"UserService.Api\\\",\r\n \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n }\r\n}\",
\"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\",
\"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\",
\"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\",
\"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\",
\"HeadersAsJson\":\"{\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n}\",
\"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\",
\"Destination\":\"loopback://localhost/kafka/main-events-topic\"
}"),
null);
< /code>
My questions:
Do I need to configure UsePublishMessageScheduler() И при использовании inmemory , так и при использовании Kafka ? Действительно повторно и доставлен потребителям?public class ReputationEventConsumer : IConsumer
{
public Task Consume(ConsumeContext context)
{
// Throw exception on purpose to trigger retries/redelivery
throw new NotImplementedByDesignException();
}
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... s-not-work
MassTransit Запланированная перераспределение с Kafka и Hangfire не работает ⇐ C#
Место общения программистов C#
1755424974
Anonymous
Я пытаюсь настроить запланированную переоборудование в MassTransit при использовании kafka в качестве транспорта. REDELIVERY с помощью Hangfire , как это предлагается в документации. /> Я следовал инструкциям и заменил usedelayedredelivery с помощью userschedudredelivery .
Я также настроил Hangfire , как показано в документации.
Вот моя конфигурация:
services.AddMassTransit(configurator =>
{
configurator.AddPublishMessageScheduler();
configurator.AddHangfireConsumers();
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingInMemory((context, cfg) =>
{
cfg.UsePublishMessageScheduler();
cfg.ConfigureEndpoints(context);
});
configurator.AddRider(rider =>
{
rider.AddConsumer();
rider.UsingKafka((context, factoryConfigurator) =>
{
var kafkaSettings = context.GetRequiredService().Value;
factoryConfigurator.Host(kafkaSettings.Host);
factoryConfigurator.TopicEndpoint(
kafkaSettings.ReputationTopic,
kafkaSettings.ReputationConsumerGroup,
cfg =>
{
cfg.ConfigureConsumer(context);
cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals));
cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals));
cfg.UseInMemoryOutbox(context);
});
});
});
});
< /code>
services.AddHangfire(x => x.UsePostgreSqlStorage(options =>
{
var connectionString = configuration.GetConnectionString(postgresSqlConnectionName);
options.UseNpgsqlConnection(connectionString);
})
.SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseSerilogLogProvider()
.UseFilter(new AutomaticRetryAttribute
{
Attempts = 10,
//5sec, 10 sec, 15sec, 30sec, 1min, 5min, 10min, 1h, 12h, 24h
DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400]
}));
services.AddHangfireServer();
< /code>
What happens:
[*]Immediate retries work as expected.
[*]Scheduled redelivery Создает задание Hangfire (я вижу его в Dashfire панели панели).
Задача успешно выполняется, но впоследствии ничего не происходит-сообщение не переполнено Kafka , а мой потребитель никогда не запускается снова. Панель инструментов: < /p>
// Id: #324
using MassTransit.HangfireIntegration;
var scheduleJob = Activate();
await scheduleJob.SendMessage(
FromJson("{
\"HashId\":\"f053000015a25d25b0ec08dddd701bff\",
\"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\",
\"ContentType\":\"application/vnd.masstransit+json\",
\"ResponseAddress\":\"\",
\"FaultAddress\":\"\",
\"Body\":\"{\r\n \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n \\\"requestId\\\": null,\r\n \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"initiatorId\\\": null,\r\n \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"destinationAddress\\\": null,\r\n \\\"responseAddress\\\": null,\r\n \\\"faultAddress\\\": null,\r\n \\\"messageType\\\": [\r\n \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n ],\r\n \\\"message\\\": {\r\n \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n \\\"userId\\\": 12\r\n },\r\n \\\"expirationTime\\\": null,\r\n \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n \\\"headers\\\": {\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n },\r\n \\\"host\\\": {\r\n \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n \\\"processName\\\": \\\"UserService.Api\\\",\r\n \\\"processId\\\": 21488,\r\n \\\"assembly\\\": \\\"UserService.Api\\\",\r\n \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n }\r\n}\",
\"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\",
\"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\",
\"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\",
\"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\",
\"HeadersAsJson\":\"{\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n}\",
\"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\",
\"Destination\":\"loopback://localhost/kafka/main-events-topic\"
}"),
null);
< /code>
My questions:
Do I need to configure UsePublishMessageScheduler() И при использовании inmemory , так и при использовании Kafka ? Действительно повторно и доставлен потребителям?public class ReputationEventConsumer : IConsumer
{
public Task Consume(ConsumeContext context)
{
// Throw exception on purpose to trigger retries/redelivery
throw new NotImplementedByDesignException();
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79737756/masstransit-scheduled-redelivery-with-kafka-and-hangfire-does-not-work[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия