Код: Выделить всё
async with cur.copy("""COPY (SELECT {COLUMNS} FROM totals) TO STDOUT WITH (FORMAT text)""") as copy:
async def batcher():
i = 0
while True:
i += 1
log_and_add_to_meta(job, Stream.STDOUT, f"Uploading total batch {i}\n")
batch = []
for _ in range(10000):
x = await copy.read()
if not x:
if batch:
yield b"".join(batch)
log_and_add_to_meta(job, Stream.STDOUT, "Finished with batching\n")
return
batch.append(x)
yield b"".join(batch)
resp = await client.post(
"{ENDPOINT}",
content=batcher(),
cookies={"session": TOKEN},
)
log_and_add_to_meta(job, Stream.STDOUT, "Finished uploading all totals\n")
Код: Выделить всё
clientА вот код сервера:
Код: Выделить всё
@router.post("{ENDPOINT}")
async def import_totals(
request: Request,
user: SuperUser,
cnn: DbCnn,
):
async with cnn.cursor() as cur:
async with cnn.transaction():
(...)
await create_totals_buffer(cur, request.stream())
(...)
return Response(status_code=200)
async def create_totals_buffer(cur: DbCnn.cursor, stream: AsyncGenerator):
await cur.execute("CREATE TEMP TABLE totals_buffer (LIKE totals, id serial) ON COMMIT DROP")
LOGGER.debug("Create temp totals_buffer")
async with cur.copy("""COPY totals_buffer ({COLUMNS}) FROM STDIN WITH (FORMAT text)""") as cp:
async for total in stream:
await cp.write(total)
Но когда я запускаю его с большим количеством строк (несколько миллионов), в то время как сервер все еще получает все ожидаемые данные и регистрирует ответ 200, клиент остается зависшим в ожидании client.post (т. е. я никогда не вижу зарегистрированного сообщения «Завершена загрузка всех итогов»).
Я пробовал все виды вещей. Добавление пустой строки в конец генератора (
Код: Выделить всё
yield b''Код: Выделить всё
async for _ in stream: passСтроки небольшие — 10 столбцов, в каждом либо короткий текст, либо целое число. Для потоковой передачи миллионов строк требуется всего несколько секунд, поэтому я действительно не понимаю, почему клиент должен испытывать трудности.
ChatGPT продолжает говорить о противодавлении, но для меня это не имеет смысла; Насколько я понимаю противодавление, оно возникает, когда клиент пытается отправить данные быстрее, чем сервер может их обработать, а это означает, что память клиента заполняется и в конечном итоге блокируется. В моем случае сервер получает все данные и возвращает 200 — значит, работа клиента закончена, верно? Чего оно еще ждет? (Обратите внимание, что сервер не возвращает никаких данных.)
Подробнее здесь: https://stackoverflow.com/questions/798 ... over-httpx
Мобильная версия