У меня есть следующий код для чтения данных из темы Кафки. Моя цель периодически - читать только последнее новейшее сообщение в теме, потому что я хочу использовать данные в живой диаграмме. Я написал следующий код. Но если я запускаю код, я начинаю читать где -нибудь из прошлого (24 часа назад). Я думаю, что я должен определить что -то вроде смещения в моем коде? Как я могу сделать это в конфликтном потребителе Kafka? < /P>
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}
catch(Exception ex)
{
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
< /code>
} < /p>
update-1 < /p>
Я попытался установить AutooffsetReset = AutoOffSetReset.latest, но все же я читаю данные из прошлого. Я думаю, что этого настройки недостаточно для моей цели.
Подробнее здесь: https://stackoverflow.com/questions/758 ... th-kafka-c
Как я могу потреблять только последнее (новое) сообщение из темы Kafka с Cafka Confluent Consumer? ⇐ C#
Место общения программистов C#
1754900697
Anonymous
У меня есть следующий код для чтения данных из темы Кафки. Моя цель периодически - читать только последнее новейшее сообщение в теме, потому что я хочу использовать данные в живой диаграмме. Я написал следующий код. Но если я запускаю код, я начинаю читать где -нибудь из прошлого (24 часа назад). Я думаю, что я должен определить что -то вроде смещения в моем коде? Как я могу сделать это в конфликтном потребителе Kafka? < /P>
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}
catch(Exception ex)
{
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
< /code>
} < /p>
update-1 < /p>
Я попытался установить AutooffsetReset = AutoOffSetReset.latest, но все же я читаю данные из прошлого. Я думаю, что этого настройки недостаточно для моей цели.
Подробнее здесь: [url]https://stackoverflow.com/questions/75886136/how-can-i-consume-just-the-last-newest-message-from-a-kafka-topic-with-kafka-c[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия