У меня 9 основных процессов:
[*]Функция для получения данных из API, последующего преобразования их в RDD и, наконец, в DataFrame Pandas (существует 7 API).
[*]Получите данные из API, используя функцию выше, а затем преобразуйте их. в Spark DataFrames (выбирается 7 наборов данных).
[*]Процесс A, который состоит из 3 подпроцессов: расчет затрат, запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL ).
[*]Процесс Б, состоящий из 3 подпроцессов: расчета затрат, выполнения алгоритма и генерации результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс C, который состоит из 3-х подпроцессов: расчет затрат, запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс D, состоящий из 3-х подпроцессов: расчет затрат , запуск алгоритма и генерация результатов (каждый подпроцесс использует Spark SQL).
[*]Процесс E, который состоит из 3 подпроцессов: расчет затрат, запуск алгоритма и генерация результатов. (каждый подпроцесс использует Spark SQL).
[*]Выполните объединение результатов из A, B, C, D и E.
[*]Отправьте результат объединения в API.
< /ol>
В процессе 2 я добавил .cache() в каждый spark.sql. Аналогично, в каждом подпроцессе процессов A, B, C, D и E я также добавил .cache() в каждый spark.sql. Запросы сложные, и для алгоритма, используемого в подпроцессе 2, существуют дополнительные пользовательские функции.
Пример кэширования в процессе 2:
Код: Выделить всё
get_first_api_df(API_LIMIT).createOrReplaceTempView("first_api")
first_ = spark.sql("""
SELECT fa.*
FROM first_api fa
INNER JOIN (...{}...{}...)
""".format(API_DATE_FORMAT, API_DATE_FORMAT)).cache()
Подпроцесс 1:
Код: Выделить всё
print("process query_cost_A")
cost_A = spark.sql(query_cost_A).cache()
cost_A.createOrReplaceTempView("cost_matrix_A")
Код: Выделить всё
print("process solver_result_all_A")
solver_A = spark.sql(solver_result_all_A).cache()
solver_A.createOrReplaceTempView("solver_result_all_A")
Код: Выделить всё
print("process db_solver_result_A")
db_solver_A = spark.sql(db_solver_result_A).cache()
db_solver_A.createOrReplaceTempView("db_solver_result_A")
Проблема в том, что я постоянно сталкиваюсь с ошибкой кучи Java во время процесса D, особенно в первом подпроцессе. Процессы от A до C выполняются без сбоев, но во время процесса D происходит сбой.
Конфигурация сеанса Spark:
Код: Выделить всё
spark = SparkSession.builder \
.appName("Project - GPU - Cache") \
.config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
.config("spark.rapids.sql.enabled", "true") \
.config("spark.rapids.sql.concurrentGpuTasks", "2") \
.config("spark.rapids.sql.hasExtendedYearValues", "false") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.shuffle.service.enabled", "true") \
.config("spark.dynamicAllocation.maxExecutors", "5") \
.config("spark.sql.crossJoin.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.2") \
.getOrCreate()
Код: Выделить всё
time spark-submit --master local[*] \
--conf "spark.driver.memoryOverhead=4G" \
--conf "spark.executor.memoryOverhead=10G" \
--driver-memory 20G \
--executor-memory 40G \
--jars "/home/rapids-4-spark_2.12-24.06.1.jar" \
--conf spark.sql.session.timeZone=UTC \
--conf spark.rapids.sql.exec.InMemoryTableScanExec=true \
"/home/script_gpu_cache.py" >> /home/log.txt
Я пробовал увеличить пространство кучи Java с помощью Xms и Xmx, но проблема не устранена. Я также пробовал удалить кеш в каждом процессе, но результат остался прежним. Я не уверен, следует ли мне увеличить память драйвера до 30 ГБ и память исполнителя до 60 ГБ или даже до 40 ГБ и 80 ГБ.
Подробнее здесь: https://stackoverflow.com/questions/792 ... rapids-gpu