Anonymous
Python asyncio: параллельные задачи записи часто отменяются, как обеспечить попеременное выполнение?
Сообщение
Anonymous » 28 июн 2024, 01:53
Я разрабатываю асинхронную систему кэширования на Python с использованием asyncio. У меня есть две одновременные операции записи, которые я хочу чередовать, но в настоящее время часто отменяется только одна побочная задача.
Вот моя текущая реализация в python3.10:
Код: Выделить всё
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime
class LocalCacheUserToken:
_g: dict[str, dict] = {}
_tasks: dict[str, asyncio.Task] = {}
_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
_write_lock = asyncio.Lock()
@classmethod
@asynccontextmanager
async def acquire_lock(cls, key: str):
async with cls._locks[key]:
yield
@classmethod
async def get(cls, key: str):
async with cls.acquire_lock(key):
value = deepcopy(cls._g.get(key, None))
return value
@classmethod
async def setex(cls, key: str, time: int, value: dict):
async with cls._write_lock:
async with cls.acquire_lock(key):
print(f"[{datetime.now()}] {value=} get lock")
if key in cls._tasks:
cls._tasks[key].cancel()
cls._g[key] = value
task = asyncio.create_task(cls._delay_delete(key, time, value))
cls._tasks[key] = task
@classmethod
async def _delay_delete(cls, key, time, value):
try:
await asyncio.sleep(time)
async with cls.acquire_lock(key):
del cls._g[key]
cls._tasks.pop(key, None)
except asyncio.CancelledError:
print(f"[{datetime.now()}] Deletion task for {key=}, {value=} was cancelled")
async def p(a: LocalCacheUserToken):
while True:
print(f"[{datetime.now()}] {await a.get('test')=}")
await asyncio.sleep(1)
async def write(a: LocalCacheUserToken, k: str):
for i in range(20):
await a.setex("test", 5, {'value': f'{k}->{i}'})
await asyncio.sleep(0.01)
# 示例使用
async def main():
asyncio.create_task(p(LocalCacheUserToken))
asyncio.create_task(write(LocalCacheUserToken, k='test1'))
asyncio.create_task(write(LocalCacheUserToken, k='test2'))
await asyncio.sleep(10)
asyncio.run(main())
выходы:
Код: Выделить всё
[2024-06-28 01:42:35.054703] await a.get('test')=None
[2024-06-28 01:42:35.054703] value={'value': 'test1->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test1->1'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->1'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->0'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->2'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->2'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->1'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->3'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->3'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->2'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->4'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->4'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->3'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->5'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->5'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->4'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->6'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->6'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->5'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->7'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->7'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->6'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->8'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->8'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->7'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->9'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->9'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->8'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->10'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->10'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->9'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->11'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->11'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->10'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->12'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->12'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->11'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->13'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->13'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->12'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->14'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->14'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->13'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->15'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->15'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->14'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->16'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->16'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->15'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->17'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->17'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->16'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->18'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->18'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->17'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->19'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->19'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->18'} was cancelled
[2024-06-28 01:42:36.056942] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:37.063445] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:38.064713] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:39.069732] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:40.077492] await a.get('test')=None
[2024-06-28 01:42:41.087926] await a.get('test')=None
[2024-06-28 01:42:42.103530] await a.get('test')=None
[2024-06-28 01:42:43.107391] await a.get('test')=None
[2024-06-28 01:42:44.108523] await a.get('test')=None
В чем причина этой проблемы и как ее исправить?
Подробнее здесь:
https://stackoverflow.com/questions/786 ... ure-altern
1719528795
Anonymous
Я разрабатываю асинхронную систему кэширования на Python с использованием asyncio. У меня есть две одновременные операции записи, которые я хочу чередовать, но в настоящее время часто отменяется только одна побочная задача. Вот моя текущая реализация в python3.10: [code]import asyncio from collections import defaultdict from contextlib import asynccontextmanager from copy import deepcopy from datetime import datetime class LocalCacheUserToken: _g: dict[str, dict] = {} _tasks: dict[str, asyncio.Task] = {} _locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) _write_lock = asyncio.Lock() @classmethod @asynccontextmanager async def acquire_lock(cls, key: str): async with cls._locks[key]: yield @classmethod async def get(cls, key: str): async with cls.acquire_lock(key): value = deepcopy(cls._g.get(key, None)) return value @classmethod async def setex(cls, key: str, time: int, value: dict): async with cls._write_lock: async with cls.acquire_lock(key): print(f"[{datetime.now()}] {value=} get lock") if key in cls._tasks: cls._tasks[key].cancel() cls._g[key] = value task = asyncio.create_task(cls._delay_delete(key, time, value)) cls._tasks[key] = task @classmethod async def _delay_delete(cls, key, time, value): try: await asyncio.sleep(time) async with cls.acquire_lock(key): del cls._g[key] cls._tasks.pop(key, None) except asyncio.CancelledError: print(f"[{datetime.now()}] Deletion task for {key=}, {value=} was cancelled") async def p(a: LocalCacheUserToken): while True: print(f"[{datetime.now()}] {await a.get('test')=}") await asyncio.sleep(1) async def write(a: LocalCacheUserToken, k: str): for i in range(20): await a.setex("test", 5, {'value': f'{k}->{i}'}) await asyncio.sleep(0.01) # 示例使用 async def main(): asyncio.create_task(p(LocalCacheUserToken)) asyncio.create_task(write(LocalCacheUserToken, k='test1')) asyncio.create_task(write(LocalCacheUserToken, k='test2')) await asyncio.sleep(10) asyncio.run(main()) [/code] выходы: [code][2024-06-28 01:42:35.054703] await a.get('test')=None [2024-06-28 01:42:35.054703] value={'value': 'test1->0'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->0'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test1->1'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->1'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->0'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->2'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->2'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->1'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->3'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->3'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->2'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->4'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->4'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->3'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->5'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->5'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->4'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->6'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->6'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->5'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->7'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->7'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->6'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->8'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->8'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->7'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->9'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->9'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->8'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->10'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->10'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->9'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->11'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->11'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->10'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->12'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->12'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->11'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->13'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->13'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->12'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->14'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->14'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->13'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->15'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->15'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->14'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->16'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->16'} get lock [2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->15'} was cancelled [2024-06-28 01:42:35.054703] value={'value': 'test1->17'} get lock [2024-06-28 01:42:35.054703] value={'value': 'test2->17'} get lock [2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->16'} was cancelled [2024-06-28 01:42:35.070327] value={'value': 'test1->18'} get lock [2024-06-28 01:42:35.070327] value={'value': 'test2->18'} get lock [2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->17'} was cancelled [2024-06-28 01:42:35.070327] value={'value': 'test1->19'} get lock [2024-06-28 01:42:35.070327] value={'value': 'test2->19'} get lock [2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->18'} was cancelled [2024-06-28 01:42:36.056942] await a.get('test')={'value': 'test2->19'} [2024-06-28 01:42:37.063445] await a.get('test')={'value': 'test2->19'} [2024-06-28 01:42:38.064713] await a.get('test')={'value': 'test2->19'} [2024-06-28 01:42:39.069732] await a.get('test')={'value': 'test2->19'} [2024-06-28 01:42:40.077492] await a.get('test')=None [2024-06-28 01:42:41.087926] await a.get('test')=None [2024-06-28 01:42:42.103530] await a.get('test')=None [2024-06-28 01:42:43.107391] await a.get('test')=None [2024-06-28 01:42:44.108523] await a.get('test')=None [/code] В чем причина этой проблемы и как ее исправить? Подробнее здесь: [url]https://stackoverflow.com/questions/78679289/python-asyncio-concurrent-write-tasks-frequently-canceled-how-to-ensure-altern[/url]