Проблема слияния C # Kafka с сериализацией avroC#

Место общения программистов C#
Ответить
Anonymous
 Проблема слияния C # Kafka с сериализацией avro

Сообщение Anonymous »

Я использую Docker для запуска Kafka и других сервисов из https://github.com/confluentinc/cp-all-in-one
с слитными пакетами Nuget для Kafka, avro и SchemaRegistry в моем тестовом проекте.
Если дело касается отправки сообщений json, у меня до сих пор нет проблем, но я борюсь с отправкой сериализованных сообщений avro.
Я видел https://github.com/confluentinc/conflue ... roSpecific пример, и я пытался сделать это таким же образом, но в конечном итоге получаю исключение, как показано ниже:

Локально: ошибка сериализации значения

в Confluent.Kafka.Producer

Код: Выделить всё

2.
d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.GetResult()
в Kafka_producer.KafkaService.d__10.MoveNext() в
C:\Users\lu95eb\source\repos\Kafka_playground\Kafka Producer\KafkaService.cs:строка 126

с внутренним исключением

Ссылка на объект не установлена на экземпляр объекта.

в Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl

Код: Выделить всё

1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.d__6.MoveNext()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task Task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task Task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task Task)
в Confluent.Kafka.Producer`2.d__52.MoveNext()

Вот мой классSpecificRecord

Код: Выделить всё

public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }

public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");

public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}

public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue;  break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
И метод отправки сообщения

Код: Выделить всё

private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer(schemaRegistry))
.SetValueSerializer(new AvroSerializer(schemaRegistry))
.Build())
{

var message = new Message
{
Key = userInfo.Name,
Value = userInfo
};

await producer.ProduceAsync(SpecificTopic, message);
}
}
KafkaService.cs:строка 126 ожидает производителя.ProduceAsync(SpecificTopic, message);
Как я писал вначале, у меня нет проблем с реестром схемы — у меня зарегистрированы схемы, и они правильно работают для json, у меня нет проблем с темами, брокером, потребителем или чем-то еще.
Я буду благодарен, если кто-нибудь подскажет мне, что я делаю неправильно.
Заранее спасибо.

Подробнее здесь: https://stackoverflow.com/questions/625 ... ialization
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»