Медленное обновление побочных входных данных и окон сеанса — узел преобразования AppliedPTransform не был заменен должныPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Медленное обновление побочных входных данных и окон сеанса — узел преобразования AppliedPTransform не был заменен должны

Сообщение Anonymous »

В моем конвейере потоковой передачи лучей Apache у меня есть неограниченный источник pub/sub, который я использую с окнами сеанса.
Есть некоторые ограниченные данные конфигурации, которые мне нужно передать в некоторые из DoFns конвейера в качестве побочных входных данных. Эти данные находятся в BigQuery. Ситуация медленно меняется, я ожидаю несколько изменений в месяц в разные моменты времени. Чтобы объединить ограниченные и неограниченные данные, я применил этот шаблон, который создает периодический импульс каждый час. Впоследствии DoFn считывает данные конфигурации из BQ, преобразует их в dict и возвращает.
Позже результат вышеупомянутого передается одному из DoFn основной конвейер в качестве побочного ввода.
При выполнении конвейера с помощью LocalRunner я получаю довольно неопределенный
RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
Однако, если я заменю шаг PeriodicPulse простым Create(["DummyValue"]), конвейер будет работать нормально (конечно, за исключением того факта, что он будет игнорировать все изменения в данных конфигурации, произошедшие после первоначального чтения из BQ).
Что мне нужно изменить, чтобы все заработало?

n = 1
SESSION_GAP_SIZE = 3600 * 24

p_opt = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
, ...)

with Pipeline(options=p_opt) as p:

cfg_data = (p
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
| "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
)

main_p = (
p
| "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
| "Filter 1" >> Filter(Filter1())
| "Filter 2" >> Filter(Filter2())
| "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
| "Extract Composite Key" >> ParDo(ExtractKey())
| "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
| "Another GroupByKey" >> GroupByKey()
| "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
| "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
)


Подробнее здесь: https://stackoverflow.com/questions/784 ... ptransform
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»