Apache Beam написать записи kafka в файл avroJAVA

Программисты JAVA общаются здесь
Anonymous
Apache Beam написать записи kafka в файл avro

Сообщение Anonymous »

Я хотел бы прочитать пару строк из темы Kafka и создать файл AVRO. < /p>
У меня есть работа по частичным коду, который читает из темы Kafka и печатает на консоли. Кафка тема и отпечатки в консоли < /p>

Код: Выделить всё


public class BeamConsumer {

public static void main(String[] args) throws IOException {

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
Schema schema =
new Schema.Parser()
.parse(new File("schema.avsc"));

PTransform
>> input =
KafkaIO.read()
.withBootstrapServers(
"${kafkaserveraddress}")
.withTopic("my-topic") // use
// withTopics(List) to read from multiple topics.
.withKeyDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
"${schemaregistryaddress}",
"schemaregistrysubjectkey"))
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
"${schemaregistryaddress}",
"schemaregistrysubjectvalue"))
.withConsumerConfigUpdates(
ImmutableMap.of(
ConsumerConfig.GROUP_ID_CONFIG,
"my-group-id",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"SSL",
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"/truststore.jks",
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
"******",
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
"/keystore.jks",
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
"*******",
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
"*******",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest",
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
600000));

pipeline
.apply(input)
.apply(
"ExtractRecord",
ParDo.of(
new DoFn<
KafkaRecord,
KafkaRecord>() {
@DoFn.ProcessElement
public void processElement(ProcessContext c) {
KafkaRecord record =
(KafkaRecord) c.element();
KV log = record.getKV();
System.out.println("Key Obtained: " + log.getKey());
System.out.println("Value Obtained: " + log.getValue().toString());
c.output(record);
}
}));
//            .apply("WriteToAvro",
// AvroIO.writeGenericRecords(schema).to("/Users/mjain34/code/avroutils/src/main/resources/file.avro"));
//
PipelineResult run = pipeline.run();
run.waitUntilFinish(Duration.standardSeconds(1000));
}
}

Примечание: я изменил значения конфигураций здесь, чтобы скрыть личную информацию


Подробнее здесь: https://stackoverflow.com/questions/761 ... -avro-file

Вернуться в «JAVA»