Здравствуйте, я создаю конвейер данных луча Apache, который работает в GCP с использованием DataFlow в качестве бегуна. Ниже приведен мой код, он не выдает никаких ошибок, но проблема в том, что данные не записываются в bigquery, когда я проверяю задание. график выполнения, похоже, нет активности в разделе конвейера «запись в bigquery», но я вижу действия в данных, передаваемых из активности pubsub. Я пошел еще дальше, чтобы добавить триггер, но данные по-прежнему не поступают. .Пожалуйста помогите.Спасибо
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
import typing
import numpy as np
import pandas as pd
class BmsSchema(typing.NamedTuple):
can_data_frame_1: typing.Optional[str]
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
class ParsePubSubMessage(beam.DoFn):
def process(self, message):
import json
all_columns = [
"can_data_frame_1"
]
main_dict = dict(zip(all_columns, [None] * len(all_columns)))
record = json.loads(message.decode('utf-8'))
main_dict.update(record)
yield {
all_columns[0]: main_dict[all_columns[0]]}
def run():
options = PipelineOptions(
project='dwingestion',
runner='DataflowRunner',
streaming=True,
temp_location='gs://....../temp',
staging_location='gs://.........../staging',
region='europe-west1',
job_name='.........streaming-pipeline-dataflow',
save_main_session=True,
flags=['--allow_unsafe_triggers']
)
options.view_as(StandardOptions).streaming = True
input_subscription = 'projects/..._data_streaming'
table_schema = {
"fields": [
{"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
]
}
with beam.Pipeline(options=options) as p:
messages = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
| 'Apply Fixed Window' >> beam.WindowInto(
window.FixedWindows(60),
trigger=beam.trigger.AfterWatermark(),
allowed_lateness=window.Duration(10),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
)
| 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
| 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
)
# Convert the messages to a DataFrame
df = to_dataframe(messages)
# Extract and process the 'current_mA' field
df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
df['current_mA'] = df['current_mA'] * 10
# Convert back to PCollection and map to dictionaries
transformed_pcol = (
to_pcollection(df)
| 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1]) # Debugging
| 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
"current_mA": int(row.current_mA) if row.current_mA is not None else None
})
)
# Write to BigQuery
transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
table='..........table_test_all_columns_04',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
schema=table_schema,
custom_gcs_temp_location='gs://......_template/temp'
)
if __name__ == '__main__':
run()
Подробнее здесь: https://stackoverflow.com/questions/791 ... g-and-a-tr
Конвейер лучей Apache не работает до завершения после применения окон и триггера и никаких ошибок не возникает ⇐ Python
Программы на Python
1731923237
Anonymous
Здравствуйте, я создаю конвейер данных луча Apache, который работает в GCP с использованием DataFlow в качестве бегуна. Ниже приведен мой код, он не выдает никаких ошибок, но проблема в том, что данные не записываются в bigquery, когда я проверяю задание. график выполнения, похоже, нет активности в разделе конвейера «запись в bigquery», но я вижу действия в данных, передаваемых из активности pubsub. Я пошел еще дальше, чтобы добавить триггер, но данные по-прежнему не поступают. .Пожалуйста помогите.Спасибо
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
import typing
import numpy as np
import pandas as pd
class BmsSchema(typing.NamedTuple):
can_data_frame_1: typing.Optional[str]
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
class ParsePubSubMessage(beam.DoFn):
def process(self, message):
import json
all_columns = [
"can_data_frame_1"
]
main_dict = dict(zip(all_columns, [None] * len(all_columns)))
record = json.loads(message.decode('utf-8'))
main_dict.update(record)
yield {
all_columns[0]: main_dict[all_columns[0]]}
def run():
options = PipelineOptions(
project='dwingestion',
runner='DataflowRunner',
streaming=True,
temp_location='gs://....../temp',
staging_location='gs://.........../staging',
region='europe-west1',
job_name='.........streaming-pipeline-dataflow',
save_main_session=True,
flags=['--allow_unsafe_triggers']
)
options.view_as(StandardOptions).streaming = True
input_subscription = 'projects/..._data_streaming'
table_schema = {
"fields": [
{"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
]
}
with beam.Pipeline(options=options) as p:
messages = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
| 'Apply Fixed Window' >> beam.WindowInto(
window.FixedWindows(60),
trigger=beam.trigger.AfterWatermark(),
allowed_lateness=window.Duration(10),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
)
| 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
| 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
)
# Convert the messages to a DataFrame
df = to_dataframe(messages)
# Extract and process the 'current_mA' field
df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
df['current_mA'] = df['current_mA'] * 10
# Convert back to PCollection and map to dictionaries
transformed_pcol = (
to_pcollection(df)
| 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1]) # Debugging
| 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
"current_mA": int(row.current_mA) if row.current_mA is not None else None
})
)
# Write to BigQuery
transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
table='..........table_test_all_columns_04',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
schema=table_schema,
custom_gcs_temp_location='gs://......_template/temp'
)
if __name__ == '__main__':
run()
Подробнее здесь: [url]https://stackoverflow.com/questions/79199416/apache-beam-pipeline-not-running-to-completion-after-applying-windowing-and-a-tr[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия