У меня есть бот aiogram с рабочим и планировщиком задач (taskiq redis с Taskiq_aiogram), в обработчике сообщений я запускаю задачу с динамическим интервалом планирования, через некоторое время (30-40 минут) я получаю: redis.Exceptions.ResponseError: NOGROUP Нет такого ключа "taskiq" или группы потребителей "taskiq" в XREADGROUP с опцией GROUP, но когда я запускаю запланированные задачи со статическим интервалом (просто декоратор для асинхронной функции, которая начинает работать после запуска работника и планировщика), все работает без ошибок и задачи выполняются с правильным интервалом.
здесь отbrocker.py:
from typing import Annotated
from taskiq_redis import (
RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource
)
from taskiq import TaskiqScheduler, Context, TaskiqDepends
from redis.asyncio import Redis
import taskiq_aiogram
from taskiq.events import TaskiqEvents
from bot import redis_client
from bot.config import REDIS_CLIENT_HOST, REDIS_CLIENT_PORT, REDIS_CLIENT_DB
redis_url = f"redis://{REDIS_CLIENT_HOST}:{REDIS_CLIENT_PORT}/{REDIS_CLIENT_DB}"
result_backend = RedisAsyncResultBackend(
redis_url=redis_url,
)
broker = RedisStreamBroker(
url=redis_url,
).with_result_backend(result_backend)
# ! run: taskiq worker bot.tasks.broker:broker
taskiq_aiogram.init(
broker,
"bot.main:dp",
"bot.main:bot",
)
# define sources
dynamic_schedule_source = RedisScheduleSource(
url=redis_url
)
scheduler = TaskiqScheduler(
broker=broker,
sources=[
dynamic_schedule_source
],
)
# ! run: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
# * I also tried to fix error with this
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(context: Annotated[Context, TaskiqDepends()]):
r = Redis.from_url(redis_url)
try:
await r.xgroup_create(
name="taskiq",
groupname="taskiq",
id="$",
mkstream=True,
)
except Exception as e:
print(e)
if "BUSYGROUP" not in str(e):
raise
# help funcs
async def delete_schedule(id_: str):
await dynamic_schedule_source.delete_schedule(id_)
async def delete_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
await delete_schedule(schedule.schedule_id)
async def get_all_schedules_ids():
schedules = await dynamic_schedule_source.get_schedules()
return [schd.schedule_id for schd in schedules]
async def show_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
print(schedule)
async def check_schedule(schedule_id: str) -> bool:
key = f"{dynamic_schedule_source._prefix}:data:{schedule_id}"
schedule = await redis_client.client.get(key)
return schedule is not None
tasks.py:
from typing import Annotated, Awaitable, Any
from datetime import datetime, timedelta
from time import perf_counter, time
from taskiq import TaskiqDepends, Context
from aiogram import Bot
from bot.tasks import broker as tasks_broker
import bot.database as db
from bot.database.engine import session as db_session
from bot import redis_client
from bot.weather_parse import get_future_weather
from bot import texts
from bot.config import ADMIN_ID
@tasks_broker.broker.task
async def send_time_diff(start_seconds: float, user_id: int, context: Annotated[Context, TaskiqDepends()], bot: Bot = TaskiqDepends(), ):
schedule_id = context.message.labels["schedule_id"]
text = f"{(time() - start_seconds) / 60} {schedule_id}"
await bot.send_message(user_id, text, disable_notification=True)
обработчик сообщений бота, в котором я запускаю динамическое расписание интервалов:
start_seconds = time()
task = await tasks.send_time_diff.schedule_by_interval(
source=dynamic_schedule_source,
interval=WEATHER_TASK_SEND_INTERVAL,
start_seconds=start_seconds,
user_id=user_id
)
для запуска бота, Taskiq и Redis я использую docker Compose (возможно, это моя ошибка)
docker-compose.yml:
version: '3.8'
services:
bot:
build: .
depends_on:
pg:
condition: service_healthy
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
command: python -m bot.main
volumes:
- ./app:/bot
taskiq-worker:
build: .
command: taskiq worker bot.tasks.broker:broker
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
taskiq-scheduler:
build: .
command: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
pg:
image: postgres:16.4
environment:
POSTGRES_DB: weather_bot_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: root
volumes:
- pg_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
healthcheck:
test: [ "CMD", "pg_isready", "-q", "-d", "weather_bot_db", "-U", "postgres" ]
interval: 10s
timeout: 5s
retries: 2
pgadmin:
image: dpage/pgadmin4:latest
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.org
PGADMIN_DEFAULT_PASSWORD: admin
PGADMIN_CONFIG_SERVER_MODE: 'False'
ports:
- "5050:80"
depends_on:
pg:
condition: service_healthy
redis:
image: "redis:alpine"
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
pg_data:
redis_data:
Dockerfile:
FROM python:3.12-alpine
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app
Подробнее здесь: https://stackoverflow.com/questions/798 ... -schedules
Ошибка в Taskiq Python при выполнении расписаний динамических интервалов ⇐ Python
Программы на Python
1767291771
Anonymous
У меня есть бот aiogram с рабочим и планировщиком задач (taskiq redis с Taskiq_aiogram), в обработчике сообщений я запускаю задачу с динамическим интервалом планирования, через некоторое время (30-40 минут) я получаю: redis.Exceptions.ResponseError: NOGROUP Нет такого ключа "taskiq" или группы потребителей "taskiq" в XREADGROUP с опцией GROUP, но когда я запускаю запланированные задачи со статическим интервалом (просто декоратор для асинхронной функции, которая начинает работать после запуска работника и планировщика), все работает без ошибок и задачи выполняются с правильным интервалом.
здесь отbrocker.py:
from typing import Annotated
from taskiq_redis import (
RedisAsyncResultBackend, RedisStreamBroker, RedisScheduleSource
)
from taskiq import TaskiqScheduler, Context, TaskiqDepends
from redis.asyncio import Redis
import taskiq_aiogram
from taskiq.events import TaskiqEvents
from bot import redis_client
from bot.config import REDIS_CLIENT_HOST, REDIS_CLIENT_PORT, REDIS_CLIENT_DB
redis_url = f"redis://{REDIS_CLIENT_HOST}:{REDIS_CLIENT_PORT}/{REDIS_CLIENT_DB}"
result_backend = RedisAsyncResultBackend(
redis_url=redis_url,
)
broker = RedisStreamBroker(
url=redis_url,
).with_result_backend(result_backend)
# ! run: taskiq worker bot.tasks.broker:broker
taskiq_aiogram.init(
broker,
"bot.main:dp",
"bot.main:bot",
)
# define sources
dynamic_schedule_source = RedisScheduleSource(
url=redis_url
)
scheduler = TaskiqScheduler(
broker=broker,
sources=[
dynamic_schedule_source
],
)
# ! run: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
# * I also tried to fix error with this
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(context: Annotated[Context, TaskiqDepends()]):
r = Redis.from_url(redis_url)
try:
await r.xgroup_create(
name="taskiq",
groupname="taskiq",
id="$",
mkstream=True,
)
except Exception as e:
print(e)
if "BUSYGROUP" not in str(e):
raise
# help funcs
async def delete_schedule(id_: str):
await dynamic_schedule_source.delete_schedule(id_)
async def delete_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
await delete_schedule(schedule.schedule_id)
async def get_all_schedules_ids():
schedules = await dynamic_schedule_source.get_schedules()
return [schd.schedule_id for schd in schedules]
async def show_all_schedules():
schedules = await dynamic_schedule_source.get_schedules()
for schedule in schedules:
print(schedule)
async def check_schedule(schedule_id: str) -> bool:
key = f"{dynamic_schedule_source._prefix}:data:{schedule_id}"
schedule = await redis_client.client.get(key)
return schedule is not None
tasks.py:
from typing import Annotated, Awaitable, Any
from datetime import datetime, timedelta
from time import perf_counter, time
from taskiq import TaskiqDepends, Context
from aiogram import Bot
from bot.tasks import broker as tasks_broker
import bot.database as db
from bot.database.engine import session as db_session
from bot import redis_client
from bot.weather_parse import get_future_weather
from bot import texts
from bot.config import ADMIN_ID
@tasks_broker.broker.task
async def send_time_diff(start_seconds: float, user_id: int, context: Annotated[Context, TaskiqDepends()], bot: Bot = TaskiqDepends(), ):
schedule_id = context.message.labels["schedule_id"]
text = f"{(time() - start_seconds) / 60} {schedule_id}"
await bot.send_message(user_id, text, disable_notification=True)
обработчик сообщений бота, в котором я запускаю динамическое расписание интервалов:
start_seconds = time()
task = await tasks.send_time_diff.schedule_by_interval(
source=dynamic_schedule_source,
interval=WEATHER_TASK_SEND_INTERVAL,
start_seconds=start_seconds,
user_id=user_id
)
для запуска бота, Taskiq и Redis я использую docker Compose (возможно, это моя ошибка)
docker-compose.yml:
version: '3.8'
services:
bot:
build: .
depends_on:
pg:
condition: service_healthy
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
command: python -m bot.main
volumes:
- ./app:/bot
taskiq-worker:
build: .
command: taskiq worker bot.tasks.broker:broker
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
taskiq-scheduler:
build: .
command: taskiq scheduler bot.tasks.broker:scheduler --skip-first-run --update-interval 5
depends_on:
- redis
environment:
REDIS_CLIENT_HOST: redis
REDIS_CLIENT_PORT: 6379
DB_HOST: pg
volumes:
- ./app:/bot
pg:
image: postgres:16.4
environment:
POSTGRES_DB: weather_bot_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: root
volumes:
- pg_data:/var/lib/postgresql/data/
ports:
- "5433:5432"
healthcheck:
test: [ "CMD", "pg_isready", "-q", "-d", "weather_bot_db", "-U", "postgres" ]
interval: 10s
timeout: 5s
retries: 2
pgadmin:
image: dpage/pgadmin4:latest
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.org
PGADMIN_DEFAULT_PASSWORD: admin
PGADMIN_CONFIG_SERVER_MODE: 'False'
ports:
- "5050:80"
depends_on:
pg:
condition: service_healthy
redis:
image: "redis:alpine"
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
pg_data:
redis_data:
Dockerfile:
FROM python:3.12-alpine
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app
Подробнее здесь: [url]https://stackoverflow.com/questions/79858728/error-in-taskiq-python-while-executing-dynamic-interval-schedules[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия