Apache Beam публикует сообщение Kafka с Kafkaio и Kafkaavroserialization для GenericRecordJAVA

Программисты JAVA общаются здесь
Anonymous
Apache Beam публикует сообщение Kafka с Kafkaio и Kafkaavroserialization для GenericRecord

Сообщение Anonymous »

Я новичок в Beam, и я не знаю, как я могу опубликовать пользовательское сообщение с помощью kafkaio.write.
Я должен использовать kafkaavroserializer.class в качестве сериализатора значения. < /p>
public void publish(CustomObject customObj) {
PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection response =
p.apply(Create.of(customObj))
.apply(KafkaIO.write()
.withBootstrapServers("localhost:9092")
.withTopic("myTopic")
.withKeySerializer(LongDeserializer.class)
.withValueSerializer(KafkaAvroSerializer.class)

.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))

);
p.run().waitUntilFinish();
}


Подробнее здесь: https://stackoverflow.com/questions/774 ... ion-for-ge

Вернуться в «JAVA»