Скрипт PySpark зависает после завершения задания — потоки демона ThreadPoolExecutor + PyJ4 никогда не завершаютсяPython

Программы на Python
Ответить
Anonymous
 Скрипт PySpark зависает после завершения задания — потоки демона ThreadPoolExecutor + PyJ4 никогда не завершаются

Сообщение Anonymous »

Окружающая среда
  • Spark: 3.3.2 (пакет Cloudera SPARK3-3.3.2.3.3.7191000.0-78-1.p0.56279928)
  • Python: 3.10
  • PyJ4: 0.10.9.5
  • Развертывание: YARN
  • ОС: Linux
Проблема
У меня есть сценарий PySpark, который использует concurrent.futures.ThreadPoolExecutor для загрузки нескольких спутниковые таблицы параллельно. Задание завершается успешно (все таблицы загружены, в журналах YARN ошибок нет), но скрипт никогда не возвращает управление оболочке — он зависает на неопределенный срок.
Когда я печатаю активные потоки в конце сценария, я вижу два потока демона, которые никогда не завершаются:

Код: Выделить всё

Thread: Thread-1, ID: 139902509782784, daemon: True
- File: /opt/cloudera/parcels/SPARK3-3.3.2.../lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py

Thread: Thread-2 (serve_forever), ID: 139902501390080, daemon: True
- File: /usr/product/python/python-3.10/lib/python3.10/selectors.py, Function: select, Line: 416
  • Thread-1 — это клиентский поток шлюза PyJ4 (

    Код: Выделить всё

    clientserver.py
    )
  • Thread-2 — это сервер сокетов, на котором работает own_forever (через selectors.py:416), вероятно, также связанный с сервером обратного вызова PyJ4
Код

Код: Выделить всё

def worker_function(spark, item):
# Performs some Spark operations on the given item
df = spark.sql(f"SELECT * FROM {item}")
df.write.mode('overwrite').saveAsTable(f"result_{item}")

def run_parallel_tasks(spark, items):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(worker_function, spark, item): item
for item in items
}

for future in concurrent.futures.as_completed(futures):
item = futures[future]
try:
future.result()
except Exception as e:
print(f"Error processing {item}: {e}")
raise e

Код: Выделить всё

# Called from main script
run_parallel_tasks(spark, items_list)
То, что я уже пробовал
  • Код: Выделить всё

    spark.sparkContext._gateway.shutdown_callback_server()
    — тема еще жива
  • Код: Выделить всё

    spark.stop()
    — неприемлемо, контекст используется повторно
  • Код: Выделить всё

    executor.shutdown(wait=False)
    — не влияет на потоки демона
  • Код: Выделить всё

    spark.sparkContext._gateway._callback_server.server_close()
    — поток-2 не закрыт
  • Код: Выделить всё

    os._exit(0)
    — работает, но обходит всю очистку, похоже на хак
Вопрос
Каков правильный способ полностью завершить сервер обратного вызова PyJ4 и связанный с ним поток сокетов serve_forever после использования ThreadPoolExecutor в PySpark, без вызова spark.stop() и не прибегая к os._exit(0)?
Известна ли это проблема с PyJ4 в режиме клиент-сервер в Cloudera/YARN? Есть ли четкая последовательность завершения работы, которую мне не хватает?

Подробнее: https://stackoverflow.com/questions/799 ... daemon-thr
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»