Цель состоит в том, чтобы ParDo лениво загружал все имена файлов в моей корзине, а затем загружал и обрабатывал их на максимальном количестве рабочих процессов в do_something_with_df.
Учитывая следующее код фрагмент:
Код: Выделить всё
import apache_beam as beam
class AcquireCsvs(beam.DoFn):
def process(self, element: None) -> Iterable[pd.Dataframe]:
files = beam.io.GcsIO().list_files("some location")
for file in files:
yield self._load_file(file)
def _load_file(self, filename) -> pd.Dataframe:
with beam.io.GcsIO().open(filename) as file:
return pd.read_csv(file)
with beam.Pipeline() as p:
(
p
| beam.Create([None])
| beam.ParDo(AcquireCsvs())
| beam.Map(do_something_with_df)
)
Спасибо за любые подсказки, которые вы можете предоставить!
Подробнее здесь: https://stackoverflow.com/questions/792 ... e-executed
Мобильная версия