Python script for a GCP Dataflow job fails in a GitHub Actions workflow

I'm running into a problem with a Python script for a GCP Dataflow job in a GitHub Actions workflow, where I try to write data from Pub/Sub to BigQuery.
It fails with an error related to the BigQuery write.
The Python script:

Code: Select all

import argparse
import json
import os
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

# Service account key path
INPUT_SUBSCRIPTION = f"projects/{os.getenv('PROJECT_ID')}/subscriptions/{os.getenv('SUBSCRIPTION_NAME')}"
#BIGQUERY_TABLE = f"{os.getenv('PROJECT_ID')}:{os.getenv('DATASET_NAME')}.{os.getenv('TABLE_NAME')}"
BIGQUERY_TABLE = "gcp-live-data-xxx:tx.tx"

# Load schema from file
# with open('terraform/schemas/tx_tx_schema.json', 'r') as file:
#     SCHEMA = json.load(file)

# BIGQUERY_SCHEMA = ",".join([f"{item['name']}:{item['type']}" for item in SCHEMA])
BIGQUERY_SCHEMA = "TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC"

# Debugging
logging.info(f"Using BigQuery Table: {BIGQUERY_TABLE}")
logging.info(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")
print(f"Using BigQuery Table: {BIGQUERY_TABLE}")
print(f"Using BigQuery Schema: {BIGQUERY_SCHEMA}")

class CustomParsing(beam.DoFn):
    """Custom ParallelDo class to apply a custom transformation."""

    def to_runner_api_parameter(self, unused_context):
        # Not very relevant, returns a URN (uniform resource name) and the payload
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        parsed = json.loads(element.decode("utf-8"))
        parsed["timestamp"] = timestamp.to_rfc3339()
        yield parsed


def run():
    # Parsing arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form '
             '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=INPUT_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table", default=BIGQUERY_TABLE
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Debugging
    logging.info(f"Output Table: {known_args.output_table}")
    logging.info(f"Output Schema: {known_args.output_schema}")
    print(f"Output Table: {known_args.output_table}")
    print(f"Output Schema: {known_args.output_schema}")

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    # Defining our pipeline and its steps
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None
            )
            | "CustomParse" >> beam.ParDo(CustomParsing())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=known_args.output_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == "__main__":
    run()
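For reference, `WriteToBigQuery` accepts a table spec string of the form `PROJECT:DATASET.TABLE`, which is the format used above. A minimal sketch of that kind of spec parsing (an illustration only, not Beam's actual implementation) is:

```python
import re

def parse_table_spec(spec: str):
    """Split a 'project:dataset.table' or 'dataset.table' spec (sketch only)."""
    m = re.match(r'^(?:(?P<project>[\w\-]+)[:.])?(?P<dataset>\w+)\.(?P<table>\w+)$', spec)
    if not m:
        raise ValueError(f"Bad table spec: {spec!r}")
    return m.group("project"), m.group("dataset"), m.group("table")

print(parse_table_spec("gcp-live-data-xxx:tx.tx"))  # ('gcp-live-data-xxx', 'tx', 'tx')
```

The spec string itself is well-formed here, so a malformed table reference is unlikely to be the cause of the error below.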
The GitHub Actions job:

Code: Select all

dataflow-job:
  name: 'Dataflow-job'
  runs-on: [self-hosted, linux, x64, gcp, terraform, iac]

  # Use the Bash shell regardless whether the GitHub Actions runner is ubuntu-latest, macos-latest, or windows-latest
  defaults:
    run:
      shell: bash
      working-directory: .

  env:
    PROJECT_ID: "${{ vars.PROJECT_ID }}"
    SUBSCRIPTION_NAME: "${{ vars.SUBSCRIPTION_NAME }}"
    DATASET_NAME: "${{ vars.DATASET_NAME }}"
    TABLE_NAME: "${{ vars.TABLE_NAME }}"
    REGION: "${{ vars.REGION }}"

  steps:
    # Checkout the repository to the GitHub Actions runner
    - name: Checkout
      uses: actions/checkout@v4

    # Set up Python environment
    - name: "Set up Python"
      uses: actions/setup-python@v4
      with:
        python-version: "3.11.4"

    # Install dependencies
    - name: "Install dependencies"
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    # Authenticate with Google Cloud
    - name: "Authenticate with Google Cloud"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }}
      run: |
        echo "${GOOGLE_APPLICATION_CREDENTIALS}" > ${HOME}/gcloud.json
        gcloud auth activate-service-account --key-file=${HOME}/gcloud.json
        gcloud config set project $PROJECT_ID

    # Run Python script to trigger Dataflow job
    - name: "Run Dataflow Job"
      env:
        GOOGLE_APPLICATION_CREDENTIALS: ${HOME}/gcloud.json
      run: |
        python src/python/dataflow-tx-pipeline.py --runner DataflowRunner --project gcp-live-data-xxx --region us-central1 --temp_location gs://ff-tx-dataflow/temp --staging_location gs://ff-tx-dataflow/staging
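requirements.txt is not shown in the question, so this is an assumption: if it pins plain `apache-beam` rather than `apache-beam[gcp]`, the BigQuery/Pub/Sub client code is missing on the self-hosted runner. A quick hypothetical check that could run as an extra step after "Install dependencies":

```python
# Hypothetical check (not part of the workflow above): apache-beam guards its
# GCP imports in try/except, so a missing [gcp] extra surfaces only later, at
# pipeline construction. apitools is one of the packages pulled in by the extra.
import importlib.util

def beam_gcp_extras_installed() -> bool:
    # Returns False when the google-apitools dependency is absent.
    return importlib.util.find_spec("apitools") is not None

print("apache-beam[gcp] extras installed:", beam_gcp_extras_installed())
```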
Error:

Code: Select all

INFO:root:Using BigQuery Table: gcp-live-data-xxx:tx.tx
INFO:root:Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
INFO:root:Output Table: gcp-live-data-xxx:tx.tx
INFO:root:Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
WARNING:apache_beam.options.pipeline_options:Unable to check soft delete policy due to import error.
Using BigQuery Table: gcp-live-data-xxx:tx.tx
Using BigQuery Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Output Table: gcp-live-data-xxx:tx.tx
Output Schema: TX_ID:STRING, TX_TX:TIMESTAMP, CUSTOMER_ID:STRING, TERMINAL_ID:STRING, TX_AMOUNT:NUMERIC
Traceback (most recent call last):
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 92, in <module>
    run()
  File "/xxx/src/python/dataflow-tx-pipeline.py", line 83, in run
    | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery.py", line 2102, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/xxx/actions-runner/_work/_tool/Python/3.11.4/x64/lib/python3.11/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 263, in parse_table_reference
    if isinstance(table, TableReference):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
Error: Process completed with exit code 1.
Checked so far:
  • a hard-coded schema,
  • that the variables are provided correctly,
  • replacing known_args.output_table with a dict().
What am I missing?
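One possible reading of the traceback (an assumption, not a confirmed diagnosis): Beam wraps its GCP client imports in try/except, and when they fail, `TableReference` is bound to `None`, so `isinstance(table, TableReference)` raises exactly the TypeError shown. The mechanism can be reproduced without Beam:

```python
# Reproduce the failure mechanism with a stand-in for Beam's guarded import.
try:
    from hypothetical_gcp_client import TableReference  # module name is made up
except ImportError:
    TableReference = None  # the fallback that later breaks isinstance()

try:
    isinstance("gcp-live-data-xxx:tx.tx", TableReference)
except TypeError as exc:
    print(exc)  # isinstance() arg 2 must be a type, a tuple of types, or a union
```

If that is indeed the cause, making sure the runner installs the `apache-beam[gcp]` extra (or whichever transitive GCP dependency is missing) should resolve it.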

More details here: https://stackoverflow.com/questions/791 ... tions-flow