Код: Выделить всё
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace commons.Kafka;
public abstract class KafkaConsumer : BackgroundService where T : class
{
private readonly IConsumer _consumer;
private readonly string _groupId;
private readonly ILogger _logger;
protected KafkaConsumer(ILogger logger, IConfigurationSection config, params string[] topics)
{
_logger = logger;
_groupId = GetType().Name.ToKebabCase();
_consumer = new ConsumerBuilder(new ConsumerConfig
{
BootstrapServers = config["Kafka:BootstrapServers"],
GroupId = _groupId,
AutoOffsetReset = AutoOffsetReset.Earliest
}).Build();
_consumer.Subscribe(topics);
}
protected abstract Task ProcessMessageAsync(Headers headers, string key, T value);
protected virtual async Task ConsumeAsync(CancellationToken stoppingToken)
{
try
{
var consumeResult = _consumer.Consume(stoppingToken);
if (consumeResult != null)
{
_logger.LogInformation("{Consumer}: consumed {Key}", _groupId, consumeResult.Message.Key);
if (Type.GetTypeCode(typeof(T)) != TypeCode.Object)
{
await ProcessMessageAsync(consumeResult.Message.Headers, consumeResult.Message.Key,
consumeResult.Message.Value as T);
}
else
{
var value = JsonConvert.DeserializeObject(consumeResult.Message.Value);
await ProcessMessageAsync(consumeResult.Message.Headers, consumeResult.Message.Key, value);
}
}
}
catch (OperationCanceledException)
{
// Ignore cancellation exception
}
catch (Exception ex)
{
_logger.LogError(ex, "{Consumer}: Error consuming message", _groupId);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("{Consumer}: starting", _groupId);
stoppingToken.Register(() =>
{
_logger.LogInformation("{Consumer}: stopping", _groupId);
_consumer.Close();
});
// create a new process to consume messages
while (!stoppingToken.IsCancellationRequested)
await Task.Run(() => ConsumeAsync(stoppingToken), stoppingToken);
_consumer.Close();
}
}
журнал здесь
[ 16:17:35 INF] обновленный потребитель: начинается
[16:17:35 INF] Сейчас слушаю: http://localhost:46000
[16:17:35 ИНФ] Приложение запущено. Нажмите Ctrl+C, чтобы завершить работу.
[16:17:35 INF] Среда хостинга: Разработка
[16:17:35 INF] Корневой путь содержимого: D:\PYXIS.API.CHANGELOG\src\PYXIS.API.CHANGELOG
[16:18:05 INF] update-consumer: потребляется 1
[16:18:05 INF] потребитель обновлений: использовано 2
[16:18:06 INF] потребитель обновлений: использовано 3
[16:18:20 INF] потребитель обновлений: использовано 4
[16:18:22 INF] потребитель обновлений: использовано 5
[16:18:24 INF] обновление-потребитель: израсходовано 6
%5|1733390315.292|PARTCNT|rdkafka#consumer-1| [thrd:main]: количество разделов в теме обновленной_записи изменено с 1 на 0
%3|1733390315.292|ОШИБКА|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: update_record [0]: нужный раздел больше не доступен (локально: неизвестный раздел)
%3|1733390315.292|ОШИБКА|rdkafka#consumer -1| [thrd:app]: rdkafka#consumer-1: update_record [0]: тема не существует (брокер: неизвестная тема или раздел)
%3|1733390315.526|ОШИБКА|rdkafka#consumer -1| [thrd:app]: rdkafka#consumer-1: update_record [0]: тема не существует (брокер: неизвестная тема или раздел)
%3|1733390316.528|ОШИБКА|rdkafka#consumer -1| [thrd:app]: rdkafka#consumer-1: update_record [0]: тема не существует (Брокер: неизвестная тема или раздел)
Подробнее здесь: https://stackoverflow.com/questions/792 ... onsumer-st