Десериализация Apache Kafka Avro: невозможно десериализовать или декодировать сообщение определенного типа.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Десериализация Apache Kafka Avro: невозможно десериализовать или декодировать сообщение определенного типа.

Сообщение Anonymous »

Я пытаюсь использовать Avro Serialize с Apache Kafka для сериализации/десериализации сообщений. Я создаю одного производителя, который используется для сериализации сообщения определенного типа и отправки его в очередь. Когда сообщение успешно отправлено в очередь, наш потребитель выбирает сообщение и пытается его обработать, но при попытке мы сталкиваемся с исключением для байтов, относящихся к конкретному объекту. Исключением является следующее:

Код: Выделить всё

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
В исключительных случаях мы используем какой-то неудобный способ чтения данных, ниже приведен наш код:

Код производителя Kafka:

Код: Выделить всё

static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("schema.registry.url", "http://localhost:8081");
kafkaProducer = new KafkaProducer(kafkaProps);
}

public static void main(String[] args) throws InterruptedException, IOException {
Customer customer1 = new Customer(1002, "Jimmy");

Parser parser = new Parser();
Schema schema = parser.parse(AvroSpecificProducer.class
.getClassLoader().getResourceAsStream("avro/customer.avsc"));

SpecificDatumWriter writer = new SpecificDatumWriter(schema);
try(ByteArrayOutputStream os = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
writer.write(customer1, encoder);
encoder.flush();

byte[] avroBytes = os.toByteArray();

ProducerRecord record1 = new ProducerRecord("CustomerSpecificCountry",
"Customer One 11 ", avroBytes
);

asyncSend(record1);
}

Thread.sleep(10000);
}
Потребительский кодекс Kafka:

Код: Выделить всё

static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
kafkaProps.put("schema.registry.url", "http://localhost:8081");
}

public static void infiniteConsumer() throws IOException {
try(KafkaConsumer kafkaConsumer = new KafkaConsumer(kafkaProps)) {
kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry"));

while(true) {
ConsumerRecords records = kafkaConsumer.poll(100);
System.out.println(">>>>>>>>>>>>>");
Customer customer = customerDatumReader.read(null, binaryDecoder);
System.out.println(customer);
} catch (IOException e) {
e.printStackTrace();
}
});
}

}
}
Используя потребителя в консоли, мы успешно можем получить сообщение. Итак, как же декодировать сообщение в наши файлы pojo?

Подробнее здесь: https://stackoverflow.com/questions/421 ... cific-type
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»