Я пробовал использовать Prefect с проектом Fastapi. Затем, когда я обновил журналы и повторно развернул репозиторий, а также развертывания и потоки Prefect. Он запускается и отображает журналы (по сути, Prefect по-прежнему указывает на более старый снимок репозитория github. Я попробовал реализовать несколько шагов, например удаление push:null из файла .yaml. Теперь я упомянул prefect.yaml, создал функцию репозитория github для хранения и один из потоков Prefect. Тем не менее он указывает на более старый экземпляр. Пожалуйста, предложите что-нибудь.
prefect.yaml файл
# Prefect Deployment Configuration
# This file defines how flows are deployed and scheduled
# Prefect server/cloud configuration
name: parliament-explorer
prefect-version: 3.4.24
# Build configuration
build: null
# Pull configuration (for retrieving flow code)
pull:
- prefect.deployments.steps.set_working_directory:
directory: "/opt/truecivic"
# Push configuration (for storing flow code)
# push:
# - prefect.deployments.steps.push_code_to_block:
# block_name: truecivic-repo
# Deployment definitions
deployments:
# Hourly bill fetching
- name: fetch-bills-hourly
entrypoint: src/prefect_flows/bill_flows.py:fetch_latest_bills_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "0 * * * *" # Every hour at minute 0
timezone: "UTC"
parameters:
limit: 50
tags:
- production
- bills
- hourly
description: "Fetch latest 50 bills every hour to keep data fresh"
# Daily bill fetching
- name: fetch-bills-daily
entrypoint: src/prefect_flows/bill_flows.py:fetch_latest_bills_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "0 2 * * *" # Daily at 2:00 AM UTC
timezone: "UTC"
parameters:
limit: 100
tags:
- production
- bills
- daily
description: "Fetch latest 100 bills daily at 2 AM UTC"
# Daily monitoring
- name: monitor-daily
entrypoint: src/prefect_flows/bill_flows.py:monitor_fetch_operations_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "0 3 * * *" # Daily at 3:00 AM UTC (after fetch)
timezone: "UTC"
parameters:
hours_back: 24
tags:
- production
- monitoring
- daily
description: "Monitor fetch operations daily at 3 AM UTC"
# Hourly vote ingestion (lightweight)
- name: fetch-votes-hourly
entrypoint: src/prefect_flows/vote_with_records_flow.py:fetch_latest_votes_hourly_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "10 * * * *" # Every hour at minute 10 (offset from bills)
timezone: "UTC"
parameters:
limit: 40
start_date: null
tags:
- production
- votes
- hourly
description: "Fetch the latest votes each hour with conservative limits"
# Daily vote ingestion with records
- name: fetch-votes-daily
entrypoint: src/prefect_flows/vote_with_records_flow.py:fetch_votes_with_records_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "40 2 * * *" # Daily at 02:40 AM UTC
timezone: "UTC"
parameters:
parliament: 45
session: null
limit: 150
fetch_records: true
start_date: null
tags:
- production
- votes
- daily
description: "Daily vote ingestion with full record fetch and rate-aware limits"
# Hourly debate ingestion (recent snapshots)
- name: fetch-debates-hourly
entrypoint: src/prefect_flows/debate_flow.py:fetch_recent_debates_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "20 * * * *" # Every hour at minute 20
timezone: "UTC"
parameters:
limit: 20
tags:
- production
- debates
- hourly
description: "Fetch the most recent debates hourly to keep speech data fresh"
# Daily debates with full speech capture
- name: fetch-debates-daily
entrypoint: src/prefect_flows/debate_flow.py:fetch_debates_with_speeches_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "15 1 * * *" # Daily at 01:15 AM UTC
timezone: "UTC"
parameters:
limit: 12
parliament: null
session: null
tags:
- production
- debates
- daily
description: "Daily sweep of debates with speech extraction"
# Daily committee ingestion and targeted meetings
- name: fetch-committees-daily
entrypoint: src/prefect_flows/committee_flow.py:fetch_all_committees_daily_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "0 4 * * *" # Daily at 04:00 AM UTC
timezone: "UTC"
tags:
- production
- committees
- daily
description: "Refresh committee metadata daily"
- name: fetch-committee-meetings-daily
entrypoint: src/prefect_flows/committee_flow.py:fetch_top_committees_meetings_daily_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "30 4 * * *" # Daily at 04:30 AM UTC
timezone: "UTC"
tags:
- production
- committees
- daily
description: "Fetch recent meetings for key committees each day"
# On-demand parliament/session backfill
- name: backfill-parliament-session
entrypoint: src/prefect_flows/bill_flows.py:fetch_parliament_session_bills_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
# No schedule - run manually
tags:
- backfill
- bills
- manual
description: "Backfill all bills from a specific parliament and session (manual trigger)"
# On-demand 2025 scoped backfill (bills, votes, debates, committees)
- name: backfill-2025-sample
entrypoint: src/prefect_flows/backfill_flow.py:backfill_2025_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
# No schedule - triggered manually when needed
parameters:
bill_limit: null
vote_limit: null
debate_limit: null
committee_limit: null
meetings_limit: null
politician_limit: null
parliament: null
session: null
full: true
start_year: null
end_year: null
tags:
- backfill
- manual
- 2025
description: "Run the scoped 2025 backfill (adjust limits when triggering manually)"
# Decade-scale backfill convenience deployment
- name: backfill-decade
entrypoint: src/prefect_flows/backfill_flow.py:backfill_decade_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
parameters:
bill_limit: null
vote_limit: null
debate_limit: null
committee_limit: null
meetings_limit: null
politician_limit: null
parliament: null
session: null
tags:
- backfill
- manual
- decade
description: "Backfill a decade of parliamentary data (2015-2025) with safe throttles"
# Every-six-hour document embedding generation
- name: generate-document-embeddings
entrypoint: src/prefect_flows/embedding_flow.py:generate_document_embeddings_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
schedule:
cron: "0 */6 * * *" # Every 6 hours at the top of the hour
timezone: "UTC"
parameters:
limit: 200
entity_types:
- speech
- debate
language: "en"
content_type: null
max_words_per_chunk: 750
tags:
- production
- embeddings
- every-six-hours
description: "Generate embeddings for newly ingested documents every 6 hours"
# Manual Alembic migration runner
- name: alembic-upgrade
entrypoint: src/prefect_flows/alembic_flow.py:alembic_upgrade_flow
work_pool:
name: default-agent-pool
storage:
block: truecivic-repo
parameters:
revision: "head"
config_path: "alembic.ini"
sql: false
tag: null
tags:
- maintenance
- migrations
- manual
description: "Run Alembic migrations from within the Prefect/Railway environment (manual trigger)"
create_github_storage.py
from prefect_github import GitHubRepository
from prefect.utilities.asyncutils import sync_compatible
# Define the GitHub block
github_block = GitHubRepository(
repository_url="https://github.com/needa01/truecivic.git",
reference="main",
)
def create_github_storage_block(block_name: str = "truecivic-repo", overwrite: bool = True):
"""
Safely create the GitHubRepository block synchronously.
"""
try:
sync_save = sync_compatible(github_block.save)
# Call the wrapped save function
sync_save(name=block_name, overwrite=overwrite)
print(f"GitHub storage block '{block_name}' created successfully!")
except Exception as e:
print(f"GitHub storage block creation failed: {e}")
# ONLY create block if this script is run directly
if __name__ == "__main__":
create_github_storage_block()
backfill_flow.py
"""Prefect flow wrapper around the 2025 backfill helper.
Provides a deployment-friendly interface so we can trigger the backfill from
Prefect Cloud / Railway workers with custom limits.
"""
try:
from .create_github_storage import github_block
except Exception as e:
print("GitHub storage block not created yet:", e)
import logging
from argparse import Namespace
from typing import Any, Optional
from prefect import flow, get_run_logger
from scripts.backfill_2025_sample import backfill_2025
def _coerce_to_int(logger: logging.Logger, name: str, value: Any) -> Optional[int]:
"""Best-effort conversion of dynamic Prefect parameters to integers."""
if value is None:
return None
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str):
cleaned = value.strip()
if cleaned == "":
logger.warning("Backfill parameter %s was an empty string; treating as None", name)
return None
try:
return int(cleaned)
except ValueError:
logger.warning("Backfill parameter %s='%s' is not an integer; treating as None", name, value)
return None
logger.warning("Backfill parameter %s of type %s is unsupported; treating as None", name, type(value))
return None
def _build_args(
logger: logging.Logger,
*,
bill_limit: Any,
vote_limit: Any,
debate_limit: Any,
committee_limit: Any,
meetings_limit: Any,
politician_limit: Any,
parliament: Any,
session: Any,
full: bool,
start_year: Any,
end_year: Any,
) -> Namespace:
return Namespace(
bill_limit=_coerce_to_int(logger, "bill_limit", bill_limit),
vote_limit=_coerce_to_int(logger, "vote_limit", vote_limit),
debate_limit=_coerce_to_int(logger, "debate_limit", debate_limit),
committee_limit=_coerce_to_int(logger, "committee_limit", committee_limit),
meetings_limit=_coerce_to_int(logger, "meetings_limit", meetings_limit),
politician_limit=_coerce_to_int(logger, "politician_limit", politician_limit),
parliament=_coerce_to_int(logger, "parliament", parliament),
session=_coerce_to_int(logger, "session", session),
full=bool(full),
start_year=_coerce_to_int(logger, "start_year", start_year),
end_year=_coerce_to_int(logger, "end_year", end_year),
)
@flow(name="backfill-2025")
async def backfill_2025_flow(
bill_limit: Optional[int] = None,
vote_limit: Optional[int] = None,
debate_limit: Optional[int] = None,
committee_limit: Optional[int] = None,
meetings_limit: Optional[int] = None,
politician_limit: Optional[int] = None,
parliament: Any = None,
session: Any = None,
full: bool = True,
start_year: Optional[int] = None,
end_year: Optional[int] = None,
):
"""
Prefect flow entrypoint for the 2025 backfill.
Args mirror the CLI helper so we can reuse logic across both entrypoints.
"""
logger = get_run_logger()
def _coerce_to_int(name: str, value: Any) -> Optional[int]:
if value is None:
return None
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str):
cleaned = value.strip()
if cleaned == "":
logger.warning("Backfill parameter %s was an empty string; treating as None", name)
return None
try:
return int(cleaned)
except ValueError:
logger.warning("Backfill parameter %s='%s' is not an integer; treating as None", name, value)
return None
logger.warning("Backfill parameter %s of type %s is unsupported; treating as None", name, type(value))
return None
logger.info(
"Starting backfill with parameters: full=%s, start_year=%s, end_year=%s, bill_limit=%s, "
"vote_limit=%s, debate_limit=%s, committee_limit=%s, meetings_limit=%s, politician_limit=%s, "
"parliament=%s, session=%s",
full,
start_year,
end_year,
bill_limit,
vote_limit,
debate_limit,
committee_limit,
meetings_limit,
politician_limit,
parliament,
session,
)
args = _build_args(
logger,
bill_limit=bill_limit,
vote_limit=vote_limit,
debate_limit=debate_limit,
committee_limit=committee_limit,
meetings_limit=meetings_limit,
politician_limit=politician_limit,
parliament=parliament,
session=session,
full=full,
start_year=start_year,
end_year=end_year,
)
logger.info(
"Sanitized backfill args: full=%s, start_year=%s, end_year=%s, bill_limit=%s, vote_limit=%s, "
"debate_limit=%s, committee_limit=%s, meetings_limit=%s, politician_limit=%s, parliament=%s, session=%s",
args.full,
args.start_year,
args.end_year,
args.bill_limit,
args.vote_limit,
args.debate_limit,
args.committee_limit,
args.meetings_limit,
args.politician_limit,
args.parliament,
args.session,
)
return await backfill_2025(args)
@flow(name="backfill-decade")
async def backfill_decade_flow(
bill_limit: Optional[int] = None,
vote_limit: Optional[int] = None,
debate_limit: Optional[int] = None,
committee_limit: Optional[int] = None,
meetings_limit: Optional[int] = None,
politician_limit: Optional[int] = None,
parliament: Any = None,
session: Any = None,
):
"""Convenience wrapper to backfill the last decade (2015-2025)."""
logger = get_run_logger()
logger.info("Launching decade backfill (2015-2025) with full dataset mode enabled")
args = _build_args(
logger,
bill_limit=bill_limit,
vote_limit=vote_limit,
debate_limit=debate_limit,
committee_limit=committee_limit,
meetings_limit=meetings_limit,
politician_limit=politician_limit,
parliament=parliament,
session=session,
full=True,
start_year=2015,
end_year=2025,
)
logger.info(
"Sanitized decade backfill args: bill_limit=%s, vote_limit=%s, debate_limit=%s, committee_limit=%s, "
"meetings_limit=%s, politician_limit=%s, parliament=%s, session=%s",
args.bill_limit,
args.vote_limit,
args.debate_limit,
args.committee_limit,
args.meetings_limit,
args.politician_limit,
args.parliament,
args.session,
)
return await backfill_2025(args)
Подробнее здесь: https://stackoverflow.com/questions/797 ... d-edits-in