с слитными пакетами Nuget для Kafka, avro и SchemaRegistry в моем тестовом проекте.
Если дело касается отправки сообщений json, у меня до сих пор нет проблем, но я борюсь с отправкой сериализованных сообщений avro.
Я видел https://github.com/confluentinc/conflue ... roSpecific пример, и я пытался сделать это таким же образом, но в конечном итоге получаю исключение, как показано ниже:
Локально: ошибка сериализации значения
в Confluent.Kafka.Producer
Код: Выделить всё
2.
d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiterв Kafka_producer.KafkaService.d__10.MoveNext() в
C:\Users\lu95eb\source\repos\Kafka_playground\Kafka Producer\KafkaService.cs:строка 126
с внутренним исключением
Ссылка на объект не установлена на экземпляр объекта.
в Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl
Код: Выделить всё
1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializerat System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task Task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task Task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task Task)
в Confluent.Kafka.Producer`2.d__52.MoveNext()
Вот мой классSpecificRecord
Код: Выделить всё
public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }
public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue; break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
Код: Выделить всё
private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer(schemaRegistry))
.SetValueSerializer(new AvroSerializer(schemaRegistry))
.Build())
{
var message = new Message
{
Key = userInfo.Name,
Value = userInfo
};
await producer.ProduceAsync(SpecificTopic, message);
}
}
Как я писал вначале, у меня нет проблем с реестром схемы — у меня зарегистрированы схемы, и они правильно работают для json, у меня нет проблем с темами, брокером, потребителем или чем-то еще.
Я буду благодарен, если кто-нибудь подскажет мне, что я делаю неправильно.
Заранее спасибо.
Подробнее здесь: https://stackoverflow.com/questions/625 ... ialization
Мобильная версия