Как оптимизировать мою производительность, используя асинхронный код PythonPython

Программы на Python
Ответить
Anonymous
 Как оптимизировать мою производительность, используя асинхронный код Python

Сообщение Anonymous »

Я хочу оптимизировать свой код, чтобы быстрее обрабатывать информацию. Впервые играл с асинхронными запросами. А также все еще новичок в Python. Надеюсь, мой код имеет смысл.
Я использую FastAPI в качестве платформы. И aiohttp для отправки моих запросов.
Сейчас меня интересует только общее количество результатов по каждому искомому слову. После этого я выгружу json в БД.
Мой код отправляет запросы к общедоступному API перекрестных ссылок (crossref)
В качестве примера я ищу термины с 02.06.2022 по 03.06.2022 (включительно). Искомые термины: «бумага» (3146 результатов), «аммоний» (1430 результатов) и «отбеливатель» (23 результата). Пример:

Код: Выделить всё

https://api.crossref.org/works?rows=1000&sort=created&mailto=youremail@domain.com&query=paper&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*
Это вернет 3146 строк. Мне нужно искать только по одному термину за раз. Я не пытался разбить его на день, чтобы посмотреть, будет ли это быстрее.
В этом также есть рекурсивный контекст. Здесь я чувствую, что неправильно понимаю асинхронную концепцию. Вот почему мне нужен рекурсивный вызов.
Запросы глубокого разбиения на страницы
Глубокое разбиение на страницы с использованием курсоров можно использовать для итерации над большими наборами результатов без каких-либо ограничений на их размер.
Чтобы использовать глубокое разбиение на страницы, сделайте запрос как обычно, но включите параметр курсора со значением *, например:

Код: Выделить всё

https://api.crossref.org/works?rows=1000&sort=created&mailto=youremail@domain.com&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=*
В ответе JSON будет предоставлено поле следующего курсора. Чтобы получить следующую страницу результатов, передайте значение next-cursor в качестве параметра курсора. Например:

Код: Выделить всё

https://api.crossref.org/works?rows=1000&sort=created&mailto=youremail@domain.com&query=ammonium&filter=from-index-date:2022-06-02,until-index-date:2022-06-03&cursor=
Совет из документа CrossRef

Клиентам следует проверять количество возвращаемых товаров. Если количество возвращаемых элементов равно количеству ожидаемых строк, значит, достигнут конец набора результатов. Использование следующего курсора за пределами этой точки приведет к получению ответов с пустым списком элементов.

Мое время обработки все еще зашкаливает, всего 3 слова (и 7 запросов), это более 15 секунд. Я пытаюсь сократить это время до 5 секунд, если это возможно? При использовании почтальона ответ на самый длинный запрос занял около 4 секунд.
Это то, что у меня есть на данный момент, если вы хотите попробовать.
schema.py

Код: Выделить всё

class CrossRefSearchRequest(BaseModel):
keywords: List[str]
date_from: Optional[datetime] = None
date_to: Optional[datetime] = None
controler.py

Код: Выделить всё

import time
from fastapi import FastAPI, APIRouter, Request

app = FastAPI(title="CrossRef API", openapi_url=f"{settings.API_V1_STR}/openapi.json")
api_router = APIRouter()
service = CrossRefService()

@api_router.post("/search", status_code=201)
async def search_keywords(*, search_args: CrossRefSearchRequest) -> dict:
fixed_search_args = {
"sort": "created",
"rows": "1000",
"cursor": "*"
}
results = await service.cross_ref_request(search_args, **fixed_search_args)
return {k: len(v) for k, v in results.items()}

# sets the header X-Process-Time, in order to have the time for each request
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response

app.include_router(api_router)

if __name__ == "__main__":
# Use this for debugging purposes only
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8001, log_level="debug")
service.py

Код: Выделить всё

from datetime import datetime, timedelta

def _setup_date_default(date_from_req: datetime, date_to_req: datetime):
yesterday = datetime.utcnow()- timedelta(days=1)

date_from = yesterday if date_from_req is None else date_from_req
date_to = yesterday if date_to_req is None else date_to_req

return date_from.strftime(DATE_FORMAT_CROSS_REF), date_to.strftime(DATE_FORMAT_CROSS_REF)

class CrossRefService:

def __init__(self):
self.client = CrossRefClient()

# my recursive call for the next cursor
async def _send_client_request(self ,final_result: dict[str, list[str]], keywords: [str], date_from: str, date_to: str, **kwargs):
json_responses = await self.client.cross_ref_request_date_range(keywords, date_from, date_to, **kwargs)

for json_response in json_responses:

message = json_response.get('message', {})
keyword = message.get('query').get('search-terms')
next_cursor = message.get('next-cursor')
total_results = message.get('total-results')
search_results = message.get('items', [{}]) if total_results > 0 else []

if final_result[keyword] is None:
final_result[keyword] = search_results
else:
final_result[keyword].extend(search_results)

if total_results > int(kwargs['rows']) and len(search_results) == int(kwargs['rows']):
kwargs['cursor'] = next_cursor
await self._send_client_request(final_result, [keyword], date_from, date_to, **kwargs)

async def cross_ref_request(self, request: CrossRefSearchRequest, **kwargs) -> dict[str, list[str]]:
date_from, date_to = _setup_date(request.date_from, request.date_to)
results: dict[str, list[str]] = dict.fromkeys(request.keywords)

await self._send_client_request(results, request.keywords, date_from, date_to, **kwargs)

return results
client.py

Код: Выделить всё

import asyncio
from aiohttp import ClientSession

async def _send_request_task(session: ClientSession, url: str):
try:
async with session.get(url) as response:
await response.read()
return response
# exception handler to come
except Exception as e:
print(f"exception for {url}")
print(str(e))

class CrossRefClient:
base_url = "https://api.crossref.org/works?" \
"query={}&" \
"filter=from-index-date:{},until-index-date:{}&" \
"sort={}&" \
"rows={}&"  \
"cursor={}"

def __init__(self) -> None:
self.headers = {
"User-Agent": f"my_app/v0.1 (example.com/; mailto:youremail@domain.com) using FastAPI"
}

async def cross_ref_request_date_range(
self, keywords: [str], date_from: str, date_to: str, **kwargs
) -> list:
async with ClientSession(headers=self.headers) as session:
tasks = [
asyncio.create_task(
_send_request_task(session, self.base_url.format(
keyword, date_from, date_to, kwargs['sort'], kwargs['rows'], kwargs['cursor']
)),
name=TASK_NAME_BASE.format(keyword, date_from, date_to)
)
for keyword in keywords
]

responses = await asyncio.gather(*tasks)

return [await response.json() for response in responses]
Как лучше это оптимизировать и лучше использовать асинхронные вызовы? Кроме того, этот рекурсивный цикл может быть не лучшим способом сделать это. Есть идеи по этому поводу?
Я реализовал решение для синхронных вызовов, и оно еще медленнее. Так что, думаю, я не так уж и далеко.
Спасибо!

Подробнее здесь: https://stackoverflow.com/questions/725 ... ython-code
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»