Как я могу подключиться к Bigquery в Airflow, используя JSON федеративного токена AWS?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу подключиться к Bigquery в Airflow, используя JSON федеративного токена AWS?

Сообщение Anonymous »

Мне нужно создать коннектор воздушного потока, который считывает данные из Google BigQuery, несмотря на то, что он размещен на aws.
Моя команда разработчиков создала секретный json в формате:
{
"type": "external_account",
"audience": xx,
"subject_token_type": xx,
"service_account_impersonation_url": xx,
"token_uri": xx,
"credential_source": {xx, xx, xx}
}

что, по моему мнению, соответствует https://google.aip.dev/auth/4117
Они загрузили секрет в SSM в качестве параметра, который я пытаюсь получить используя SsmHook.getparameter_value(). Это может работать или не работать, но я понятия не имею, как я могу использовать это для настройки BigQueryGetDataOperator, который, похоже, требует аутентификации другим способом.
Если я просто вытащу секрет и запущу оператор, мой ответ будет таким: :
airflow.exceptions.AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined

Может ли кто-нибудь посоветовать, как лучше действовать, учитывая эти инфраструктурные ограничения? Спасибо
from datetime import datetime
import pandas as pd
from google.cloud import bigquery
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.ssm import SsmHook

PROJECT = "xxx"
DATASET = "xxx"
TABLE = "xxx"

def get_ssm_parameter(parameter_name, with_decryption=True):
hook = SsmHook()
return hook.get_parameter_value(parameter_name, with_decryption)

def get_token():
api_token = get_ssm_parameter(
parameter_name="/prod/xxx/airflow", with_decryption=True
)

with DAG(
"example-dag",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
start = EmptyOperator(task_id="start")
t1 = PythonOperator(
task_id="read-bigquery",
python_callable=get_token,
)
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE,
max_results=10,
)
end = EmptyOperator(task_id="end")
start >> t1 >> get_data >> end



Подробнее здесь: https://stackoverflow.com/questions/783 ... token-json
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»