С помощью этой конфигурации я могу успешно создать сообщение;
Код: Выделить всё
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword
};
using (var producer = new ProducerBuilder(config).Build())
{
Console.WriteLine("Enter a message to send to the topic (or 'exit' to quit):");
string message;
while ((message = Console.ReadLine()) != "exit")
{
try
{
var result = await producer.ProduceAsync(topic, new Message { Value = message });
Console.WriteLine($"Message '{message}' sent to topic '{topic}' at offset {result.Offset}");
}
catch (ProduceException e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
Затем я делаю то же самое с потребителем (еще одно подобное консольное приложение):
< pre class="lang-cs Prettyprint-override">
Код: Выделить всё
var config = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = saslUsername,
SaslPassword = saslPassword,
GroupId = consumerGroup,
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None, // Disable endpoint identification
//SessionTimeoutMs = 6000, // To reduce disconnection frequency
//MaxPollIntervalMs = 10000,
//SocketKeepaliveEnable = true,
Debug = "all", // Enables detailed logs
AutoOffsetReset = AutoOffsetReset.Latest,
EnableAutoCommit = true,
SocketKeepaliveEnable = true
};
...
consumer.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
string logMessage = $"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.";
Я покопался в журналах, и единственные проблемы Я вижу (если это проблемы) такие вещи:
Код: Выделить всё
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: anothertest [1]: skipping offset validation for offset 7 (leader epoch -1): no leader epoch set
[Confluent.Kafka] Debug: [thrd:main]: Topic anothertest [2]: broker is down: re-query
Есть идеи или предложения относительно того, что попробовать дальше?
Подробнее здесь: https://stackoverflow.com/questions/791 ... ot-consume