Преобразовать pcollection в пользовательский классJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Преобразовать pcollection в пользовательский класс

Сообщение Anonymous »

Моя цель состоит в том, чтобы прочитать файл из GCS и написать его в Cassandra.
Новое в Apache Beam /Dataflow, я мог бы найти большую часть руки на сборке с Python. К сожалению, Cassandraio является только java, уроженцем Beam.
Я использовал пример подсчета слов в качестве шаблона и попытаюсь избавиться от Textio.write () и заменить его на Cassandraio. write () .
здесь мой класс Java для таблицы Cassandra p>
здесь мой класс Java для таблицы кассандры.

Код: Выделить всё

package org.apache.beam.examples;

import java.io.Serializable;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "test", name = "words", readConsistency = "ONE", writeConsistency = "QUORUM",
caseSensitiveKeyspace = false, caseSensitiveTable = false)
public class Words implements Serializable {
//    private static final long serialVersionUID = 1L;

@PartitionKey
@Column(name = "word")
public String word;

@Column(name = "count")
public long count;

public Words() {
}

public Words(String word, int count) {
this.word = word;
this.count = count;
}

@Override
public boolean equals(Object obj) {
Words other = (Words) obj;
return this.word.equals(other.word) && this.count == other.count;
}
}
< /code>
и здесь часть конвейера основного кода. < /p>
static void runWordCount(WordCount.WordCountOptions options) {
Pipeline p = Pipeline.create(options);

// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new WordCountToCassandra.CountWords())

// Here I'm not sure how to transform PCollection into PCollection

.apply(MapElements.into(TypeDescriptor.of(Words.class)).via(PCollection)
}))

.apply(CassandraIO.write()
.withHosts(Collections.singletonList("my_ip"))
.withPort(9142)
.withKeyspace("test")
.withEntity(Words.class));

p.run().waitUntilFinish();
}
Мое понимание - это использовать ptransform для передачи из pcollection из pcollection . Я не знаю, как это сопоставить.

Подробнее здесь: https://stackoverflow.com/questions/754 ... stom-class
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Apache Beam - преобразовать пустое время данных обратно в Pcollection
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Как создать пустую PCollection>
    Anonymous » » в форуме JAVA
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Чтение файлов AVRO в GCS как PCOLLECTION
    Anonymous » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Формат объектов Pcollection для Apache Beam для записи на BigQuery с использованием CDC в Python
    Anonymous » » в форуме Python
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Мой пользовательский современный отладчик Visual Studio Visual Studio не может прикрепить мой пользовательский класс в R
    Anonymous » » в форуме C#
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»