Концентраторы событий Azure с Confluent.Kafka. Я могу производить, но не потреблятьC#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Концентраторы событий Azure с Confluent.Kafka. Я могу производить, но не потреблять

Сообщение Anonymous »

Я борюсь с проблемой, которую не могу понять.
С помощью этой конфигурации я могу успешно создать сообщение;

Код: Выделить всё

 var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword
};

using (var producer = new ProducerBuilder(config).Build())
{
Console.WriteLine("Enter a message to send to the topic (or 'exit' to quit):");
string message;
while ((message = Console.ReadLine()) != "exit")
{
try
{
var result = await producer.ProduceAsync(topic, new Message { Value = message });
Console.WriteLine($"Message '{message}' sent to topic '{topic}' at offset {result.Offset}");
}
catch (ProduceException e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
Работает без проблем. Я могу зайти на свой портал Azure и просмотреть каждое написанное мной сообщение.
Затем я делаю то же самое с потребителем (еще одно подобное консольное приложение):
< pre class="lang-cs Prettyprint-override">

Код: Выделить всё

   var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword,
GroupId = consumerGroup,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None, // Disable endpoint identification
//SessionTimeoutMs = 6000,  // To reduce disconnection frequency
//MaxPollIntervalMs = 10000,
//SocketKeepaliveEnable = true,
Debug = "all", // Enables detailed logs
AutoOffsetReset = AutoOffsetReset.Latest,
EnableAutoCommit = true,
SocketKeepaliveEnable = true
};

...
consumer.Subscribe(topic);

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};

try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
string logMessage = $"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.";
С того момента, как я делаю Consumer.Consume(cts.Token), я не получаю никакого сообщения.
Я покопался в журналах, и единственные проблемы Я вижу (если это проблемы) такие вещи:

Код: Выделить всё

[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: broker is down: re-query
Думаю, проблема где-то в «чтении», но я не понимаю, в чем, и я перепробовал все возможные комбинации, чтобы понять, что происходит, но безуспешно.
Есть идеи или предложения относительно того, что попробовать дальше?

Подробнее здесь: https://stackoverflow.com/questions/791 ... ot-consume
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «C#»