Я новичок в Dagster и начал свой первый проект, в котором я использую DuckDB для чтения файлов json из папки и записи их в Ducklake. Мой Ducklake использует Azure Data Lake Storage Gen2 для хранения и Postgres в качестве каталога метаданных.
Запись в ADLS возможна начиная с версии DuckDB 1.4.3 и прекрасно работает вне моего проекта.
Локально (через dg dev) я могу без проблем запустить ресурс Dagster, чтобы данные поступали в Ducklake.
Теперь у меня все работает в контейнерах через Docker Compose (1 для ведения журнала, 1 для веб-сервера, 1 для демона и 1 для базы кода), и он не работает. Запуск можно запустить, но он прерывается в момент записи с сообщениями об ошибках:
Ошибка: Ошибка ввода-вывода: AzureBlobStorageFileSystem не удалось открыть файл
и
Ошибка DuckDB: не удалось получить новое соединение для: «https://mystorageaccount.blob.core.windows.net»
Я уже запустил отдельный контейнер в качестве теста, который запускается с тем же образом, что и сервер кодовой базы Dagster, и выполняет только сценарий Python ресурса. Там все работает. Мне кажется, что это не работает только в контексте Docker проекта Dagster.
Кто-нибудь может мне помочь, потому что я в отчаянии на данный момент.
Это код актива. Я знаю, что код не идеален и во многих местах требует рефакторинга, но это должна быть первая попытка заставить этот процесс работать:
import dagster as dg
from logs.defs.partitions import time_based_partitions
from logs.defs.assets import constants
import duckdb
from pathlib import Path
@dg.asset(
partitions_def=time_based_partitions.daily_partition,
group_name="raw"
)
def Logs(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
LOG_TYPE = "Home"
SCHEMA = "raw"
partition_date = context.partition_key
duckdb_path = Path(__file__).parent.parent.parent.parent / 'duckdb' / 'data.duckdb'
with duckdb.connect(database=duckdb_path, read_only=False) as con:
duckdb_version = con.execute("SELECT version() AS version;").fetchall()
context.log.info(f"DuckDB-Version: {duckdb_version}")
try:
con.execute("INSTALL azure;")
con.execute("INSTALL ducklake;")
con.execute("INSTALL postgres;")
con.execute("LOAD azure;")
con.execute("LOAD ducklake;")
con.execute("LOAD postgres;")
con.execute("SET azure_transport_option_type = 'curl';")
except:
context.log.warning("Loading DUCKDB extensions failed.")
return
try:
azure_secret = con.execute(f"""
CREATE PERSISTENT SECRET IF NOT EXISTS azure_lake (
TYPE azure,
CONNECTION_STRING 'xxxxxx');
""").fetchall()
# SCOPE ['az://mystorageaccount.blob.core.windows.net/lake/']
context.log.info(azure_secret)
except Exception as e:
context.log.error(f"Secret-Error: {azure_secret}")
context.log.error(f"Error: {e}")
raise dg.Failure()
try:
con.execute(f"ATTACH 'ducklake:postgres:dbname=db user=user password=pwd host=postgresdb.postgres.database.azure.com port=5432' AS lake (DATA_PATH 'az://mystorageaccount.blob.core.windows.net/lake/');")
except duckdb.Error as e:
context.log.error("Ducklake connection failed.")
context.log.error(f"DuckDB-Error: {e}")
raise dg.Failure()
except Exception as e:
context.log.error("Ducklake connection failed.")
context.log.error(f"Error: {e}")
raise dg.Failure()
try:
con.execute(f"DELETE FROM lake.{SCHEMA}.{LOG_TYPE} WHERE partition_date = CAST('{partition_date}' AS DATE)")
except Exception as e:
context.log.error(f"Table {SCHEMA}.{LOG_TYPE} not found.")
context.log.error(f"Error: {e}")
for machinename in constants.LOG_MACHINES:
base_path = constants.LOG_FILE_ROOT_PATH.format(partition_date) + f'/{LOG_TYPE}/{machinename}/*.json'
context.log.info(base_path)
try:
con.execute(f"""
INSERT INTO lake.{SCHEMA}.{LOG_TYPE}
SELECT
{constants.COLUMNS_Home},
CAST('{partition_date}' AS DATE) as partition_date,
now() as tst
FROM read_json('{base_path}')
""")
except duckdb.Error as e:
context.log.error("Ducklake Connection failed.")
context.log.error(f"DuckDB-Error: {e}")
raise dg.Failure()
except Exception as e:
context.log.error("Ducklake Connection failed.")
context.log.error(f"Error: {e}")
raise dg.Failure()
count = con.execute(f"SELECT count(*) FROM lake.{SCHEMA}.{LOG_TYPE} WHERE partition_date = '{partition_date}'").fetchone()[0]
return dg.MaterializeResult(metadata={
"row_count": dg.MetadataValue.int(count)
})
И файл Dockerfile:
FROM python:3.12-slim
RUN apt-get update -y && apt-get install -y --no-install-recommends curl g++ unixodbc-dev cifs-utils ca-certificates
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /opt/dagster/app
COPY . /opt/dagster/app
ENV UV_PROJECT_ENVIRONMENT="/usr/local/"
RUN uv sync --locked --no-dev
EXPOSE 4000
HEALTHCHECK --timeout=10s --start-period=15s --interval=20s --retries=4 CMD ["dagster", "api", "grpc-health-check", "-p", "4000"]
CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "definitions.py"]
Подробнее здесь: https://stackoverflow.com/questions/798 ... ject-was-d
Dagster: запись в Ducklake на базе Azure не работает, если проект был развернут через Docker ⇐ Python
Программы на Python
1769760581
Anonymous
Я новичок в Dagster и начал свой первый проект, в котором я использую DuckDB для чтения файлов json из папки и записи их в Ducklake. Мой Ducklake использует Azure Data Lake Storage Gen2 для хранения и Postgres в качестве каталога метаданных.
Запись в ADLS возможна начиная с версии DuckDB 1.4.3 и прекрасно работает вне моего проекта.
Локально (через dg dev) я могу без проблем запустить ресурс Dagster, чтобы данные поступали в Ducklake.
Теперь у меня все работает в контейнерах через Docker Compose (1 для ведения журнала, 1 для веб-сервера, 1 для демона и 1 для базы кода), и он не работает. Запуск можно запустить, но он прерывается в момент записи с сообщениями об ошибках:
Ошибка: Ошибка ввода-вывода: AzureBlobStorageFileSystem не удалось открыть файл
и
Ошибка DuckDB: не удалось получить новое соединение для: «https://mystorageaccount.blob.core.windows.net»
Я уже запустил отдельный контейнер в качестве теста, который запускается с тем же образом, что и сервер кодовой базы Dagster, и выполняет только сценарий Python ресурса. Там все работает. Мне кажется, что это не работает только в контексте Docker проекта Dagster.
Кто-нибудь может мне помочь, потому что я в отчаянии на данный момент.
Это код актива. Я знаю, что код не идеален и во многих местах требует рефакторинга, но это должна быть первая попытка заставить этот процесс работать:
import dagster as dg
from logs.defs.partitions import time_based_partitions
from logs.defs.assets import constants
import duckdb
from pathlib import Path
@dg.asset(
partitions_def=time_based_partitions.daily_partition,
group_name="raw"
)
def Logs(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
LOG_TYPE = "Home"
SCHEMA = "raw"
partition_date = context.partition_key
duckdb_path = Path(__file__).parent.parent.parent.parent / 'duckdb' / 'data.duckdb'
with duckdb.connect(database=duckdb_path, read_only=False) as con:
duckdb_version = con.execute("SELECT version() AS version;").fetchall()
context.log.info(f"DuckDB-Version: {duckdb_version}")
try:
con.execute("INSTALL azure;")
con.execute("INSTALL ducklake;")
con.execute("INSTALL postgres;")
con.execute("LOAD azure;")
con.execute("LOAD ducklake;")
con.execute("LOAD postgres;")
con.execute("SET azure_transport_option_type = 'curl';")
except:
context.log.warning("Loading DUCKDB extensions failed.")
return
try:
azure_secret = con.execute(f"""
CREATE PERSISTENT SECRET IF NOT EXISTS azure_lake (
TYPE azure,
CONNECTION_STRING 'xxxxxx');
""").fetchall()
# SCOPE ['az://mystorageaccount.blob.core.windows.net/lake/']
context.log.info(azure_secret)
except Exception as e:
context.log.error(f"Secret-Error: {azure_secret}")
context.log.error(f"Error: {e}")
raise dg.Failure()
try:
con.execute(f"ATTACH 'ducklake:postgres:dbname=db user=user password=pwd host=postgresdb.postgres.database.azure.com port=5432' AS lake (DATA_PATH 'az://mystorageaccount.blob.core.windows.net/lake/');")
except duckdb.Error as e:
context.log.error("Ducklake connection failed.")
context.log.error(f"DuckDB-Error: {e}")
raise dg.Failure()
except Exception as e:
context.log.error("Ducklake connection failed.")
context.log.error(f"Error: {e}")
raise dg.Failure()
try:
con.execute(f"DELETE FROM lake.{SCHEMA}.{LOG_TYPE} WHERE partition_date = CAST('{partition_date}' AS DATE)")
except Exception as e:
context.log.error(f"Table {SCHEMA}.{LOG_TYPE} not found.")
context.log.error(f"Error: {e}")
for machinename in constants.LOG_MACHINES:
base_path = constants.LOG_FILE_ROOT_PATH.format(partition_date) + f'/{LOG_TYPE}/{machinename}/*.json'
context.log.info(base_path)
try:
con.execute(f"""
INSERT INTO lake.{SCHEMA}.{LOG_TYPE}
SELECT
{constants.COLUMNS_Home},
CAST('{partition_date}' AS DATE) as partition_date,
now() as tst
FROM read_json('{base_path}')
""")
except duckdb.Error as e:
context.log.error("Ducklake Connection failed.")
context.log.error(f"DuckDB-Error: {e}")
raise dg.Failure()
except Exception as e:
context.log.error("Ducklake Connection failed.")
context.log.error(f"Error: {e}")
raise dg.Failure()
count = con.execute(f"SELECT count(*) FROM lake.{SCHEMA}.{LOG_TYPE} WHERE partition_date = '{partition_date}'").fetchone()[0]
return dg.MaterializeResult(metadata={
"row_count": dg.MetadataValue.int(count)
})
И файл Dockerfile:
FROM python:3.12-slim
RUN apt-get update -y && apt-get install -y --no-install-recommends curl g++ unixodbc-dev cifs-utils ca-certificates
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /opt/dagster/app
COPY . /opt/dagster/app
ENV UV_PROJECT_ENVIRONMENT="/usr/local/"
RUN uv sync --locked --no-dev
EXPOSE 4000
HEALTHCHECK --timeout=10s --start-period=15s --interval=20s --retries=4 CMD ["dagster", "api", "grpc-health-check", "-p", "4000"]
CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "definitions.py"]
Подробнее здесь: [url]https://stackoverflow.com/questions/79868131/dagster-writing-to-an-azure-based-ducklake-does-not-work-when-the-project-was-d[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия