Почему из потока смены гаечного ключа не читаются записи?Python

Программы на Python
Ответить
Anonymous
 Почему из потока смены гаечного ключа не читаются записи?

Сообщение Anonymous »

Я пытаюсь написать поток данных Python GCP, который обрабатывает записи из потока изменений Spanner и распечатывает их. Я запускаю его локально, и он работает, но не печатает записи при обновлении записи в базе данных.
Моя настройка:
  • создала базу данных Spanner, таблицу под названием Пациенты и поток изменений под названием PatientsChangeStream. Поток изменений выглядит следующим образом:
CREATE CHANGE STREAM PatientsChangeStream
FOR Patients
OPTIONS (
value_capture_type = 'NEW_ROW',
retention_period = '7d',
exclude_insert = false,
exclude_update = false,
exclude_delete = false
);

Вот код Python. Кажется, он подключается к базе данных, привязывается к потоку изменений, но не печатает никаких записей после того, как я вставляю или обновляю записи в таблице Пациенты.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.io.gcp.spanner import ReadChangeStreamFromSpanner
from datetime import datetime, timedelta, timezone
from google.oauth2 import service_account

class PrintChangeRecord(beam.DoFn):
def __init__(self):
self.count = 0

def process(self, element):
self.count += 1
print(f"Change Record #{self.count}: {element}")
yield element

def run_pipeline(project_id, instance_id, database_id, change_stream_name, metadata_database_id, credentials_path):
# Configure pipeline options with Google Cloud settings
options = PipelineOptions()

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.region = 'us-central1'

# Use service account key file
credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
google_cloud_options.service_account_email = credentials.service_account_email

# Set environment variable for the pipeline to run locally
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
print(f"Using service account: {credentials.service_account_email}")

standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DirectRunner' # Use 'DataflowRunner' for production

# subtract 24 hours from the start_time for inclusiveStartAt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Look back 24 hours

print(f"==> start time is {start_time.isoformat()}")
print(f"==> end time is {end_time.isoformat()}")

with beam.Pipeline(options=options) as pipeline:
print("Pipeline object created")
# Read from the Spanner change stream
change_stream_records = (pipeline | "ReadChangeStream" >> ReadChangeStreamFromSpanner(
project_id=project_id,
instance_id=instance_id,
database_id=database_id,
changeStreamName=change_stream_name,
metadataDatabase=metadata_database_id,
metadataInstance=instance_id,
inclusiveStartAt=start_time.isoformat(),
inclusiveEndAt=end_time.isoformat()
))

# Print each change record
change_stream_records | "PrintRecords" >> beam.ParDo(PrintChangeRecord())

print("Pipeline execution completed")

if __name__ == "__main__":
# Replace with your actual project, instance, database, and change stream details
PROJECT_ID = "xxxxxx"
INSTANCE_ID = "yyyyyy"
DATABASE_ID = "zzzzzz"
CHANGE_STREAM_NAME = "PatientsChangeStream"
METADATA_DATABASE_ID = "metadata" # A separate database for metadata
CREDENTIALS_PATH = "c:/xxxxx/yyyyyyy.json"

try:
print("Starting pipeline...")
run_pipeline(PROJECT_ID, INSTANCE_ID, DATABASE_ID, CHANGE_STREAM_NAME, METADATA_DATABASE_ID, CREDENTIALS_PATH)
print("Script completed")
except KeyboardInterrupt:
print("Script killed by user")
except Exception as ex:
print(f"An exception occurred: {str(ex)}")



Подробнее здесь: https://stackoverflow.com/questions/798 ... nge-stream
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»