У меня есть асинхронный блок кода, который обновляет файл и его метаданные:
async for chunk in request.stream():
self.storage_strategy.update(metadata, chunk)
metadata.upload_offset += len(chunk)
self.metadata_strategy.update(metadata)
self.storage_strategy может либо обновить локальный файл, либо добавить его к объекту S3 (или к любому другому методу хранения).
self.metadata_strategy может обновлять метаданные локально (например, в файле) или в базе данных (или других хранилищах).
Обе стратегии отвечают за добавление данных в уже существующий файл и обновление соответствующих метаданных< /p>
Я хочу обеспечить атомарность этих операций. Если в какой-либо момент self.storage_strategy.update произойдет сбой, мне нужно выполнить откат файла до предыдущего состояния.
Аналогично, если self.storage_strategy.update завершится успешно, но self.metadata_strategy.update не удается, мне нужно откатить хранилище, а также метаданные до исходного состояния.
Кроме того, я думаю, мне нужно реализовать некоторую блокировку для конкретных файл и метаданные для предотвращения состояний гонки
Я предполагаю, что мне нужно реализовать методы отката() в обеих стратегиях, но я не уверен, как это правильно спроектировать, особенно для таких случаев, как откат объекта S3 или записи базы данных. Возможно, это должно быть похоже на контекстный менеджер.
Как разработать гибкий механизм блокировки и отката, который работает в различных комбинациях Storage_strategy и Metadata_strategy?
class UploadMetadata:
upload_storage_path = "tmp/"
class StorageStrategy:
def update(self, upload_metadata: UploadMetadata, chunk: bytes):
with open(upload_metadata.upload_storage_path, "ab") as f:
f.write(chunk)
def rollback(self):
pass
class MetadataStrategy:
def update(upload_metadata: UploadMetadata):
with open(upload_metadata.upload_metadata_path, "r+", encoding="utf-8") as f:
existing_data = json.load(f)
existing_data.update(upload_metadata.model_dump())
f.seek(0)
json.dump(existing_data, f, default=str, ensure_ascii=False, indent=4)
f.truncate()
def rollback(self):
pass
async def process_file(request, storage_strategy,metadata_strategy, metadata):
storage_strategy = StorageStrategy()
metadata_strategy = MetadataStrategy()
with some_lock:
try:
async for chunk in request.stream():
storage_strategy.update(metadata, chunk)
metadata.upload_offset+=len(chunk)
metadata_strategy.update(metadata)
except Exception:
storage_strategy.rollback()
metadata_strategy.rollback()
raise
Подробнее здесь: https://stackoverflow.com/questions/790 ... -in-python
Как реализовать атомарные обновления с откатом в Python? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение