Моя настройка:
- создала базу данных Spanner, таблицу под названием Пациенты и поток изменений под названием PatientsChangeStream. Поток изменений выглядит следующим образом:
FOR Patients
OPTIONS (
value_capture_type = 'NEW_ROW',
retention_period = '7d',
exclude_insert = false,
exclude_update = false,
exclude_delete = false
);
Вот код Python. Кажется, он подключается к базе данных, привязывается к потоку изменений, но не печатает никаких записей после того, как я вставляю или обновляю записи в таблице Пациенты.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.io.gcp.spanner import ReadChangeStreamFromSpanner
from datetime import datetime, timedelta, timezone
from google.oauth2 import service_account
class PrintChangeRecord(beam.DoFn):
def __init__(self):
self.count = 0
def process(self, element):
self.count += 1
print(f"Change Record #{self.count}: {element}")
yield element
def run_pipeline(project_id, instance_id, database_id, change_stream_name, metadata_database_id, credentials_path):
# Configure pipeline options with Google Cloud settings
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.region = 'us-central1'
# Use service account key file
credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
google_cloud_options.service_account_email = credentials.service_account_email
# Set environment variable for the pipeline to run locally
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
print(f"Using service account: {credentials.service_account_email}")
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DirectRunner' # Use 'DataflowRunner' for production
# subtract 24 hours from the start_time for inclusiveStartAt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24) # Look back 24 hours
print(f"==> start time is {start_time.isoformat()}")
print(f"==> end time is {end_time.isoformat()}")
with beam.Pipeline(options=options) as pipeline:
print("Pipeline object created")
# Read from the Spanner change stream
change_stream_records = (pipeline | "ReadChangeStream" >> ReadChangeStreamFromSpanner(
project_id=project_id,
instance_id=instance_id,
database_id=database_id,
changeStreamName=change_stream_name,
metadataDatabase=metadata_database_id,
metadataInstance=instance_id,
inclusiveStartAt=start_time.isoformat(),
inclusiveEndAt=end_time.isoformat()
))
# Print each change record
change_stream_records | "PrintRecords" >> beam.ParDo(PrintChangeRecord())
print("Pipeline execution completed")
if __name__ == "__main__":
# Replace with your actual project, instance, database, and change stream details
PROJECT_ID = "xxxxxx"
INSTANCE_ID = "yyyyyy"
DATABASE_ID = "zzzzzz"
CHANGE_STREAM_NAME = "PatientsChangeStream"
METADATA_DATABASE_ID = "metadata" # A separate database for metadata
CREDENTIALS_PATH = "c:/xxxxx/yyyyyyy.json"
try:
print("Starting pipeline...")
run_pipeline(PROJECT_ID, INSTANCE_ID, DATABASE_ID, CHANGE_STREAM_NAME, METADATA_DATABASE_ID, CREDENTIALS_PATH)
print("Script completed")
except KeyboardInterrupt:
print("Script killed by user")
except Exception as ex:
print(f"An exception occurred: {str(ex)}")
Подробнее здесь: https://stackoverflow.com/questions/798 ... nge-stream
Мобильная версия