На странице Airflow отображается пустой список DAG-ов. Когда я проверяю настольное приложение Docker после запуска команды «docker-compose up -d», все контейнеры инициализируются, но когда я проверяю журналы планировщика, они пусты.
Также по какой-то причине в журналах веб-сервера и планировщика я не вижу, как устанавливаются зависимости из requirements.txt.
Я не знаю, какие ещё детали добавить, но добавлю свой код.
Заранее спасибо


docker-compose.yml
Код: Выделить всё
# Local streaming stack: ZooKeeper and a Kafka broker (Confluent Platform
# 7.4.0), Schema Registry, Control Center, Airflow (webserver + scheduler)
# backed by Postgres, a Spark master/worker pair, and Cassandra.
# Every service joins the single `confluent` network, so containers reach
# each other by service name.
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'
    healthcheck:
      test: ['CMD', 'bash', '-c', "echo 'ruok' | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - confluent

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      # Two advertised addresses: broker:29092 for containers on the compose
      # network, localhost:9092 for clients running on the Docker host.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_JMX_PORT: '9101'
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: '1'
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: ['CMD', 'bash', '-c', 'nc -z localhost 9092']
      interval: 10s
      timeout: 5s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    networks:
      - confluent
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/"]
      interval: 30s
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      broker:
        condition: service_healthy
      schema-registry:
        condition: service_healthy
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: '1'
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: '1'
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: '1'
      CONFLUENT_METRICS_TOPIC_REPLICATION: '1'
      # Spelled the same way as on the broker service.
      CONFLUENT_METRICS_ENABLE: 'false'
      PORT: '9021'
    networks:
      - confluent
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9021/health"]
      interval: 30s
      timeout: 10s
      retries: 5

  webserver:
    # Pinned to an Airflow 2 release: the mounted entrypoint and the scheduler
    # command call `airflow db init`, `airflow db upgrade` and
    # `airflow webserver`. An untagged image follows `latest`, which can move
    # to a major version without those commands.
    # NOTE(review): confirm the exact 2.x tag you want to run.
    image: apache/airflow:2.6.0-python3.9
    command: webserver
    entrypoint: ['/opt/airflow/script/entrypoint.sh']
    depends_on:
      # Wait until Postgres accepts connections; the entrypoint talks to the
      # metadata database immediately and exits on the first error (set -e).
      postgres:
        condition: service_healthy
    environment:
      # NOTE(review): LOAD_EX and EXECUTOR do not follow Airflow's
      # AIRFLOW__<SECTION>__<KEY> naming; presumably the official image
      # ignores them - confirm or replace with
      # AIRFLOW__CORE__LOAD_EXAMPLES / AIRFLOW__CORE__EXECUTOR.
      - LOAD_EX=n
      - EXECUTOR=Sequential
      # Same connection string as the scheduler, so both processes share one
      # metadata database. The host part is the compose service name.
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      # NOTE(review): single-underscore name; the documented option is
      # AIRFLOW__WEBSERVER__SECRET_KEY - confirm which one is intended.
      - AIRFLOW_WEBSERVER_SECRET_KEY=secret_key
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      # The entrypoint only installs dependencies when this exact path exists.
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      # NOTE(review): the pid file appears only after the entrypoint has
      # finished pip install and DB setup; if that takes longer than
      # interval * retries the container is reported unhealthy and the
      # scheduler, which waits for service_healthy, will not start.
      test: ['CMD-SHELL', "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
    networks:
      - confluent

  scheduler:
    # Keep the tag identical to the webserver image.
    image: apache/airflow:2.6.0-python3.9
    depends_on:
      webserver:
        condition: service_healthy
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      - ./requirements.txt:/opt/airflow/requirements.txt
    environment:
      - LOAD_EX=n
      - EXECUTOR=Sequential
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW_WEBSERVER_SECRET_KEY=secret_key
    command: bash -c "pip install -r ./requirements.txt && airflow db upgrade && airflow scheduler"
    networks:
      - confluent

  postgres:
    image: postgres
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    # Lets dependants use `condition: service_healthy`.
    healthcheck:
      test: ['CMD', 'pg_isready', '-U', 'airflow']
      interval: 10s
      timeout: 5s
      retries: 5
    logging:
      options:
        max-size: 10m
        max-file: "3"
    networks:
      - confluent

  spark-master:
    image: bitnami/spark:latest
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      # Host port 9090 avoids a clash with the Airflow webserver on 8080.
      - "9090:8080"
      - "7077:7077"
    networks:
      - confluent

  spark-worker:
    image: bitnami/spark:latest
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: '2'
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: 'spark://spark-master:7077'
    networks:
      - confluent

  cassandra_db:
    image: cassandra:latest
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    environment:
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=100M
      - CASSANDRA_USERNAME=cassandra
      - CASSANDRA_PASSWORD=cassandra
    networks:
      - confluent

networks:
  confluent: {}
enter code here
Код: Выделить всё
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import uuid
# Arguments passed as `default_args` to the DAG defined in this file.
default_args = dict(
    owner='airscholar',
    start_date=datetime(2025, 8, 3, 10, 0),
)
def get_data():
    """Fetch one random user record from the randomuser.me API.

    Returns:
        The first element of the ``results`` list in the decoded JSON response.

    Raises:
        requests.exceptions.RequestException: if the request fails or times out.
    """
    import requests

    # Bound the wait so an unresponsive API cannot block the caller indefinitely.
    res = requests.get('https://randomuser.me/api/', timeout=10).json()
    return res['results'][0]
def format_data(res):
    """Flatten one randomuser.me result into a flat dict.

    Args:
        res: A mapping with the keys read below (``location``, ``name``,
            ``gender``, ``email``, ``login``, ``dob``, ``registered``,
            ``phone``, ``picture``).

    Returns:
        A dict with a freshly generated UUID string under ``id``, the street,
        city, state and country joined into ``address``, and the remaining
        fields copied from ``res``.

    Raises:
        KeyError: if ``res`` lacks one of the keys read below.
    """
    location = res['location']
    street = location['street']
    address = (
        f"{street['number']} {street['name']}, "
        f"{location['city']}, {location['state']}, {location['country']}"
    )
    return {
        'id': str(uuid.uuid4()),
        'first_name': res['name']['first'],
        'last_name': res['name']['last'],
        'gender': res['gender'],
        'address': address,
        'post_code': location['postcode'],
        'email': res['email'],
        'username': res['login']['username'],
        'dob': res['dob']['date'],
        'registered_date': res['registered']['date'],
        'phone': res['phone'],
        'picture': res['picture']['medium'],
    }
def stream_data():
    """Publish formatted random-user records to Kafka for about 60 seconds.

    Each iteration fetches a record with ``get_data``, reshapes it with
    ``format_data`` and sends it, JSON-encoded as UTF-8, to the
    ``users_created`` topic on ``broker:29092``. A failure in a single
    iteration is logged and the loop carries on.
    """
    import json
    import time

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000)
    deadline = time.time() + 60
    try:
        while time.time() <= deadline:
            try:
                res = format_data(get_data())
                producer.send('users_created', json.dumps(res).encode('utf-8'))
            except Exception as e:
                logging.error(f'An error occured: {e}')
    finally:
        # send() is asynchronous: push out whatever is still buffered and
        # release the connection before the task returns.
        producer.flush()
        producer.close()
# DAG `user_automation`: scheduled '@daily' with catchup disabled; its single
# task calls stream_data through a PythonOperator.
with DAG(
    'user_automation',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:
    streaming_task = PythonOperator(
        task_id='stream_data_from_api',
        python_callable=stream_data,
    )
Код: Выделить всё
#!/bin/bash
# Container entrypoint for the Airflow webserver: installs optional Python
# dependencies, prepares the metadata database and an admin account, then
# replaces this shell with `airflow webserver`.
set -e

# Install extra dependencies only when a requirements file has been mounted.
if [ -e "/opt/airflow/requirements.txt" ]; then
  # Invoke pip through the interpreter so the upgraded pip belongs to the
  # same Python that runs Airflow.
  python -m pip install --upgrade pip
  # Absolute path: install from the file tested above, whatever the
  # working directory is.
  python -m pip install --user -r /opt/airflow/requirements.txt
fi

# NOTE(review): airflow.db looks like the SQLite database file. With another
# metadata backend it is presumably never created, so this branch would run
# on every start - confirm that is acceptable.
if [ ! -f "/opt/airflow/airflow.db" ]; then
  # Separate statements so that `set -e` aborts on a failed `db init`
  # instead of skipping user creation and carrying on.
  airflow db init
  airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin@example.com \
    --password admin
fi

airflow db upgrade

# exec hands the shell's PID to the webserver so it receives container
# signals directly.
exec airflow webserver
Подробнее здесь: https://stackoverflow.com/questions/791 ... st-may-not
Мобильная версия