Python asyncio: параллельные задачи записи часто отменяются, как обеспечить попеременное выполнение?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Python asyncio: параллельные задачи записи часто отменяются, как обеспечить попеременное выполнение?

Сообщение Anonymous »

Я разрабатываю асинхронную систему кэширования на Python с использованием asyncio. У меня есть две одновременные операции записи, которые я хочу чередовать, но в настоящее время часто отменяется только одна побочная задача.
Вот моя текущая реализация в python3.10:

Код: Выделить всё

import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime

class LocalCacheUserToken:
_g: dict[str, dict] = {}
_tasks: dict[str, asyncio.Task] = {}
_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
_write_lock = asyncio.Lock()

@classmethod
@asynccontextmanager
async def acquire_lock(cls, key: str):
async with cls._locks[key]:
yield

@classmethod
async def get(cls, key: str):
async with cls.acquire_lock(key):
value = deepcopy(cls._g.get(key, None))
return value

@classmethod
async def setex(cls, key: str, time: int, value: dict):
async with cls._write_lock:
async with cls.acquire_lock(key):
print(f"[{datetime.now()}] {value=} get lock")
if key in cls._tasks:
cls._tasks[key].cancel()

cls._g[key] = value

task = asyncio.create_task(cls._delay_delete(key, time, value))
cls._tasks[key] = task

@classmethod
async def _delay_delete(cls, key, time, value):
try:
await asyncio.sleep(time)
async with cls.acquire_lock(key):
del cls._g[key]
cls._tasks.pop(key, None)
except asyncio.CancelledError:
print(f"[{datetime.now()}] Deletion task for {key=}, {value=} was cancelled")

async def p(a: LocalCacheUserToken):
while True:
print(f"[{datetime.now()}] {await a.get('test')=}")
await asyncio.sleep(1)

async def write(a: LocalCacheUserToken, k: str):
for i in range(20):
await a.setex("test", 5, {'value': f'{k}->{i}'})
await asyncio.sleep(0.01)

# 示例使用
async def main():
asyncio.create_task(p(LocalCacheUserToken))
asyncio.create_task(write(LocalCacheUserToken, k='test1'))
asyncio.create_task(write(LocalCacheUserToken, k='test2'))

await asyncio.sleep(10)

asyncio.run(main())
выходы:

Код: Выделить всё

[2024-06-28 01:42:35.054703] await a.get('test')=None
[2024-06-28 01:42:35.054703] value={'value': 'test1->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test1->1'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->1'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->0'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->2'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->2'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->1'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->3'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->3'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->2'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->4'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->4'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->3'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->5'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->5'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->4'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->6'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->6'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->5'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->7'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->7'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->6'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->8'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->8'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value':  'test2->7'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->9'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->9'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->8'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->10'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->10'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->9'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->11'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->11'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->10'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->12'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->12'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->11'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->13'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->13'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->12'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->14'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->14'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->13'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->15'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->15'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->14'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->16'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->16'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->15'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->17'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->17'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->16'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->18'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->18'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->17'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->19'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->19'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->18'} was cancelled
[2024-06-28 01:42:36.056942] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:37.063445] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:38.064713] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:39.069732] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:40.077492] await a.get('test')=None
[2024-06-28 01:42:41.087926] await a.get('test')=None
[2024-06-28 01:42:42.103530] await a.get('test')=None
[2024-06-28 01:42:43.107391] await a.get('test')=None
[2024-06-28 01:42:44.108523] await a.get('test')=None
В чем причина этой проблемы и как ее исправить?

Подробнее здесь: https://stackoverflow.com/questions/786 ... ure-altern
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»