Мне нужно создать коннектор воздушного потока, который считывает данные из Google BigQuery, несмотря на то, что он размещен на aws.
Моя команда разработчиков создала секретный json в формате:
{
"type": "external_account",
"audience": xx,
"subject_token_type": xx,
"service_account_impersonation_url": xx,
"token_uri": xx,
"credential_source": {xx, xx, xx}
}
что, по моему мнению, соответствует https://google.aip.dev/auth/4117
Они загрузили секрет в SSM в качестве параметра, который я пытаюсь получить используя SsmHook.getparameter_value(). Это может работать или не работать, но я понятия не имею, как я могу использовать это для настройки BigQueryGetDataOperator, который, похоже, требует аутентификации другим способом.
Если я просто вытащу секрет и запущу оператор, мой ответ будет таким: :
airflow.exceptions.AirflowNotFoundException: The conn_id `google_cloud_default` isn't defined
Может ли кто-нибудь посоветовать, как лучше действовать, учитывая эти инфраструктурные ограничения? Спасибо
from datetime import datetime
import pandas as pd
from google.cloud import bigquery
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.ssm import SsmHook
PROJECT = "xxx"
DATASET = "xxx"
TABLE = "xxx"
def get_ssm_parameter(parameter_name, with_decryption=True):
hook = SsmHook()
return hook.get_parameter_value(parameter_name, with_decryption)
def get_token():
api_token = get_ssm_parameter(
parameter_name="/prod/xxx/airflow", with_decryption=True
)
with DAG(
"example-dag",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
start = EmptyOperator(task_id="start")
t1 = PythonOperator(
task_id="read-bigquery",
python_callable=get_token,
)
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE,
max_results=10,
)
end = EmptyOperator(task_id="end")
start >> t1 >> get_data >> end
Подробнее здесь: https://stackoverflow.com/questions/783 ... token-json
Как я могу подключиться к Bigquery в Airflow, используя JSON федеративного токена AWS? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение