Для этого я увидел, что искра предоставляет конфигурацию - spark.sql.execution.pyspark.udf.idleTimeoutSeconds.
Насколько я понимаю, исполнитель Python запускает рабочие процессы Python и отвечает за реализацию этого параметра. Бегун Python запускает рабочие процессы Python и продолжает проверять, превысили ли рабочие время ожидания простоя. Как только тайм-аут простоя превышен, рабочие уничтожаются. Но фабрика рабочих Python, которая создает рабочие процессы в исполнителях, имеет фиксированный тайм-аут в 60 секунд. Итак, если время простоя превышает 60 секунд, это не будет работать должным образом.
- Правильно ли я понимаю ситуацию?
- Если я хочу повторно использовать рабочие процессы Python после инициализации, как мне действовать?
Ссылка: https://books.japila.pl/pyspark-interna ... erFactory/, https://blog.devgenius.io/apache-spark- ... 591c5f32bf
Версия Spark: 3.5.2, режим развертывания: несколько рабочих процессов в DBx, тип UDF: Pandas UDF (от итератора к итератору)
Пример кода UDF pandas (ссылка)
model_bc = spark.sparkContext.broadcast(ml_model)
@pandas_udf("long")
def infer(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
model = model_bc.value # initialize model
for plaintexts in batches:
... processing by model ...
Подробнее здесь: https://stackoverflow.com/questions/798 ... init-set-u
Мобильная версия