Пространство кучи Java в PySpark при работе с использованием графического процессора RapidsPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Пространство кучи Java в PySpark при работе с использованием графического процессора Rapids

Сообщение Anonymous »

У меня возникла проблема при отправке Spark. Я не уверен, связана ли проблема с моим скриптом Spark или с моими ресурсами (но подозреваю, что это связано с ресурсами).
У меня 9 основных процессов:

[*]Функция для получения данных из API, последующего преобразования их в RDD и, наконец, в DataFrame Pandas (существует 7 API).
[*]Получите данные из API, используя функцию выше, а затем преобразуйте их. в Spark DataFrames (выбирается 7 наборов данных).
[*]Процесс A, который состоит из 3 подпроцессов: расчет затрат, запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL ).
[*]Процесс Б, состоящий из 3 подпроцессов: расчета затрат, выполнения алгоритма и генерации результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс C, который состоит из 3-х подпроцессов: расчет затрат, запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс D, состоящий из 3-х подпроцессов: расчет затрат , запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс E, который состоит из 3 подпроцессов: расчет затрат, запуск алгоритма и генерация результатов. (каждый подпроцесс использует Spark SQL).
[*]Выполните объединение результатов из A, B, C, D и E.
[*]Отправьте результат объединения в API.
< /ol>
В процессе 2 я добавил .cache() в каждый spark.sql. Аналогично, в каждом подпроцессе процессов A, B, C, D и E я также добавил .cache() в каждый spark.sql. Запросы сложные, и для алгоритма, используемого в подпроцессе 2, существуют дополнительные пользовательские функции.
Пример кэширования в процессе 2:

Код: Выделить всё

get_first_api_df(API_LIMIT).createOrReplaceTempView("first_api")
first_ = spark.sql("""
SELECT fa.*
FROM first_api fa
INNER JOIN (...{}...{}...)
""".format(API_DATE_FORMAT, API_DATE_FORMAT)).cache()
Пример кэширования в процессах 3, 4, 5, 6 и 7:
Подпроцесс 1:

Код: Выделить всё

print("process query_cost_A")
cost_A = spark.sql(query_cost_A).cache()
cost_A.createOrReplaceTempView("cost_matrix_A")
Подпроцесс 2:

Код: Выделить всё

print("process solver_result_all_A")
solver_A = spark.sql(solver_result_all_A).cache()
solver_A.createOrReplaceTempView("solver_result_all_A")
Подпроцесс 3:

Код: Выделить всё

print("process db_solver_result_A")
db_solver_A = spark.sql(db_solver_result_A).cache()
db_solver_A.createOrReplaceTempView("db_solver_result_A")
Операция объединения извлекает данные из db_solver_result_A, db_solver_result_B, db_solver_result_C и т. д.
Проблема в том, что я постоянно сталкиваюсь с ошибкой кучи Java во время процесса D, особенно в первом подпроцессе. Процессы от A до C выполняются без сбоев, но во время процесса D происходит сбой.
Конфигурация сеанса Spark:

Код: Выделить всё

spark = SparkSession.builder \
.appName("Project - GPU - Cache") \
.config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
.config("spark.rapids.sql.enabled", "true") \
.config("spark.rapids.sql.concurrentGpuTasks", "2") \
.config("spark.rapids.sql.hasExtendedYearValues", "false") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.shuffle.service.enabled", "true") \
.config("spark.dynamicAllocation.maxExecutors", "5") \
.config("spark.sql.crossJoin.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.2") \
.getOrCreate()
Команда отправки Spark:

Код: Выделить всё

time spark-submit --master local[*] \
--conf "spark.driver.memoryOverhead=4G" \
--conf "spark.executor.memoryOverhead=10G" \
--driver-memory 20G \
--executor-memory 40G \
--jars "/home/rapids-4-spark_2.12-24.06.1.jar" \
--conf spark.sql.session.timeZone=UTC \
--conf spark.rapids.sql.exec.InMemoryTableScanExec=true \
"/home/script_gpu_cache.py" >> /home/log.txt
Будем очень признательны за любые предложения и советы.
Я пробовал увеличить пространство кучи Java с помощью Xms и Xmx, но проблема не устранена. Я также пробовал удалить кеш в каждом процессе, но результат остался прежним. Я не уверен, следует ли мне увеличить память драйвера до 30 ГБ и память исполнителя до 60 ГБ или даже до 40 ГБ и 80 ГБ.

Подробнее здесь: https://stackoverflow.com/questions/792 ... rapids-gpu
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»