Я отправляю запросы к внешнему API, получаю ответ и записываю его в файл. Все работает нормально, однако я получаю сообщение «Задача была уничтожена, но ожидает выполнения!» предупреждение о том, что я хочу выполнить очистку.
Я сымитировал процесс, описанный ниже. Я получаю элементы из источника (например, списка) элементов и по мере их получения помещаю их в очередь, сигнализируя об окончании списка, но помещая контрольное значение None.
write_q продолжает получать элементы из этой очереди до тех пор, пока не получит контрольный сигнал, а затем вырвется наружу.
Ниже приведен код, который покажет, что моя write_task( ) отменяется до завершения. Какова правильная конструкция для решения этой проблемы?
import asyncio
import aiofiles
import aiocsv
import json
async def task(l: list, write_q: asyncio.Queue) -> None:
# Read tasks from source of data
for i in l:
# Put a request task into the queue
req: dict = {
"headers": {"Accept": "application/json"},
"url": "https://httpbin.org/post",
"data": i
}
await write_q.put(req)
# Sentinel value to signal we are done receiving from source
await write_q.put(None)
async def write_task(write_q: asyncio.Queue) -> None:
headers: bool = True
while True:
async with aiofiles.open("file.csv", mode="a+", newline='') as f:
w = aiocsv.AsyncWriter(f)
# Get data out of the queue to write it
data = await write_q.get()
if not data:
write_q.task_done()
await f.flush()
break
if headers:
await w.writerow([
"status",
"data",
])
headers = False
# Write the data from the response
await w.writerow([
"200",
json.dumps(data)
])
write_q.task_done()
async def main() -> None:
# Create fake data to POST
items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 5
# Queues for orchestrating
write_q = asyncio.Queue()
producer = asyncio.create_task(
task(items, write_q)
)
consumer = asyncio.create_task(
write_task(write_q)
)
errors = await asyncio.gather(producer, return_exceptions=True)
print(f"INFO: Producer has completed! exceptions: {errors}")
# Wait for queue to empty and cancel the consumer
await write_q.join()
consumer.cancel()
print("INFO: write consumer has completed! ")
print("INFO: Complete!")
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
Подробнее здесь: https://stackoverflow.com/questions/786 ... as-pending
Задача Asyncio была уничтожена, но ожидала выполнения ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Asyncio Async Funcitons вешает с Asyncio.gather. (Код работает без Asyncio.gather)
Anonymous » » в форуме Python - 0 Ответы
- 27 Просмотры
-
Последнее сообщение Anonymous
-