---
### Environment
- **Spark:** 3.3.2 (Cloudera parcel `SPARK3-3.3.2.3.3.7191000.0-78-1.p0.56279928`)
- **Python:** 3.10
- **Py4J:** 0.10.9.5
- **Deployment:** YARN
- **OS:** Linux
---
### Problem
I have a PySpark script that uses `concurrent.futures.ThreadPoolExecutor` to load multiple satellite tables in parallel. The job completes successfully (all tables are loaded, YARN logs show no errors), but **the script never returns control to the shell** — it hangs indefinitely.
When I print active threads at the end of the script, I see two daemon threads that never terminate:
- **Thread-1**, identifier: 139902509782784, daemon: True
  - File: /opt/cloudera/parcels/SPARK3-3.3.2.../lib/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py
  - File: /usr/product/python/python-3.10/lib/python3.10/selectors.py, function: select, line: 416
- **Thread-2** is a socket server running `serve_forever` (via `selectors.py:416`), likely also related to the Py4J callback server
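For reference, a listing like the one above can be produced with a stdlib-only snippet (no Spark required); `dump_threads` is a hypothetical helper name, not part of my script:

```python
import sys
import threading
import traceback

def dump_threads():
    """Print every live thread with its daemon flag and current stack."""
    frames = sys._current_frames()  # maps thread id -> topmost frame
    for t in threading.enumerate():
        print(f"Thread: {t.name}, identifier: {t.ident}, daemon: {t.daemon}")
        frame = frames.get(t.ident)
        if frame is not None:
            for filename, lineno, func, _ in traceback.extract_stack(frame):
                print(f"  - File: {filename}, function: {func}, line: {lineno}")

dump_threads()
```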
---
**Code:**

```python
import concurrent.futures

def worker_function(spark, item):
    # Performs some Spark operations on the given item
    df = spark.sql(f"SELECT * FROM {item}")
    df.write.mode("overwrite").saveAsTable(f"result_{item}")

def run_parallel_tasks(spark, items):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(worker_function, spark, item): item
            for item in items
        }
        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            try:
                future.result()
            except Exception as e:
                print(f"Error processing {item}: {e}")
                raise

# Called from the main script
run_parallel_tasks(spark, items_list)
```
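For what it's worth, the same executor pattern with a plain Python worker (no Spark) returns and exits cleanly, which suggests the hang comes from the Py4J side rather than from `ThreadPoolExecutor` itself. A stdlib-only sketch with hypothetical names:

```python
import concurrent.futures

def plain_worker(item):
    # Stand-in for the Spark work: just transform the item.
    return f"result_{item}"

def run_parallel_plain(items):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(plain_worker, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            results[futures[future]] = future.result()
    return results

print(run_parallel_plain(["a", "b", "c"]))
```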
---
### What I have already tried
1. `spark.sparkContext._gateway.shutdown_callback_server()` — thread still alive
2. `spark.stop()` — not acceptable, context is reused
3. `executor.shutdown(wait=False)` — no effect on daemon threads
4. `spark.sparkContext._gateway._callback_server.server_close()` — did not close Thread-2
5. `os._exit(0)` — works but bypasses all cleanup, feels like a hack
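One diagnostic data point (an observation, not a fix): daemon threads by themselves do not normally prevent CPython from exiting, so if the process hangs, something else is probably waiting as well (a non-daemon thread, or an exit-time hook). The sketch below spawns a child interpreter whose only extra thread is a permanently blocked daemon thread, and the child still exits:

```python
import subprocess
import sys
import textwrap

# Child script: start a daemon thread blocked forever, then let main return.
child = textwrap.dedent("""
    import threading
    t = threading.Thread(target=threading.Event().wait, daemon=True)
    t.start()
    print("main thread done")
""")

# If daemon threads blocked interpreter exit, this would hit the timeout.
result = subprocess.run([sys.executable, "-c", child],
                        capture_output=True, text=True, timeout=30)
print(result.returncode, result.stdout.strip())
```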
---
### Question
What is the correct way to fully terminate the Py4J callback server and its associated `serve_forever` socket thread after using `ThreadPoolExecutor` in PySpark, **without calling `spark.stop()`** and **without resorting to `os._exit(0)`**?
Is this a known issue with Py4J in `clientserver` mode under Cloudera/YARN? Is there a clean shutdown sequence I am missing?