Mongo не обновляет поля, когда операция выполняется с использованием многопроцессорной обработки.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Mongo не обновляет поля, когда операция выполняется с использованием многопроцессорной обработки.

Сообщение Anonymous »

В моем коде есть 5 процессов, которые параллельно запускают 5 update_one на моем экземпляре mongo (без реплик). Каждый update_one изменяет другое поле в одном и том же документе. Переменная matched_count равна 1 для каждого из них, поэтому операция корректно подтверждается, поскольку я использую транзакции и для записи установлено значение большинства.
Тем не менее, когда я прочитайте документ, некоторые поля не обновлены.
Я не ожидаю перезаписи, поскольку в одних и тех же полях нет перекрывающихся обновлений.
Почему это происходит?
РЕДАКТИРОВАНИЕ: добавление некоторых фрагментов моего кода
Здесь я запускаю процессы и жду их результатов:

Код: Выделить всё

...
futures = []
with ProcessPoolExecutor(mp_context=mp.get_context('spawn')) as executor:
futures = [
executor.submit(
self._enrich_peril, claim_number, key, partial, from_scratch
)
for key in indicator_keys
]

for f in as_completed(futures):
indicator_results.append(f.result())
...

with self._lorentz_claims_repository as session:

def __update_claim(session: ClientSession):
update_result = self._lorentz_claims_repository.update_one_fields_synch(
claim_number=claim_number,
fields=claim_updated_fields,
session=session,
)

if update_result.matched_count == 0:
LOGGER.error(
f'Claim {claim_number} has not been updated for \
{indicator_name}'
)
return False

if session.with_transaction(__update_claim) is False:
return False

return True
Это код, с помощью которого я запускаю обновление mongo:

Код: Выделить всё

def update_one_synch(
self, filters: dict, fields: dict, session: ClientSession | None = None
) -> UpdateResult:
if session is not None:
client = self._mongo_client
else:
client = MongoClient(self._connection_url)

if client is None:
raise RuntimeError('MongoClient is None')

db = client[self._database_name]
collection = db[self._collection_name]

result = collection.with_options(
write_concern=WriteConcern(w='majority')
).update_one(filters, {'$set': fields}, session=session)

if session is None:
client.close()

return result
Mongo — это версия 5.0, и документ выглядит так:

Код: Выделить всё

{
_id: "...",
epic: [...],
flood: [...],
...
}
Каждый процесс работает в отдельном поле (т. е. эпик, флуд...)

Подробнее здесь: https://stackoverflow.com/questions/790 ... processing
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»