Я новичок в Beam, и я не знаю, как я могу опубликовать пользовательское сообщение с помощью kafkaio.write.
Я должен использовать kafkaavroserializer.class в качестве сериализатора значения. < /p>
public void publish(CustomObject customObj) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection response =
p.apply(Create.of(customObj))
.apply(KafkaIO.write()
.withBootstrapServers("localhost:9092")
.withTopic("myTopic")
.withKeySerializer(LongDeserializer.class)
.withValueSerializer(KafkaAvroSerializer.class)
.updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
);
p.run().waitUntilFinish();
}
Подробнее здесь: https://stackoverflow.com/questions/774 ... ion-for-ge