Как я могу потреблять только последнее (новое) сообщение из темы Kafka с Cafka Confluent Consumer?C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу потреблять только последнее (новое) сообщение из темы Kafka с Cafka Confluent Consumer?

Сообщение Anonymous »

У меня есть следующий код для чтения данных из темы Кафки. Моя цель периодически - читать только последнее новейшее сообщение в теме, потому что я хочу использовать данные в живой диаграмме. Я написал следующий код. Но если я запускаю код, я начинаю читать где -нибудь из прошлого (24 часа назад). Я думаю, что я должен определить что -то вроде смещения в моем коде? Как я могу сделать это в конфликтном потребителе Kafka? < /P>
public void Read_from_Kafka()
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = kafka_URI,
GroupId = "group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.Ssl,
SslCaLocation = "path1",
SslCertificateLocation = "path2",
SslKeyLocation = "path3",
SslKeyPassword = "password",

};

CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;

using (var consumer = new ConsumerBuilder(config).Build())
{
consumer.Subscribe(topic_name);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;

using (StreamWriter sw = File.AppendText(json_log_file))
{
sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
}
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}
using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Stop Kafka " + " " + Convert.ToString(DateTime.Now));
}
}

catch(Exception ex)
{

using (StreamWriter sw = File.AppendText(error_log))
{
sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
}
}
< /code>
} < /p>
update-1 < /p>
Я попытался установить AutooffsetReset = AutoOffSetReset.latest, но все же я читаю данные из прошлого. Я думаю, что этого настройки недостаточно для моей цели.

Подробнее здесь: https://stackoverflow.com/questions/758 ... th-kafka-c
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как я могу потреблять только последнее (новое) сообщение из темы Kafka с Cafka Confluent Consumer?
    Anonymous » » в форуме C#
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?
    Anonymous » » в форуме JAVA
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?
    Anonymous » » в форуме JAVA
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous
  • Концентраторы событий Azure с Confluent.Kafka. Я могу производить, но не потреблять
    Anonymous » » в форуме C#
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Zookeeper Confluent Kafka «Ошибка открытия файла Zookeeper-gc.log» при запуске Confluent
    Anonymous » » в форуме JAVA
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C#»