В моем конвейере потоковой передачи лучей Apache у меня есть неограниченный источник pub/sub, который я использую с окнами сеанса.
Есть некоторые ограниченные данные конфигурации, которые мне нужно передать в некоторые из DoFns конвейера в качестве побочных входных данных. Эти данные находятся в BigQuery. Ситуация медленно меняется, я ожидаю несколько изменений в месяц в разные моменты времени. Чтобы объединить ограниченные и неограниченные данные, я применил этот шаблон, который создает периодический импульс каждый час. Впоследствии DoFn считывает данные конфигурации из BQ, преобразует их в dict и возвращает.
Позже результат вышеупомянутого передается одному из DoFn основной конвейер в качестве побочного ввода.
При выполнении конвейера с помощью LocalRunner я получаю довольно неопределенный
RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
Однако, если я заменю шаг PeriodicPulse простым Create(["DummyValue"]), конвейер будет работать нормально (конечно, за исключением того факта, что он будет игнорировать все изменения в данных конфигурации, произошедшие после первоначального чтения из BQ).
Что мне нужно изменить, чтобы все заработало?
n = 1
SESSION_GAP_SIZE = 3600 * 24
p_opt = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
, ...)
with Pipeline(options=p_opt) as p:
cfg_data = (p
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
| "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
)
main_p = (
p
| "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
| "Filter 1" >> Filter(Filter1())
| "Filter 2" >> Filter(Filter2())
| "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
| "Extract Composite Key" >> ParDo(ExtractKey())
| "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
| "Another GroupByKey" >> GroupByKey()
| "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
| "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
)
Подробнее здесь: https://stackoverflow.com/questions/784 ... ptransform
Медленное обновление побочных входных данных и окон сеанса — узел преобразования AppliedPTransform не был заменен должны ⇐ Python
Программы на Python
1732019328
Anonymous
В моем конвейере потоковой передачи лучей Apache у меня есть неограниченный источник pub/sub, который я использую с окнами сеанса.
Есть некоторые ограниченные данные конфигурации, которые мне нужно передать в некоторые из DoFns конвейера в качестве побочных входных данных. Эти данные находятся в BigQuery. Ситуация медленно меняется, я ожидаю несколько изменений в месяц в разные моменты времени. Чтобы объединить ограниченные и неограниченные данные, я применил этот шаблон, который создает периодический импульс каждый час. Впоследствии DoFn считывает данные конфигурации из BQ, преобразует их в dict и возвращает.
Позже результат вышеупомянутого передается одному из DoFn основной конвейер в качестве побочного ввода.
При выполнении конвейера с помощью LocalRunner я получаю довольно неопределенный
RuntimeError: Transform node AppliedPTransform(PeriodicImpulse/GenSequence/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
Однако, если я заменю шаг PeriodicPulse простым Create(["DummyValue"]), конвейер будет работать нормально (конечно, за исключением того факта, что он будет игнорировать все изменения в данных конфигурации, произошедшие после первоначального чтения из BQ).
Что мне нужно изменить, чтобы все заработало?
n = 1
SESSION_GAP_SIZE = 3600 * 24
p_opt = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True,allow_unsafe_triggers=True,runner="DirectRunner"
, ...)
with Pipeline(options=p_opt) as p:
cfg_data = (p
| 'PeriodicImpulse' >> PeriodicImpulse(fire_interval=3600,apply_windowing=True)
| "Retrieve Segment Config from BQ" >> ParDo(get_segment_config_from_bq)
)
main_p = (
p
| "Read Stream from Pub/Sub" >> io.ReadFromPubSub(subscription=SUBSCRIPTION,with_attributes=True)
| "Filter 1" >> Filter(Filter1())
| "Filter 2" >> Filter(Filter2())
| "Decode Pub/Sub Messages" >> ParDo(ReadPubSubMessage())
| "Extract Composite Key" >> ParDo(ExtractKey())
| "Build Session Windows" >> WindowInto(window.Sessions(SESSION_GAP_SIZE ), trigger=AfterCount(n),accumulation_mode=AccumulationMode.ACCUMULATING)
| "Another GroupByKey" >> GroupByKey()
| "Enrich Stream Data by Config" >> ParDo(EnrichWithConfig(),segment_cfg=pvalue.AsSingleton(cfg_data))
| "Output to PubSub" >> WriteToPubSub(topic=TARGET_TOPIC)
)
Подробнее здесь: [url]https://stackoverflow.com/questions/78492177/slowly-updating-side-inputs-session-windows-transform-node-appliedptransform[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия