Перебор больших XML-данных при многопроцессорной обработке с использованием PythonPython

Программы на Python
Ответить
Anonymous
 Перебор больших XML-данных при многопроцессорной обработке с использованием Python

Сообщение Anonymous »

Мне приходится периодически проверять большой XML-файл с миллионами записей.
context = iter(etree.iterparse(product_file_path, tag="Record", events=("start", "end")))
_, root = next(context)
start_tag = None
xml_dict = None
for event, elem in context:
if event == "start" and start_tag is None:
start_tag = elem.tag
if event == "end":
pickled_elem = etree.tostring(elem) # This will make sense later
xml_dict = _etree_to_dict(pickled_elem)
_update_product(self.category, xml_dict)
start_tag = None
xml_dict = None
root.clear()

Хотя мне удалось просмотреть файл с помощью lxml, не перегружая оперативную память, это занимает слишком много времени, поэтому я пытался выполнить это в многопоточном режиме.
Я новичок в многопоточности, и мне удалось собрать решение с помощью команд управления Django и с помощью этого ответа
ProcPoolExc = futures.ProcessPoolExecutor
ThreadPoolExc = futures.ThreadPoolExecutor

class Command(BaseCommand):

def handle(self, *args, **options):

# ( other unnecessary code)

context = iter(etree.iterparse(product_file_path, tag="Detail", events=("start", "end")))
_, root = next(context)
start_tag = None
xml_dict = None
xml_dict_futures = []
product_update_futures = []

with ProcPoolExc(max_workers=threads) as ppe, ThreadPoolExc(max_workers=threads) as tpe:
for event, elem in context:
if event == "start" and start_tag is None:
start_tag = elem.tag
if event == "end":
xml_dict_futures.append(ppe.submit(_etree_to_dict, elem))
start_tag = None
root.clear()

for future in futures.as_completed(xml_dict_futures):
xml_dict = future.result()
product_update_futures.append(tpe.submit(_update_product, *(self.category, xml_dict)))

for fut in futures.as_completed(product_update_futures):
e = fut.exception()
print("success" if not e else e)

Это работает. Однако это вернуло меня к той же проблеме с памятью при работе с большими XML-файлами, в которых я ждал некоторое время, пока мой компьютер не выйдет из строя из-за нехватки оперативной памяти. Полагаю, это потому, что я сохраняю все в xml_dict_futures и Product_update_futures? Есть ли способ оптимизировать это, чтобы избежать этой проблемы?
Я пытался использовать промежуточную функцию и ThreadPoolExecutor.map, но, похоже, я делаю это неправильно, потому что он останавливается и ничего не показывает
def _queue_update(default_category, start_tag, root, event, elem):
if event == "start" and start_tag is None:
start_tag = elem.tag
if event == "end":
pickled_elem = etree.tostring(elem)
_update_product(default_category, pickled_elem)
start_tag = None
root.clear()

а затем
with futures.ThreadPoolExecutor(threads) as executor:
executor.map(
_queue_update, [(self.category, start_tag, root, event, elem) for event, elem in context]
)


Подробнее здесь: https://stackoverflow.com/questions/791 ... ing-python
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»