Я развернул/запустил скрипт, следуя инструкциям здесь
Код: Выделить всё
class BmsSchema(typing.NamedTuple):
ident: int
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)
class ParsePubSubMessage(beam.DoFn):
def process(self, message):
import json
# Creating the main_dict that has all the columns
all_columns = ['ident']
main_dict = dict(zip(all_columns, [None] * len(all_columns)))
# Parse the JSON message
record = json.loads(message.decode('utf-8'))
main_dict.update(record)
yield {
'ident': main_dict["ident"]
}
def run():
# Define pipeline options
options = PipelineOptions(
project='.....',
runner='DataflowRunner',
streaming=True, # Enable streaming mode
temp_location='gs://......temp',
staging_location='gs://.....staging',
region='europe-west1',
job_name='.......dataflow-test'
)
# Set streaming mode
options.view_as(StandardOptions).streaming = True
# Pub/Sub subscription
input_subscription = 'projects/dwingestion/subscriptions/flespi_data_streaming'
table_schema = {
"fields": [
{"name": "ident", "type": "STRING", "mode": "NULLABLE"}
]
}
# Create the pipeline
with beam.Pipeline(options=options) as p:
# Read from Pub/Sub and parse the messages
messages = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
| 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
| 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
)
# Convert the messages to df
df = to_dataframe(messages)
transformed_pcol = to_pcollection(df)
# Write to BigQuery
Сообщение об ошибке от работника: generic::unknown: Traceback (большинство последний вызов):
Файл «apache_beam/runners/common.py», строка 1501, в apache_beam.runners.common.DoFnRunner.process
Файл «apache_beam/runners/common.py», строка 690 , в apache_beam.runners.common.SimpleInvoker.invoke_process
Файл "/usr/local/lib/python3.12/dist-packages/apache_beam/transforms/core.py", строка 2084, в
оболочке = лямбда x: [fn(x)]
^^^^^
Файл "/home/coyugi/bms_test_schema.py", строка 66, в
кроме ImportError как exc:
NameError: имя «BmsSchema» не определено
Во время обработки вышеуказанного исключения произошло другое исключение:
Traceback (последний вызов):
Файл "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", строка 311, в _execute
ответ = задача()
^^^^^^
Файл "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", строка 386, в
лямбда: self.create_worker().do_instruction(запрос), запрос)
^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
Файл "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", строка 656, в do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^Файл «/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py», строка 694, вprocess_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Файл «/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py», строка 1119, вprocess_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
Файл «/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py», строка 237, в Process_encoded
self.output(decoded_value)
Файл «apache_beam/runners/worker/operations.py», строка 567, в apache_beam.runners.worker.operations.Operation.output
Файл «apache_beam/runners/worker/operations.py», строка 569, в apache_beam. runners.worker.operations.Operation.output
Файл «apache_beam/runners/worker/operations.py», строка 260, в apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
Файл «apache_beam/runners» /worker/operations.py", строка 263, в apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
Файл "apache_beam/runners/worker/operations.py", строка 950, в apache_beam.runners.worker. Operations.DoOperation.process
Файл «apache_beam/runners/worker/operations.py», строка 951, в apache_beam.runners.worker.operations.DoOperation.process
Файл «apache_beam/runners/common.py» ", строка 1503, в apache_beam.runners.common.DoFnRunner.process
Файл "apache_beam/runners/common.py", строка 1592, в apache_beam.runners.common.DoFnRunner._reraise_augmented
Файл "apache_beam" /runners/common.py", строка 1501, в apache_beam.runners.common.DoFnRunner.process
Файл "apache_beam/runners/common.py", строка 689, в apache_beam.runners.common.SimpleInvoker.invoke_processФайл «apache_beam/runners/common.py», строка 1687, в apache_beam.runners.common._OutputHandler.handle_process_outputs
Файл «apache_beam/runners/common.py», строка 1800, в apache_beam.runners. common._OutputHandler._write_value_to_tag
Файл «apache_beam/runners/worker/operations.py», строка 263, в apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
Файл «apache_beam/runners/worker/operations» .py", строка 950, в apache_beam.runners.worker.operations.DoOperation.process
Файл "apache_beam/runners/worker/operations.py", строка 951, в apache_beam.runners.worker.operations.DoOperation. процесс
Файл «apache_beam/runners/common.py», строка 1503, в apache_beam.runners.common.DoFnRunner.process
Файл «apache_beam/runners/common.py», строка 1592, в apache_beam. runners.common.DoFnRunner._reraise_augmented
Файл «apache_beam/runners/common.py», строка 1501, в apache_beam.runners.common.DoFnRunner.process
Файл «apache_beam/runners/common.py», строка 689 в apache_beam.runners.common.SimpleInvoker.invoke_process
Файл «apache_beam/runners/common.py», строка 1687 в apache_beam.runners.common._OutputHandler.handle_process_outputs
Файл «apache_beam/runners» /common.py", строка 1800, в apache_beam.runners.common._OutputHandler._write_value_to_tag
Файл "apache_beam/runners/worker/operations.py", строка 263, в apache_beam.runners.worker.operations.SingletonElementConsumerSet. получить
Файл «apache_beam/runners/worker/operations.py», строка 950, в apache_beam.runners.worker.operations.DoOperation.process
Файл «apache_beam/runners/worker/operations.py», строка 951 в apache_beam.runners.worker.operations.DoOperation.process
Файл «apache_beam/runners/common.py», строка 1503 в apache_beam.runners.common.DoFnRunner.process
Файл «apache_beam» /runners/common.py", строка 1613, в apache_beam.runners.common.DoFnRunner._reraise_augmented
Файл "apache_beam/runners/common.py", строка 1501, в apache_beam.runners.common.DoFnRunner.processФайл «apache_beam/runners/common.py», строка 690, в apache_beam.runners.common.SimpleInvoker.invoke_process
Файл «/usr/local/lib/python3.12/dist-packages/apache_beam/ Transforms/core.py", строка 2084, в
wrapper = Lambda x: [fn(x)]
^^^^^
Файл "/home/coyugi/bms_test_schema.py" , строка 66, в
кроме ImportError как exc:
NameError: имя «BmsSchema» не определено [при выполнении «Присоединения схемы-ptransform-67»]
прошло:
==>
dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1191
Подробнее здесь: https://stackoverflow.com/questions/791 ... g-the-sche