I've run into a problem with a Python script for a GCP Dataflow pipeline in a GitHub Actions workflow when I try to write data from Pub/Sub to BigQuery. It fails with an error related to the BigQuery write. The error:
INFO:root:Using BigQuery Table: gcp-live-data-xxx:tx.tx
INFO:root:Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
INFO:root:Output Table: gcp-live-data-xxx:tx.tx
INFO:root:Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
Traceback (most recent call last):
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 92, in <module>
    run()
File "/xxx/src/python/dataflow-tx-pipeline.py", line 83, in run
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery.py", line 2102, in __init__
self.table_reference = bigquery_tools.parse_table_reference(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 263, in parse_table_reference
if isinstance(table, TableReference):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Error: Process completed with exit code 1.
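The TypeError at the bottom can be reproduced without Beam at all. This is a minimal sketch of the suspected mechanism, assuming `TableReference` ended up bound to `None` after a failed optional import (the "Unable to check soft delete policy due to import error" warnings point the same way; the usual fix is installing the GCP extras with `pip install 'apache-beam[gcp]'`). The `parse_table_reference` below is my own stand-in, not Beam's actual code:

```python
# Sketch of the suspected failure mode (not Beam's real implementation):
# when an optional dependency fails to import, the fallback often leaves
# the imported name bound to None, and a later isinstance() check then
# raises exactly the TypeError seen in the traceback.
TableReference = None  # stands in for a failed "from ... import TableReference"


def parse_table_reference(table):
    # mirrors the shape of the check at bigquery_tools.py line 263
    if isinstance(table, TableReference):  # arg 2 is None, not a type
        return table
    return table


try:
    parse_table_reference("gcp-live-data-xxx:tx.tx")
except TypeError as exc:
    print(exc)  # isinstance() arg 2 must be a type, a tuple of types, or a union
```

If that hypothesis holds, the bug is in the runner's environment (missing `apache-beam[gcp]` extras), not in the pipeline code itself.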
Checked so far:
- hard-coding the schema,
- verifying that the variables are supplied correctly,
- replacing known_args.output_table with a dict().

The Python script:

[code]
import argparse
import json
import os
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class CustomParsing(beam.DoFn):
    """Custom ParallelDo class to apply a custom transformation"""

    def to_runner_api_parameter(self, unused_context):
        # Not very relevant, returns a URN (uniform resource name) and the payload
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        parsed = json.loads(element.decode("utf-8"))
        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed


def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/ /subscriptions/."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()
[/code]

The relevant GitHub Actions steps:

[code]
# Use the Bash shell regardless whether the GitHub Actions runner is
# ubuntu-latest, macos-latest, or windows-latest
defaults:
  run:
    shell: bash
    working-directory: .

# Authenticate with Google Cloud
- name: "Authenticate with Google Cloud"
  env:
    GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
  run: |
    echo "${GOOGLE_APPLICATION_CREDENTIALS}" > ${HOME}/gcloud.json
    gcloud auth activate-service-account --key-file=${HOME}/gcloud.json
    gcloud config set project $PROJECT_ID

# Run Python script to trigger Dataflow job
- name: "Run Dataflow Job"
  env:
    GOOGLE_APPLICATION_CREDENTIALS: ${HOME}/gcloud.json
  run: |
    python src/python/dataflow-tx-pipeline.py --runner DataflowRunner --project gcp-live-data-xxx --region us-central1 --temp_location gs://ff-tx-dataflow/temp --staging_location gs://ff-tx-dataflow/staging
[/code]

What am I missing?
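For reference, the `--output_table` value passed here follows the `PROJECT:DATASET.TABLE` string form that `WriteToBigQuery` documents as acceptable, so the spec itself looks well-formed. A quick stand-alone check of how such a spec splits (the regex is my own illustration, not Beam's actual parser):

```python
import re

# Illustrative only: split a "PROJECT:DATASET.TABLE" spec the way the
# BigQuery sink expects it (project optional, dataset.table required).
_TABLE_SPEC = re.compile(
    r"^(?:(?P<project>[^:.]+):)?(?P<dataset>\w+)\.(?P<table>\w+)$"
)


def split_table_spec(spec: str):
    m = _TABLE_SPEC.match(spec)
    if m is None:
        raise ValueError(f"not a valid table spec: {spec!r}")
    return m.group("project"), m.group("dataset"), m.group("table")


print(split_table_spec("gcp-live-data-xxx:tx.tx"))  # ('gcp-live-data-xxx', 'tx', 'tx')
```

Since the spec parses cleanly, the crash is more likely about the environment in which `parse_table_reference` runs than about the string being passed.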