Вот мой код конечной точки:
Код: Выделить всё
@routes.get('/ping')
async def handle_ping(_) -> web.Response:
try:
import time
import asyncio
for i in range(10):
await asyncio.sleep(1)
return web.json_response(
data=PingResult(
service_name=service_name,
version=SERVICE_VERSION,
storage_path=str(storage_dir.path),
daemon_pid=daemon.pid,
daemon_status=str(daemon.status.value),
).dict()
)
except asyncio.CancelledError as ce:
print('Request was cancelled')
return HTTPBadRequest(ErrorResult(error='Request was cancelled'))
Код: Выделить всё
async def ping(timeout=10) -> PingResult:
async with aiohttp.ClientSession(timeout=ClientTimeout(total=timeout)) as session:
async with session.get('http://localhost:5002/ping') as resp:
body = await resp.json()
return PingResult.parse_obj(body)
Код: Выделить всё
from aiohttp import web
from pydantic import BaseModel
class ErrorResult(TypedDict):
error: str
class HTTPBadRequest(web.HTTPBadRequest):
def __init__(self, error: Mapping) -> None:
super().__init__(text=dumps(error), content_type='application/json')
class PingResult(BaseModel):
service_name: str
version: str
storage_path: str
daemon_pid: int
daemon_status: str
Похоже, я неправильно понимаю саму идею отмены запроса, но я могу понять, как мне достичь своей главной цели.
Подробнее здесь: https://stackoverflow.com/questions/791 ... exceeds-it