from fastapi import FastAPI, HTTPException
import uuid
import asyncio
import random
import time
from loguru import logger
import redis
from contextlib import asynccontextmanager
# Redis connection
redis_client = redis.Redis(host='localhost', port=6381, db=1, decode_responses=True)
# Configuration
TASK_EXPIRATION_SECONDS = 3600 # 1 hour - adjust as needed
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events"""
logger.info("Starting FastAPI application")
try:
# Test Redis connection
redis_client.ping()
logger.success("Redis connection established")
except redis.ConnectionError:
logger.error("Failed to connect to Redis")
raise
yield
logger.info("Shutting down FastAPI application")
redis_client.close()
app = FastAPI(lifespan=lifespan)
# Synchronous blocking function (simulates long-running task)
def handle_process(task_id: str) -> None:
"""Simulate a blocking process that takes 5-15 seconds"""
processing_time = random.randint(5, 15)
logger.info(f"Task {task_id}: Starting handle_process, will take {processing_time} seconds")
# Simulate blocking work
time.sleep(processing_time)
logger.info(f"Task {task_id}: handle_process completed")
# Function that needs to handle the synchronous blocking function
async def run_task(task_id: str) -> None:
"""Run task asynchronously by handling the synchronous blocking function"""
logger.info(f"Task {task_id}: Starting run_task")
# Run the blocking function in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, handle_process, task_id)
# Update task status to 'done' in Redis with expiration
redis_client.setex(f"task:{task_id}", TASK_EXPIRATION_SECONDS, "done")
logger.success(f"Task {task_id}: Completed and status updated to 'done' (expires in {TASK_EXPIRATION_SECONDS}s)")
@app.post("/task", response_model=dict)
async def create_task() -> dict:
"""Create a new task and return task ID"""
task_id = str(uuid.uuid4())
logger.info(f"Creating new task: {task_id}")
# Store initial task status in Redis WITH EXPIRATION
redis_client.setex(f"task:{task_id}", TASK_EXPIRATION_SECONDS, "processing")
logger.debug(f"Task {task_id}: Status set to 'processing' in Redis (expires in {TASK_EXPIRATION_SECONDS}s)")
# Start the task asynchronously without waiting for completion
asyncio.create_task(run_task(task_id))
logger.debug(f"Task {task_id}: run_task started asynchronously")
return {"task_id": task_id, "status": "processing"}
@app.get("/task/{task_id}", response_model=dict)
async def get_task_status(task_id: str) -> dict:
"""Get the status of a task by ID"""
logger.debug(f"Checking status for task: {task_id}")
# Get task status from Redis
status = redis_client.get(f"task:{task_id}")
if status is None:
logger.warning(f"Task {task_id}: Not found (may have expired)")
raise HTTPException(status_code=404, detail="Task not found or expired")
# Get TTL (Time To Live) to show when it will expire
ttl = redis_client.ttl(f"task:{task_id}")
logger.debug(f"Task {task_id}: Status is '{status}', expires in {ttl}s")
return {"task_id": task_id, "status": status, "expires_in_seconds": ttl}
if __name__ == "__main__":
import uvicorn
logger.info("Starting FastAPI server")
uvicorn.run(app, host="0.0.0.0", port=8000)
Я протестировал его, и он работает асинхронно. Например:
import asyncio
import httpx
import time
import random
# Here we test both task creation and monitoring concurrently
async def create_task(client, task_num):
"""Create a task and return the task_id"""
start = time.time()
response = await client.post("http://localhost:8000/task")
end = time.time()
task_id = response.json()["task_id"]
print(f"Task {task_num}: Created in {end-start:.2f}s - ID: {task_id}")
return task_id
async def monitor_task(client, task_id, task_num):
"""Monitor a task's status periodically until completion"""
print(f"Task {task_num}: Starting monitoring for {task_id}")
while True:
try:
response = await client.get(f"http://localhost:8000/task/{task_id}")
status_data = response.json()
status = status_data["status"]
expires_in = status_data["expires_in_seconds"]
print(f"Task {task_num}: Status '{status}', expires in {expires_in}s")
if status == "done":
print(f"Task {task_num}: COMPLETED!")
break
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
print(f"Task {task_num}: Not found or expired")
break
else:
print(f"Task {task_num}: Error checking status - {e}")
break
# Wait before checking again (random interval to avoid synchronized requests)
await asyncio.sleep(random.uniform(1.0, 3.0))
async def create_and_monitor_task(client, task_num):
"""Create a task and immediately start monitoring it"""
task_id = await create_task(client, task_num)
# Start monitoring without blocking task creation
asyncio.create_task(monitor_task(client, task_id, task_num))
return task_id
async def main():
print("Starting concurrent task creation and monitoring test...")
print("=" * 60)
async with httpx.AsyncClient() as client:
# Create and monitor multiple tasks concurrently
tasks = [create_and_monitor_task(client, i) for i in range(5)]
task_ids = await asyncio.gather(*tasks)
print("\n" + "=" * 60)
print(f"All {len(task_ids)} tasks created. Monitoring will continue...")
print("=" * 60)
# Keep the main coroutine running while monitoring continues
# Wait for a reasonable time to see most tasks complete
await asyncio.sleep(20)
print("\n" + "=" * 60)
print("Test completed - some tasks might still be running in background")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
Моя цель — создать приложение Gradio, в котором я могу нажать кнопку, отправить задачу, а затем проверить ее статус. Я потратил много часов, но проверка статуса задачи всегда блокировалась, поэтому я не мог отправлять задачи одновременно.
Пока, наконец, мне это не удалось:
import gradio as gr
import httpx
import asyncio
import uuid
from typing import Dict, List
import time
# Store task information for the UI
task_states: Dict[str, dict] = {}
async def create_task():
"""Create a new task"""
try:
async with httpx.AsyncClient() as client:
response = await client.post("http://localhost:8000/task")
data = response.json()
task_id = data["task_id"]
# Store task info with a unique session ID
session_id = str(uuid.uuid4())
task_states[session_id] = {
"task_id": task_id,
"status": "processing",
"created_at": time.time(),
"updates": [f"Task created: {task_id}"]
}
# Start monitoring this task
asyncio.create_task(monitor_task(session_id, task_id))
return f"Task created successfully!\nTask ID: {task_id}", session_id
except Exception as e:
return f"Error creating task: {str(e)}", ""
async def monitor_task(session_id: str, task_id: str):
"""Monitor a task's status in the background"""
async with httpx.AsyncClient() as client:
while session_id in task_states:
try:
response = await client.get(f"http://localhost:8000/task/{task_id}")
status_data = response.json()
new_status = status_data["status"]
expires_in = status_data["expires_in_seconds"]
# Update task state
if session_id in task_states:
task_states[session_id]["status"] = new_status
task_states[session_id]["expires_in"] = expires_in
# Add update message if status changed
current_updates = task_states[session_id]["updates"]
if not current_updates or f"Status: {new_status}" not in current_updates[-1]:
update_msg = f"Status: {new_status}, Expires in: {expires_in}s"
task_states[session_id]["updates"].append(update_msg)
# Stop monitoring if task is done
if new_status == "done":
if session_id in task_states:
task_states[session_id]["updates"].append("✅ Task completed!")
break
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
if session_id in task_states:
task_states[session_id]["updates"].append("❌ Task not found or expired")
break
else:
if session_id in task_states:
task_states[session_id]["updates"].append(f"❌ Error: {e}")
break
except Exception as e:
if session_id in task_states:
task_states[session_id]["updates"].append(f"❌ Monitoring error: {e}")
break
await asyncio.sleep(2) # Check every 2 seconds
def get_task_updates(session_id: str):
"""Get current updates for a task"""
if not session_id or session_id not in task_states:
return "No active task. Please create a new task first."
task_info = task_states[session_id]
updates = task_info["updates"]
# Format output with timestamps
output = []
for i, update in enumerate(updates):
output.append(f"{i+1}. {update}")
return "\n".join(output)
def clear_old_tasks():
"""Clean up old completed tasks to prevent memory leaks"""
current_time = time.time()
expired_sessions = []
for session_id, task_info in task_states.items():
# Remove tasks that were created more than 1 hour ago
if current_time - task_info["created_at"] > 3600:
expired_sessions.append(session_id)
for session_id in expired_sessions:
del task_states[session_id]
# Create the Gradio interface
with gr.Blocks(title="Task Manager") as demo:
gr.Markdown("# 🚀 Async Task Manager")
gr.Markdown("Create and monitor long-running tasks. Each browser session can run independent tasks!")
with gr.Row():
with gr.Column():
gr.Markdown("## Create New Task")
create_btn = gr.Button("🚀 Create New Task", variant="primary")
create_output = gr.Textbox(label="Task Creation Result", lines=3, interactive=False)
session_display = gr.Textbox(label="Session ID", visible=False)
with gr.Column():
gr.Markdown("## Task Progress")
updates_display = gr.Textbox(
label="Task Updates",
lines=10,
interactive=False,
placeholder="Task updates will appear here..."
)
refresh_btn = gr.Button("🔄 Refresh Status")
clear_btn = gr.Button("🗑️ Clear Completed Tasks")
gr.Markdown("---")
gr.Markdown("### How to use:")
gr.Markdown("- Click **Create New Task** to start a new background task")
gr.Markdown("- The status will automatically update every 2 seconds")
gr.Markdown("- Open this page in multiple browsers to run concurrent tasks")
gr.Markdown("- Each browser session manages its own task independently")
# Event handlers
create_btn.click(
fn=create_task,
outputs=[create_output, session_display]
)
def refresh_updates(session_id):
clear_old_tasks() # Clean up old tasks
return get_task_updates(session_id)
refresh_btn.click(
fn=refresh_updates,
inputs=[session_display],
outputs=[updates_display]
)
clear_btn.click(
fn=lambda: "Cleared completed tasks! Create a new task to start fresh.",
outputs=[updates_display]
)
# Auto-refresh
demo.load(
fn=lambda: "Welcome! Create a new task to get started.",
outputs=[updates_display]
)
if __name__ == "__main__":
# Start the Gradio app
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)
и наконец! Я мог запустить проверку, не блокируя отправку задачи. Итак, теперь, если я открою это в двух браузерах
Я смогу создать новую задачу. И я вижу в серверной части, что статус задачи проверяется, не блокируя того же другого пользователя, делающего то же самое.
Единственное, чего мне не удалось сделать, это сделать так, чтобы текстовое поле справа (Обновления задачи) обновлялось само по себе. Мне всегда приходится нажимать «Обновить статус», чтобы получить статус.
Есть ли способ Gradio обновить какой-либо элемент (в данном случае текст update_display ), чтобы я мог видеть статус без необходимости нажимать кнопку?
У меня есть этот бэкэнд: [code]from fastapi import FastAPI, HTTPException import uuid import asyncio import random import time from loguru import logger import redis from contextlib import asynccontextmanager
# Function that needs to handle the synchronous blocking function async def run_task(task_id: str) -> None: """Run task asynchronously by handling the synchronous blocking function""" logger.info(f"Task {task_id}: Starting run_task")
# Run the blocking function in a thread pool to avoid blocking the event loop loop = asyncio.get_event_loop() await loop.run_in_executor(None, handle_process, task_id)
# Update task status to 'done' in Redis with expiration redis_client.setex(f"task:{task_id}", TASK_EXPIRATION_SECONDS, "done") logger.success(f"Task {task_id}: Completed and status updated to 'done' (expires in {TASK_EXPIRATION_SECONDS}s)")
@app.post("/task", response_model=dict) async def create_task() -> dict: """Create a new task and return task ID""" task_id = str(uuid.uuid4())
logger.info(f"Creating new task: {task_id}")
# Store initial task status in Redis WITH EXPIRATION redis_client.setex(f"task:{task_id}", TASK_EXPIRATION_SECONDS, "processing") logger.debug(f"Task {task_id}: Status set to 'processing' in Redis (expires in {TASK_EXPIRATION_SECONDS}s)")
# Start the task asynchronously without waiting for completion asyncio.create_task(run_task(task_id)) logger.debug(f"Task {task_id}: run_task started asynchronously")
@app.get("/task/{task_id}", response_model=dict) async def get_task_status(task_id: str) -> dict: """Get the status of a task by ID""" logger.debug(f"Checking status for task: {task_id}")
# Get task status from Redis status = redis_client.get(f"task:{task_id}")
if status is None: logger.warning(f"Task {task_id}: Not found (may have expired)") raise HTTPException(status_code=404, detail="Task not found or expired")
# Get TTL (Time To Live) to show when it will expire ttl = redis_client.ttl(f"task:{task_id}") logger.debug(f"Task {task_id}: Status is '{status}', expires in {ttl}s")
if __name__ == "__main__": import uvicorn logger.info("Starting FastAPI server") uvicorn.run(app, host="0.0.0.0", port=8000) [/code] Я протестировал его, и он работает асинхронно. Например: [code]import asyncio import httpx import time import random
# Here we test both task creation and monitoring concurrently
async def create_task(client, task_num): """Create a task and return the task_id""" start = time.time() response = await client.post("http://localhost:8000/task") end = time.time() task_id = response.json()["task_id"] print(f"Task {task_num}: Created in {end-start:.2f}s - ID: {task_id}") return task_id
async def monitor_task(client, task_id, task_num): """Monitor a task's status periodically until completion""" print(f"Task {task_num}: Starting monitoring for {task_id}")
while True: try: response = await client.get(f"http://localhost:8000/task/{task_id}") status_data = response.json() status = status_data["status"] expires_in = status_data["expires_in_seconds"]
print(f"Task {task_num}: Status '{status}', expires in {expires_in}s")
if status == "done": print(f"Task {task_num}: COMPLETED!") break
except httpx.HTTPStatusError as e: if e.response.status_code == 404: print(f"Task {task_num}: Not found or expired") break else: print(f"Task {task_num}: Error checking status - {e}") break
# Wait before checking again (random interval to avoid synchronized requests) await asyncio.sleep(random.uniform(1.0, 3.0))
async def create_and_monitor_task(client, task_num): """Create a task and immediately start monitoring it""" task_id = await create_task(client, task_num)
# Start monitoring without blocking task creation asyncio.create_task(monitor_task(client, task_id, task_num))
async with httpx.AsyncClient() as client: # Create and monitor multiple tasks concurrently tasks = [create_and_monitor_task(client, i) for i in range(5)] task_ids = await asyncio.gather(*tasks)
# Keep the main coroutine running while monitoring continues # Wait for a reasonable time to see most tasks complete await asyncio.sleep(20)
print("\n" + "=" * 60) print("Test completed - some tasks might still be running in background") print("=" * 60)
if __name__ == "__main__": asyncio.run(main()) [/code] Моя цель — создать приложение Gradio, в котором я могу нажать кнопку, отправить задачу, а затем проверить ее статус. Я потратил много часов, но проверка статуса задачи всегда блокировалась, поэтому я не мог отправлять задачи одновременно. Пока, наконец, мне это не удалось: [code]import gradio as gr import httpx import asyncio import uuid from typing import Dict, List import time
# Store task information for the UI task_states: Dict[str, dict] = {}
async def create_task(): """Create a new task""" try: async with httpx.AsyncClient() as client: response = await client.post("http://localhost:8000/task") data = response.json() task_id = data["task_id"]
# Store task info with a unique session ID session_id = str(uuid.uuid4()) task_states[session_id] = { "task_id": task_id, "status": "processing", "created_at": time.time(), "updates": [f"Task created: {task_id}"] }
# Start monitoring this task asyncio.create_task(monitor_task(session_id, task_id))
return f"Task created successfully!\nTask ID: {task_id}", session_id
except Exception as e: return f"Error creating task: {str(e)}", ""
async def monitor_task(session_id: str, task_id: str): """Monitor a task's status in the background""" async with httpx.AsyncClient() as client: while session_id in task_states: try: response = await client.get(f"http://localhost:8000/task/{task_id}") status_data = response.json() new_status = status_data["status"] expires_in = status_data["expires_in_seconds"]
# Update task state if session_id in task_states: task_states[session_id]["status"] = new_status task_states[session_id]["expires_in"] = expires_in
# Add update message if status changed current_updates = task_states[session_id]["updates"] if not current_updates or f"Status: {new_status}" not in current_updates[-1]: update_msg = f"Status: {new_status}, Expires in: {expires_in}s" task_states[session_id]["updates"].append(update_msg)
# Stop monitoring if task is done if new_status == "done": if session_id in task_states: task_states[session_id]["updates"].append("✅ Task completed!") break
except httpx.HTTPStatusError as e: if e.response.status_code == 404: if session_id in task_states: task_states[session_id]["updates"].append("❌ Task not found or expired") break else: if session_id in task_states: task_states[session_id]["updates"].append(f"❌ Error: {e}") break except Exception as e: if session_id in task_states: task_states[session_id]["updates"].append(f"❌ Monitoring error: {e}") break
await asyncio.sleep(2) # Check every 2 seconds
def get_task_updates(session_id: str): """Get current updates for a task""" if not session_id or session_id not in task_states: return "No active task. Please create a new task first."
# Format output with timestamps output = [] for i, update in enumerate(updates): output.append(f"{i+1}. {update}")
return "\n".join(output)
def clear_old_tasks(): """Clean up old completed tasks to prevent memory leaks""" current_time = time.time() expired_sessions = []
for session_id, task_info in task_states.items(): # Remove tasks that were created more than 1 hour ago if current_time - task_info["created_at"] > 3600: expired_sessions.append(session_id)
for session_id in expired_sessions: del task_states[session_id]
# Create the Gradio interface with gr.Blocks(title="Task Manager") as demo: gr.Markdown("# 🚀 Async Task Manager") gr.Markdown("Create and monitor long-running tasks. Each browser session can run independent tasks!")
with gr.Row(): with gr.Column(): gr.Markdown("## Create New Task") create_btn = gr.Button("🚀 Create New Task", variant="primary") create_output = gr.Textbox(label="Task Creation Result", lines=3, interactive=False) session_display = gr.Textbox(label="Session ID", visible=False)
gr.Markdown("---") gr.Markdown("### How to use:") gr.Markdown("- Click **Create New Task** to start a new background task") gr.Markdown("- The status will automatically update every 2 seconds") gr.Markdown("- Open this page in multiple browsers to run concurrent tasks") gr.Markdown("- Each browser session manages its own task independently")
clear_btn.click( fn=lambda: "Cleared completed tasks! Create a new task to start fresh.", outputs=[updates_display] )
# Auto-refresh demo.load( fn=lambda: "Welcome! Create a new task to get started.", outputs=[updates_display] )
if __name__ == "__main__": # Start the Gradio app demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True ) [/code] и наконец! Я мог запустить проверку, не блокируя отправку задачи. Итак, теперь, если я открою это в двух браузерах [img]https://i.sstatic.net/Ozu6yP18.png[/img]
Я смогу создать новую задачу. И я вижу в серверной части, что статус задачи проверяется, не блокируя того же другого пользователя, делающего то же самое. Единственное, чего мне не удалось сделать, это сделать так, чтобы текстовое поле справа (Обновления задачи) обновлялось само по себе. Мне всегда приходится нажимать «Обновить статус», чтобы получить статус. Есть ли способ Gradio обновить какой-либо элемент (в данном случае текст update_display ), чтобы я мог видеть статус без необходимости нажимать кнопку?