У меня есть большой фрейм данных PySpark, содержащий 250 миллионов строк и всего 2 столбца. Я использую код minHash, найденный здесь. Я попытался записать полученный фрейм данных в файлы паркета с помощью adj_sdf.write.mode("append").parquet("/output/folder/"). Однако я продолжал получать сообщение об ошибке: использование эфемерного локального хранилища модуля превышает общий лимит контейнеров. Я не могу увеличить ресурсы кластера, поэтому мне интересно, есть ли способы вместо этого оптимизировать код PySpark.
На данный момент я сделал следующее:
- Разбейте кадр данных перед запуском функции minHash: sdf = sdf.repartition(200)
- Отфильтруйте пары, которые вряд ли поделятся большим количеством хеша значения перед последним шагом, который включает в себя два соединения (
Код: Выделить всё
hash_sdf.alias('a').join(...)):
filtered_sdf = hash_sdf.filter(f.size(f.col('nodeSet')) > threshold)
, где порог = int(0,2 * n_draws)
- Установите количество разделов в случайном порядке: spark.conf.set("spark.sql.shuffle.partitions ", "200")
Что еще я могу сделать, чтобы записать фрейм данных в файлы паркета, не столкнувшись с проблемами с ресурсами?
Подробнее здесь:
https://stackoverflow.com/questions/792 ... -resources