Воздушный поток Apache: имя модуля «воздушный поток» отсутствует. Задача не выполненаPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Воздушный поток Apache: имя модуля «воздушный поток» отсутствует. Задача не выполнена

Сообщение Anonymous »

Я установил Airflow с помощью докера согласно этому видео. (Windows+WSL+Ubuntu+Docker)
У меня есть файл my_dag.py (заменен APIKEY):

Код: Выделить всё

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd

def _read_api_data():
x1 = requests.get('http://api.openweathermap.org/data/2.5/weather?q=London&appid=APIKEY')
x2 = requests.get('http://api.openweathermap.org/data/2.5/weather?q=Moscow&appid=APIKEY')

responses = [x1, x2]
output_object = {"responses": []}
for x in responses:
json_object = x.json()
output_object["responses"].append(json_object)
json_str = json.dumps(output_object, indent=4)
# Writing to weather_data.json
with open("/tmp/weather_data.json", "w") as outfile:
outfile.write(json_str)

def _download_data():
with open('/tmp/weather_data.json') as f:
d = json.load(f)
responses = d["responses"]
temps_K = [round(r["main"]["temp"] - 273.15, 2) for r in responses]
names = [r["name"] for r in responses]
df = pd.DataFrame({"names": names, "temps_K": temps_K})
return df

def _process_data(ti):
df = ti.xcom_pull(task_ids="download_data")
df["temps_C"] = round(temps["temps_K"] - 273.15, 2)
df.to_csv('/tmp/processed_weather_data.csv')
return df

def _save_data(ti):
df = ti.xcom_pull(task_ids="process_data")
df.to_parquet('/tmp/weather.parquet')

with DAG("weather_data_pipeline_dag",
schedule_interval="@once",
start_date=datetime(2024,7,14)) as dag:
taskPython0 = PythonOperator(
task_id = "read_api_data",
python_callable = _read_api_data
)
taskPython1 = PythonOperator(
task_id = "download_data",
python_callable = _download_data
)
taskPython2 = PythonOperator(
task_id = "process_data",
python_callable = _process_data
)
taskPython3 = PythonOperator(
task_id = "save_data",
python_callable = _save_data
)
taskPython0 >> taskPython1 >> taskPython2 >> taskPython3
Но моя попытка не удалась. Задача read_api_data не выполнена.
Изображение
. Как это решить? где я могу прочитать отслеживание LOG моего DAG? похоже обдув не дает подробностей.

Подробнее здесь: https://stackoverflow.com/questions/787 ... ask-failed
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»