- Spark: 3.3.2 (пакет Cloudera SPARK3-3.3.2.3.3.7191000.0-78-1.p0.56279928)
- Python: 3.10
- PyJ4: 0.10.9.5
- Развертывание: YARN
- ОС: Linux
У меня есть сценарий PySpark, который использует concurrent.futures.ThreadPoolExecutor для загрузки нескольких спутниковые таблицы параллельно. Задание завершается успешно (все таблицы загружены, в журналах YARN ошибок нет), но скрипт никогда не возвращает управление оболочке — он зависает на неопределенный срок.
Когда я печатаю активные потоки в конце сценария, я вижу два потока демона, которые никогда не завершаются:
Код: Выделить всё
Thread: Thread-1, ID: 139902509782784, daemon: True
- File: /opt/cloudera/parcels/SPARK3-3.3.2.../lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py
Thread: Thread-2 (serve_forever), ID: 139902501390080, daemon: True
- File: /usr/product/python/python-3.10/lib/python3.10/selectors.py, Function: select, Line: 416
- Thread-1 — это клиентский поток шлюза PyJ4 ()
Код: Выделить всё
clientserver.py - Thread-2 — это сервер сокетов, на котором работает own_forever (через selectors.py:416), вероятно, также связанный с сервером обратного вызова PyJ4
Код: Выделить всё
def worker_function(spark, item):
# Performs some Spark operations on the given item
df = spark.sql(f"SELECT * FROM {item}")
df.write.mode('overwrite').saveAsTable(f"result_{item}")
def run_parallel_tasks(spark, items):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(worker_function, spark, item): item
for item in items
}
for future in concurrent.futures.as_completed(futures):
item = futures[future]
try:
future.result()
except Exception as e:
print(f"Error processing {item}: {e}")
raise e
Код: Выделить всё
# Called from main script
run_parallel_tasks(spark, items_list)
- — тема еще жива
Код: Выделить всё
spark.sparkContext._gateway.shutdown_callback_server() - — неприемлемо, контекст используется повторно
Код: Выделить всё
spark.stop() - — не влияет на потоки демона
Код: Выделить всё
executor.shutdown(wait=False) - — поток-2 не закрыт
Код: Выделить всё
spark.sparkContext._gateway._callback_server.server_close() - — работает, но обходит всю очистку, похоже на хак
Код: Выделить всё
os._exit(0)
Каков правильный способ полностью завершить сервер обратного вызова PyJ4 и связанный с ним поток сокетов serve_forever после использования ThreadPoolExecutor в PySpark, без вызова spark.stop() и не прибегая к os._exit(0)?
Известна ли это проблема с PyJ4 в режиме клиент-сервер в Cloudera/YARN? Есть ли четкая последовательность завершения работы, которую мне не хватает?
Подробнее: https://stackoverflow.com/questions/799 ... daemon-thr
Мобильная версия