Как динамически создать развертывание префекта из потока GIT-источника в конечной точке FastAPI?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как динамически создать развертывание префекта из потока GIT-источника в конечной точке FastAPI?

Сообщение Anonymous »

Мы пытаемся развернуть конечную точку FastAPI, которая называет функцию для динамического создания префектного развертывания (полученного из GIT), но его сбой.
ниже используется код. < /p>

Код: Выделить всё

async def create_deployment_for_batch_job(
batch_job_id: int,
project_id: int,
user_id: str,
query: str,
limit: int,
priority: str = "low",
cron_schedule: str = None,
):
"""Creates a Prefect deployment for a batch job"""
print("start")
deployment_name = f"batch_job_{batch_job_id}"
print("deployment_name")
# Create deployment that will call the endpoint
deployment = await flow.from_source(
source=git_repo,
entrypoint="backend/app/api/v1/endpoints/batch.py:run_batch_pipeline_flow",

)
await deployment.deploy(
name=deployment_name,
work_pool_name="my-local-pool",
parameters={
"query": query,
"limit": limit,
"priority": priority,
"user_id": user_id,
"project_id": project_id,
"batch_job_id": batch_job_id
},
# schedule=CronSchedule(cron=cron_schedule, timezone="UTC") if cron_schedule else None,
tags=["batch-pipeline", "prefect-git-deployment"]
)
< /code>
Это вызывается в рамках ниже функции: < /p>
@router.post("/projects/{project_id}/batch_job_creation/")
async def create_batch_job_publication_retrieval(
project_id: int,
query: str,
limit: int ,
priority: str,
user: CurrentUser,
cron_job_schedule: Optional[str] = None,
):
"""
Create a new batch job for publication retrieval.
"""
user_id, _ = user
batch_service = BatchService()
try:
match = re.search(r'\("(\d{4})"\[PDAT\] *: *"(\d{4})"\[PDAT\]\)', query)
if match:
start_date = match.group(1)
end_date = match.group(2)
else:
start_date = None
end_date = None
#started_at = time.time()
current_year = datetime.now(timezone.utc).year
start_date = start_date or str(current_year - 5)  # Default to 5 years ago
end_date = end_date or str(current_year)          # Default to current year

batch_job_id = await batch_service.create_batch_job(project_id=project_id, search_query=query, start_date=start_date, end_date=end_date,
max_publications=limit, priority=priority, user_id=user_id, cron_job_schedule = cron_job_schedule)

# TODO: This functionality works fine when run using from a standalone script [deployment_test.py in tests] but is failing here. Need to Debug.
# Create Prefect deployment
print("passed batch job", batch_job_id)
await create_deployment_for_batch_job(
batch_job_id=batch_job_id,
project_id=project_id,
user_id=user_id,
query=query,
limit=limit,
priority=priority,
cron_schedule=cron_job_schedule
)
< /code>
Приведенный выше код дает эти ошибки < /p>

не может быть расклею Coroutine Coroutine Coroutine Object не имеет функции
«Развертывание» (если я удаляю await with flow_from_source
function не может вызвать Asyncio в рамках выполнения, когда I
не может вызвать Asyncio в рамках выполнения (когда I
не может вызвать Asyncio в рамках работы. asyncio.run ()) < /p>
< /blockquote>
также попытался сделать это синхронной функцией (create_deployment_for_batch_job
) и попытка с помощью create_task , а затем ждать эту задачу.
Пробое эти ссылки: и несколько, но без доступности.>

Подробнее здесь: https://stackoverflow.com/questions/795 ... w-within-a
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»