Я сталкиваюсь с проблемой, используя elasticsearchio.read () , чтобы справиться с более чем одним экземпляром запроса. Мои запросы динамически создаются как pcollection на основе входящей группы значений. Я пытаюсь понять, как загрузить параметр .withquery () , который может предоставить эту возможность или любой подход, который обеспечивает гибкость.
Проблема в том, что Elasticsearchio.read () Метод ожидает, что PBEGIN вход для запуска трубопровода, но кажется, что мне нужен доступ за пределами контекста трубопровода. Pbegin представляет собой начало конвейера, и необходимо создать конвейер, который может читать данные из Elasticsearch с использованием ioelasticsearchio.read () .
Могу ли я обернуть Elasticsearchio.read () Code в Create Transform, который создает Pcollect Похоже? PCollection queries = ... // a PCollection of Elasticsearch queries
PCollection queryResults = queries.apply(
ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
String query = c.element();
PCollection results = c.pipeline()
.apply(ElasticsearchIO.read()
.withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
.withQuery(query));
c.output(results);
}
})
.apply(Flatten.pCollections()));
< /code>
В целом я задаюсь вопросом о любом из связанных с IO классов, доказанных Beam, который соответствует вводу PBEGIN-если есть средства для введения коллекции. Вот один подход, который может быть многообещающим: < /p>
// Define a ValueProvider for a List
ValueProvider myListProvider = ValueProvider.StaticValueProvider.of(myList);
// Use the ValueProvider to create a PCollection of Strings
PCollection pcoll = pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
Подробнее здесь: https://stackoverflow.com/questions/759 ... egin-input