Как я могу употреблять только последнее (новое) сообщение из темы Кафки с конфликтным потребителем Kafka в C#?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу употреблять только последнее (новое) сообщение из темы Кафки с конфликтным потребителем Kafka в C#?

Сообщение Anonymous »

У меня есть следующий код для чтения данных из темы Кафки. Моя цель периодически - читать только последнее новейшее сообщение в теме, потому что я хочу использовать данные в живой диаграмме. Я написал следующий код. Но если я запускаю код, я начинаю читать где -нибудь из прошлого (24 часа назад). Я думаю, что я должен определить что -то вроде смещения в моем коде? Как я могу сделать это в конфликтном потребителе Kafka? < /P>

Код: Выделить всё

public void Read_from_Kafka()
{
try
{
var  config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",

};

CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;

using (var consumer = new ConsumerBuilder(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;

using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}

catch(Exception ex)
{

using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
}
Update-1
I have tried to set AutoOffsetReset = AutoOffsetReset.Latest but still I am reading data from the past. Я думаю, что этого настройки недостаточно для моей цели.

Подробнее здесь: https://stackoverflow.com/questions/758 ... th-kafka-c
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»