Проблемы с подключением экземпляра Postgres Google Cloud sql из конвейера луча ⇐ Python
Проблемы с подключением экземпляра Postgres Google Cloud sql из конвейера луча
У меня возникли проблемы с подключением к экземпляру Postgresql в Google Cloud SQL, и я хотел обратиться за помощью. Я не уверен, что решением будет запуск механизма подключения или чего-то в этом роде, но вот моя проблема. Мой код выглядит следующим образом
импортировать apache_beam как луч из apache_beam.options.pipeline_options импортировать PipelineOptions из луч_нуггетс.io импортировать реляционную_дб source_config =relational_db.SourceConfiguration( имя_драйвера='postgresql+pg8000', хост = 'локальный хост', порт=5432, имя пользователя = ИМЯ ПОЛЬЗОВАТЕЛЯ, пароль= ПАРОЛЬ, база данных = ИМЯ-БД, create_if_missing=Истина, ) table_config = реляционная_дб.TableConfiguration( name=ИМЯ-ТАБЛИЦЫ, create_if_missing = Ложь, Primary_key_columns=["ключ"], create_insert_f=ФУНКЦИЯ, ) с лучом.Pipeline(options= Pipeline_options) в качестве конвейера: update_pipe= ( трубопровод | 'QueryTable' >> луч.io.ReadFromBigQuery( таблица = ТАБЛИЦА) | «ОБНОВЛЕНИЕ БД» >>relation_db.Write(source_config=source_config, table_config=table_config) ) Выполнение такого кода привело к следующей ошибке:
RuntimeError: sqlalchemy.exc.InterfaceError: (pg8000.Exceptions.InterfaceError) Невозможно создать соединение с локальным хостом и портом 5432 (время ожидания — None, а source_address — None). Я прочитал документацию и некоторые связанные вопросы по stackoverflow, где я видел некоторые предложения, такие как подключение по частному IP или попытка аутентификации с помощью Gcloud CLI и другие подобные вещи. Но мое недоумение связано со следующим. Если я использую sqlalchemy, не пытаясь реализовать ее в лучевом конвейере Apache, я не получаю такого же отказа в соединении. И один из аргументов явно определяет типы IP, которые следует искать как общедоступные
импортировать систему импортировать sqlalchemy из google.cloud.sql.connector Коннектор импорта, IPTypes # инициализируем объект соединителя Python соединитель = соединитель() # Функция подключения к базе данных Python Connector защита getconn(): конн = разъем.connect( CLOUD-SQL-CONNECTION-NAME, # Имя подключения экземпляра Cloud SQL "pg8000", пользователь = ИМЯ ПОЛЬЗОВАТЕЛЯ, пароль=ПАРОЛЬ, db=ИМЯ-БД, ip_type= IPTypes.PUBLIC # IPTypes.PRIVATE для частного IP ) возврат соединения # создаем пул соединений с аргументом «создатель» для нашей функции объекта соединения пул = sqlalchemy.create_engine( "postgresql+pg8000://", создатель = getconn, ) # взаимодействовать с базой данных Cloud SQL, используя пул соединений с помощью Pool.connect() как db_conn: result = db_conn.execute(sqlalchemy.text("SELECT * fromusers_copy LIMIT 10")).fetchall() Итак, мой вопрос: есть ли способ включить соединение/движок в конвейер Beam, чтобы избежать ошибки? Или мне нужно изменить аргументы исходной конфигурации, чтобы включить имя подключения к экземпляру Cloud SQL?
Спасибо за помощь и за то, что прочитали мою проблему.
У меня возникли проблемы с подключением к экземпляру Postgresql в Google Cloud SQL, и я хотел обратиться за помощью. Я не уверен, что решением будет запуск механизма подключения или чего-то в этом роде, но вот моя проблема. Мой код выглядит следующим образом
импортировать apache_beam как луч из apache_beam.options.pipeline_options импортировать PipelineOptions из луч_нуггетс.io импортировать реляционную_дб source_config =relational_db.SourceConfiguration( имя_драйвера='postgresql+pg8000', хост = 'локальный хост', порт=5432, имя пользователя = ИМЯ ПОЛЬЗОВАТЕЛЯ, пароль= ПАРОЛЬ, база данных = ИМЯ-БД, create_if_missing=Истина, ) table_config = реляционная_дб.TableConfiguration( name=ИМЯ-ТАБЛИЦЫ, create_if_missing = Ложь, Primary_key_columns=["ключ"], create_insert_f=ФУНКЦИЯ, ) с лучом.Pipeline(options= Pipeline_options) в качестве конвейера: update_pipe= ( трубопровод | 'QueryTable' >> луч.io.ReadFromBigQuery( таблица = ТАБЛИЦА) | «ОБНОВЛЕНИЕ БД» >>relation_db.Write(source_config=source_config, table_config=table_config) ) Выполнение такого кода привело к следующей ошибке:
RuntimeError: sqlalchemy.exc.InterfaceError: (pg8000.Exceptions.InterfaceError) Невозможно создать соединение с локальным хостом и портом 5432 (время ожидания — None, а source_address — None). Я прочитал документацию и некоторые связанные вопросы по stackoverflow, где я видел некоторые предложения, такие как подключение по частному IP или попытка аутентификации с помощью Gcloud CLI и другие подобные вещи. Но мое недоумение связано со следующим. Если я использую sqlalchemy, не пытаясь реализовать ее в лучевом конвейере Apache, я не получаю такого же отказа в соединении. И один из аргументов явно определяет типы IP, которые следует искать как общедоступные
импортировать систему импортировать sqlalchemy из google.cloud.sql.connector Коннектор импорта, IPTypes # инициализируем объект соединителя Python соединитель = соединитель() # Функция подключения к базе данных Python Connector защита getconn(): конн = разъем.connect( CLOUD-SQL-CONNECTION-NAME, # Имя подключения экземпляра Cloud SQL "pg8000", пользователь = ИМЯ ПОЛЬЗОВАТЕЛЯ, пароль=ПАРОЛЬ, db=ИМЯ-БД, ip_type= IPTypes.PUBLIC # IPTypes.PRIVATE для частного IP ) возврат соединения # создаем пул соединений с аргументом «создатель» для нашей функции объекта соединения пул = sqlalchemy.create_engine( "postgresql+pg8000://", создатель = getconn, ) # взаимодействовать с базой данных Cloud SQL, используя пул соединений с помощью Pool.connect() как db_conn: result = db_conn.execute(sqlalchemy.text("SELECT * fromusers_copy LIMIT 10")).fetchall() Итак, мой вопрос: есть ли способ включить соединение/движок в конвейер Beam, чтобы избежать ошибки? Или мне нужно изменить аргументы исходной конфигурации, чтобы включить имя подключения к экземпляру Cloud SQL?
Спасибо за помощь и за то, что прочитали мою проблему.
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение