Скрипт PySpark зависает после завершения задания — потоки демона ThreadPoolExecutor + PyJ4 никогда не завершаютсяPython

Программы на Python
Ответить
Anonymous
 Скрипт PySpark зависает после завершения задания — потоки демона ThreadPoolExecutor + PyJ4 никогда не завершаются

Сообщение Anonymous »


---

### Environment
- **Spark:** 3.3.2 (Cloudera parcel `SPARK3-3.3.2.3.3.7191000.0-78-1.p0.56279928`)
- **Python:** 3.10
- **PyJ4:** 0.10.9.5
- **Deployment:** YARN
- **OS:** Linux

---

### Problem

I have a PySpark script that uses `concurrent.futures.ThreadPoolExecutor` to load multiple satellite tables in parallel. The job completes successfully (all tables are loaded, YARN logs show no errors), but **the script never returns control to the shell** — it hangs indefinitely.

When I print active threads at the end of the script, I see two daemon threads that never terminate:


Тема: Thread-1, идентификатор: 139902509782784, демон: True
  • Файл: /opt/cloudera/parcels/SPARK3-3.3.2.../lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py
Тема: Thread-2 (serve_forever), ID: 139902501390080, демон: True
  • Файл: /usr/product/python/python-3.10/lib/python3.10/selectors.py, Функция: select, Строка: 416
- **Thread-1** is the PyJ4 gateway client thread (`clientserver.py`)
- **Thread-2** is a socket server running `serve_forever` (via `selectors.py:416`), likely also related to the PyJ4 callback server

---
CODE:

def worker_function(spark, item):
# Performs some Spark operations on the given item
df = spark.sql(f"SELECT * FROM {item}")
df.write.mode('overwrite').saveAsTable(f"result_{item}")

def run_parallel_tasks(spark, items):
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(worker_function, spark, item): item
for item in items
}

for future in concurrent.futures.as_completed(futures):
item = futures[future]
try:
future.result()
except Exception as e:
print(f"Error processing {item}: {e}")
raise e

# Called from main script
run_parallel_tasks(spark, items_list)

---

### What I have already tried

1. `spark.sparkContext._gateway.shutdown_callback_server()` — thread still alive
2. `spark.stop()` — not acceptable, context is reused
3. `executor.shutdown(wait=False)` — no effect on daemon threads
4. `spark.sparkContext._gateway._callback_server.server_close()` — did not close Thread-2
5. `os._exit(0)` — works but bypasses all cleanup, feels like a hack

---

### Question

What is the correct way to fully terminate the PyJ4 callback server and its associated `serve_forever` socket thread after using `ThreadPoolExecutor` in PySpark, **without calling `spark.stop()`** and **without resorting to `os._exit(0)`**?

Is this a known issue with PyJ4 in `clientserver` mode under Cloudera/YARN? Is there a clean shutdown sequence I am missing?


Подробнее: https://stackoverflow.com/questions/799 ... daemon-thr
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»