Ошибка в Taskiq Python при выполнении расписаний динамических интерваловPython

Программы на Python
Ответить
Anonymous
 Ошибка в Taskiq Python при выполнении расписаний динамических интервалов

Сообщение Anonymous »

У меня есть бот aiogram с рабочим и планировщиком задач (taskiq redis с Taskiq_aiogram), в обработчике сообщений я запускаю задачу с динамическим интервалом планирования, через некоторое время (30-40 минут) я получаю: redis.Exceptions.ResponseError: NOGROUP Нет такого ключа "taskiq" или группы потребителей "taskiq" в XREADGROUP с опцией GROUP, но когда я запускаю запланированные задачи со статическим интервалом (просто декоратор для асинхронной функции, которая начинает работать после запуска работника и планировщика), все работает без ошибок и задачи выполняются с правильным интервалом.
здесь отbrocker.py:
from typing import Annotated

from taskiq_redis import (
RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource
)
from taskiq import TaskiqScheduler, Context, TaskiqDepends
from redis.asyncio import Redis
import taskiq_aiogram
from taskiq.events import TaskiqEvents

from bot import redis_client

from bot.config import REDIS_CLIENT_HOST, REDIS_CLIENT_PORT, REDIS_CLIENT_DB

redis_url = f"redis://{REDIS_CLIENT_HOST}:{REDIS_CLIENT_PORT}/{REDIS_CLIENT_DB}"

result_backend = RedisAsyncResultBackend(
redis_url=redis_url,
)

broker = RedisStreamBroker(
url=redis_url,
).with_result_backend(result_backend)

# ! run: taskiq worker bot.tasks.broker:broker

taskiq_aiogram.init(
broker,
"bot.main:dp",
"bot.main:bot",
)

# define sources
dynamic_schedule_source = RedisScheduleSource(
url=redis_url
)

scheduler = TaskiqScheduler(
broker=broker,
sources=[
dynamic_schedule_source
],
)
# ! run: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5

# * I also tried to fix error with this
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(context: Annotated[Context, TaskiqDepends()]):
r = Redis.from_url(redis_url)

try:
await r.xgroup_create(
name="taskiq",
groupname="taskiq",
id="$",
mkstream=True,
)
except Exception as e:
print(e)
if "BUSYGROUP" not in str(e):
raise

# help funcs
async def delete_schedule(id_: str):
await dynamic_schedule_source.delete_schedule(id_)

async def delete_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
await delete_schedule(schedule.schedule_id)

async def get_all_schedules_ids():
schedules = await dynamic_schedule_source.get_schedules()
return [schd.schedule_id for schd in schedules]

async def show_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
print(schedule)

async def check_schedule(schedule_id: str) -> bool:
key = f"{dynamic_schedule_source._prefix}:data:{schedule_id}"
schedule = await redis_client.client.get(key)

return schedule is not None

tasks.py:
from typing import Annotated, Awaitable, Any
from datetime import datetime, timedelta
from time import perf_counter, time

from taskiq import TaskiqDepends, Context
from aiogram import Bot

from bot.tasks import broker as tasks_broker
import bot.database as db
from bot.database.engine import session as db_session
from bot import redis_client

from bot.weather_parse import get_future_weather
from bot import texts
from bot.config import ADMIN_ID

@tasks_broker.broker.task
async def send_time_diff(start_seconds: float, user_id: int, context: Annotated[Context, TaskiqDepends()], bot: Bot = TaskiqDepends(), ):
schedule_id = context.message.labels["schedule_id"]

text = f"{(time() - start_seconds) / 60} {schedule_id}"
await bot.send_message(user_id, text, disable_notification=True)

обработчик сообщений бота, в котором я запускаю динамическое расписание интервалов:
start_seconds = time()

task = await tasks.send_time_diff.schedule_by_interval(
source=dynamic_schedule_source,
interval=WEATHER_TASK_SEND_INTERVAL,
start_seconds=start_seconds,
user_id=user_id
)

для запуска бота, Taskiq и Redis я использую docker Compose (возможно, это моя ошибка)
docker-compose.yml:

version: '3.8'

services:
bot:
build: .
depends_on:
pg:
condition: service_healthy
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg

command: python -m bot.main

volumes:
- ./app:/bot

taskiq-worker:
build: .
command: taskiq worker bot.tasks.broker:broker
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg

volumes:
- ./app:/bot

taskiq-scheduler:
build: .
command: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot

pg:
image: postgres:16.4
environment:
POSTGRES_DB: weather_bot_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: root
volumes:
- pg_data:/var/lib/postgresql/data/

ports:
- "5433:5432"

healthcheck:
test: [ "CMD", "pg_isready", "-q", "-d", "weather_bot_db", "-U", "postgres" ]
interval: 10s
timeout: 5s
retries: 2

pgadmin:
image: dpage/pgadmin4:latest
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.org
PGADMIN_DEFAULT_PASSWORD: admin
PGADMIN_CONFIG_SERVER_MODE: 'False'

ports:
- "5050:80"
depends_on:
pg:
condition: service_healthy

redis:
image: "redis:alpine"
ports:
- "6379:6379"
volumes:
- redis_data:/data

volumes:
pg_data:
redis_data:

Dockerfile:
FROM python:3.12-alpine

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

COPY . .

RUN pip install --no-cache-dir -r requirements.txt

COPY . /app


Подробнее здесь: https://stackoverflow.com/questions/798 ... -schedules
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»