Многопроцессорность Python больше не работает после перехода с Airflow 2 на Airflow 3Python

Программы на Python
Ответить
Anonymous
 Многопроцессорность Python больше не работает после перехода с Airflow 2 на Airflow 3

Сообщение Anonymous »

В настоящее время я переношу свои процессы и группы обеспечения доступности баз данных с Airflow 2 на Airflow 3. При этом я сталкиваюсь с проблемой, заключающейся в том, что группа обеспечения доступности баз данных, выполняющая анализатор XML, зависает после завершения своей работы. В результате эта Задача всегда остается в рабочем режиме. При устранении неполадок мне удалось сузить проблему до многопроцессорности Python. До сих пор я пробовал несколько решений, но не нашел решения. Я объясню эти шаги в следующем вопросе.
Исходная ситуация
В настоящее время я использую виртуальную машину разработки Oracle Linux 9, на которой установлен Airflow 3. Теперь на этой машине должен быть запущен в работу даг, вызывающий синтаксический анализатор XML. Этот парсер содержит следующую многопроцессорную обработку (упрощенную)
parse_xml.py

Код: Выделить всё

def parse_xml(output_folder: str, input_folder: str, config_path: str  = None):

#decleration of all parameters

logger.info('Start processing of XMLs (Multiprocessing)')
with Manager() as manager:
ctx = get_context()
lock = manager.Lock()
with ctx.Pool(processes=pool_size) as pool:
logger.info("Start of XML Parsing")
for _ in pool.imap_unordered(
xml_reader.multiprocessing_xml_handler,
[(chunk, table_query, csv_path, lock, threshhold_processes) for chunk in chunks],
): pass
logger.info("Finished processing the XML data")
Эта функция вызывается внутри задачи или в целях тестирования через main.py. Метод xml_reader.multiprocessing_xml_handler инициализируется внутри собственного класса в собственном модуле.
xml_read.py

Код: Выделить всё

class xml_reader:
@staticmethod
def multiprocessing_xml_handler(args):
chunk, table_query, csv_path, lock, threshhold_processes = args
chunk_xml_dataFrames = {}
for xml in chunk:
try:
xml_dataFrames = xml_reader.process_xml(
xml, table_query, csv_path, lock
)
for csv_file in xml_dataFrames.keys():
result_dict = xml_dataFrames[csv_file]
if csv_file in chunk_xml_dataFrames:
tmp_dict = chunk_xml_dataFrames[csv_file]
chunk_xml_dataFrames[csv_file] = ToolBox.dict_extend(
result_dict, tmp_dict
)
else:
chunk_xml_dataFrames[csv_file] = result_dict

except Exception as e:
logger.error(f"Error:{e} while processing: {xml}")

ToolBox.flush_to_csv(chunk_xml_dataFrames,lock)
logger.warning("I flushed")
Этот метод вызывается многопроцессорной обработкой и преобразует фрагменты XML в формат CSV. Также между процессами не происходит обмена результатами обработки.
Попытки устранения неполадок
  • В Airflow 3 есть различные исполнители. Я пробовал последовательный и локальный. Последовательный и локальный режимы гарантируют, что группа обеспечения доступности баз данных запускается, но остается в режиме выполнения. К сожалению, мы вынуждены использовать LocalExecutor из-за инфраструктуры.
  • Я пытался настроить программу Python и Airflow 3 на один и тот же метод создания процессов. Точнее, я использовал три регулировочных винта. Во-первых, airflow.cfg с mp_start_method и Execute_tasks_new_python_interpreter. Кроме того, я перевел свой скрипт на такую ​​версию, используя get_context в Python. Я протестировал следующие комбинации.
    введите сюда описание изображения.
    Во время разветвления Dag остается в рабочем режиме, а при его создании программа продолжает зависать.
  • Парсер тестировался на двух системах: Windows и Linux. Парсер работал без проблем на обеих системах. Только когда Airlfow использовался для обработки данных на компьютере с Linux, DAG переставал работать.
  • После того, как многопроцессорность Python была удалена и заменена последовательным выполнением, DAG работал без каких-либо проблем.
    Завершение
    Я читал на некоторых форумах, что Airlfow перегружен сигналами процесса и поэтому неправильно перехватывает код прерывания. Возможно, у вас есть идеи или объяснения по этому поводу?
    Есть ли другие способы более точно определить проблему?
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»