У меня есть следующий код для чтения данных из темы Кафки. Моя цель периодически - читать только последнее новейшее сообщение в теме, потому что я хочу использовать данные в живой диаграмме. Я написал следующий код. Но если я запускаю код, я начинаю читать где -нибудь из прошлого (24 часа назад). Я думаю, что я должен определить что -то вроде смещения в моем коде? Как я могу сделать это в конфликтном потребителе Kafka? < /P>
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",
};
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}
catch(Exception ex)
{
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
< /code>
} < /p>
update-1 < /p>
Я попытался установить AutooffsetReset = AutoOffSetReset.latest, но все же я читаю данные из прошлого. Я думаю, что этого настройки недостаточно для моей цели.
Подробнее здесь: https://stackoverflow.com/questions/758 ... th-kafka-c
Как я могу потреблять только последнее (новое) сообщение из темы Kafka с Cafka Confluent Consumer? ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Концентраторы событий Azure с Confluent.Kafka. Я могу производить, но не потреблять
Anonymous » » в форуме C# - 0 Ответы
- 21 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Zookeeper Confluent Kafka «Ошибка открытия файла Zookeeper-gc.log» при запуске Confluent
Anonymous » » в форуме JAVA - 0 Ответы
- 23 Просмотры
-
Последнее сообщение Anonymous
-