В моем конвейере потоковой передачи лучей Apache у меня есть неограниченный источник pub/sub, который я использую с окнами сеанса.
Есть некоторые ограниченные данные конфигурации, которые мне нужно передать в некоторые из DoFns конвейера в качестве побочных входных данных. Эти данные находятся в BigQuery. Ситуация медленно меняется, я ожидаю несколько изменений в месяц в разные моменты времени. Чтобы объединить ограниченные и неограниченные данные, я применил этот шаблон, который создает периодический импульс каждый час. Впоследствии DoFn считывает данные конфигурации из BQ, преобразует их в dict и возвращает.
Позже результат вышеупомянутого передается одному из DoFn основной конвейер в качестве побочного ввода.
При выполнении конвейера с помощью LocalRunner я получаю довольно неопределенный
RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
Однако, если я заменю шаг PeriodicPulse простым Create(["DummyValue"]), конвейер будет работать нормально (конечно, за исключением того факта, что он будет игнорировать все изменения в данных конфигурации, произошедшие после первоначального чтения из BQ).
Что мне нужно изменить, чтобы все заработало?
n = 1
SESSION_GAP_SIZE = 3600 * 24
p_opt = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
, ...)
with Pipeline(options=p_opt) as p:
cfg_data = (p
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
| "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
)
main_p = (
p
| "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
| "Filter 1" >> Filter(Filter1())
| "Filter 2" >> Filter(Filter2())
| "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
| "Extract Composite Key" >> ParDo(ExtractKey())
| "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
| "Another GroupByKey" >> GroupByKey()
| "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
| "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
)
Подробнее здесь: https://stackoverflow.com/questions/784 ... ptransform
Медленное обновление побочных входных данных и окон сеанса — узел преобразования AppliedPTransform не был заменен должны ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение