Я передаю эту переменную, используя пользовательскую варианты трубопровода. Код для создания параметров конвейера выглядит следующим образом:
Код: Выделить всё
pipeline_options = ProcessBillRequests.CustomOptions(
project=gcp_project_id,
region="us-east1",
job_name=job_name,
temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
runner='DataflowRunner',
save_main_session=True,
service_account_email= service_account,
subnetwork=os.environ.get(SUBNETWORK_URL),
extra_packages=[uplight_telemetry_tar_file_path],
setup_file=setup_file_path,
OTEL_SERVICE_NAME=otel_service_name,
OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
# Set values for additional custom variables as needed
Код: Выделить всё
result = (
pipeline
| "ReadPendingRecordsFromDB" >> read_from_db
| "Parse input PCollection" >> beam.Map(ProcessBillRequests.parse_bill_data_requests)
| "Fetch bills " >> beam.ParDo(ProcessBillRequests.FetchBillInformation())
)
pipeline.run().wait_until_finish()
Подробнее здесь: https://stackoverflow.com/questions/776 ... ow-workers