Apache Beam Ioelasticsearchio.read () Метод (Java), который ожидает ввода PBEGIN и средства для обработки коллекции запрJAVA

Программисты JAVA общаются здесь
Anonymous
Apache Beam Ioelasticsearchio.read () Метод (Java), который ожидает ввода PBEGIN и средства для обработки коллекции запр

Сообщение Anonymous »

Я сталкиваюсь с проблемой, используя elasticsearchio.read () , чтобы справиться с более чем одним экземпляром запроса. Мои запросы динамически создаются как pcollection на основе входящей группы значений. Я пытаюсь понять, как загрузить параметр .withquery () , который может предоставить эту возможность или любой подход, который обеспечивает гибкость.
Проблема в том, что Elasticsearchio.read () Метод ожидает, что PBEGIN вход для запуска трубопровода, но кажется, что мне нужен доступ за пределами контекста трубопровода. Pbegin представляет собой начало конвейера, и необходимо создать конвейер, который может читать данные из Elasticsearch с использованием ioelasticsearchio.read () .
Могу ли я обернуть Elasticsearchio.read () Code в Create Transform, который создает Pcollect Похоже? PCollection queries = ... // a PCollection of Elasticsearch queries

PCollection queryResults = queries.apply(
ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
String query = c.element();
PCollection results = c.pipeline()
.apply(ElasticsearchIO.read()
.withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
.withQuery(query));
c.output(results);
}
})
.apply(Flatten.pCollections()));
< /code>
В целом я задаюсь вопросом о любом из связанных с IO классов, доказанных Beam, который соответствует вводу PBEGIN-если есть средства для введения коллекции. Вот один подход, который может быть многообещающим: < /p>
// Define a ValueProvider for a List
ValueProvider myListProvider = ValueProvider.StaticValueProvider.of(myList);

// Use the ValueProvider to create a PCollection of Strings
PCollection pcoll = pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));


Подробнее здесь: https://stackoverflow.com/questions/759 ... egin-input

Вернуться в «JAVA»