Запишите пиковое использование памяти заданием Pyspark на EMR в файл.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Запишите пиковое использование памяти заданием Pyspark на EMR в файл.

Сообщение Anonymous »

Мы выполняем множество заданий Pyspark на EMR. Выполняемый конвейер тот же, но входные данные могут сильно изменить пиковое использование памяти, и это использование со временем растет. Я хотел бы автоматически записывать пиковое использование памяти для каждого шага, отправленного в кластер EMR — по сути, «вы использовали 10 ТБ ОЗУ на пике использования» (да, это обычное дело для наших работ). Наши задания не ограничиваются процессором или какими-либо другими показателями, меня волнует только память, хотя, если запись всех совокупных показателей упрощает подход, я открыт для этого.
Если это так имеет значение, мы используем режим кластера с пряжей в качестве менеджера кластера. Мы также отправляем эти задания в виде контейнеров Docker.
Я пробовал следующие два подхода, но очень готов предположить, что я слишком много думаю. это... не кажется, что это должно быть так уж сложно сделать:

Код: Выделить всё

spark = init_spark()

do_some_stuff()

spark.metrics.peakMemoryUtilization()

Поэтому, если я что-то упустил, пожалуйста, сообщите мне.
Попытка 1 – пользовательский класс, расширяющий прослушиватель Spark JVM< /p>
Поигравшись с некоторыми предложениями Github CoPilot (я знаю), мне посоветовали использовать SparkListeners для этой задачи, однако я не добился успеха с этим подходом. Предлагаемый подход заключается в создании специального класса, который использует доступ JVM к задачам.

Код: Выделить всё

from pyspark.sql import SparkSession
from pyspark import SparkConf
from py4j.java_gateway import java_import

class MemoryUsageListener:
def __init__(self):
self.peak_memory_usage = 0

def on_task_end(self, task_end):
metrics = task_end.taskMetrics()
memory_used = metrics.peakExecutionMemory()
if memory_used > self.peak_memory_usage:
self.peak_memory_usage = memory_used

# Getter function for adding this to the SparkContext's Listeners
def get_listener(self, sc):
gw = sc._gateway
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListener")
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListenerTaskEnd")

class JavaListener(gw.jvm.SparkListener):
def __init__(self, parent):
super(gw.jvm.SparkListener, self).__init__()
self.parent = parent

def onTaskEnd(self, taskEnd):
self.parent.on_task_end(taskEnd)

return JavaListener(self)

def get_peak_memory_usage(self):
return self.peak_memory_usage
Настройка примерно такая:

Код: Выделить всё

conf = SparkConf().setAppName("MemoryUsageTracker")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Create and register the listener
memory_listener = MemoryUsageListener()
java_listener = memory_listener.get_listener(sc)
sc._jsc.sc().addSparkListener(java_listener)

... do some spark stuff here (run the actual job) ...

peak_memory_usage = memory_listener.get_peak_memory_usage()
write_to_file(peak_memory_usage)

Я много экспериментировал с приведенным выше предложением, но столкнулся со следующим: JavaClass.__init__() «принимает 3 позиционных аргумента, но было задано 4», когда класс инициализируется, в частности он ненавидит класс JavaListener(gw.jvm.SparkListener). Я попытался переместить суперобъявление, вынеся класс Java в совершенно отдельную функцию, без сигары. Это кажется интуитивно понятным, но, к сожалению, я не понимаю, как работает этот ООП-подход.
Попытка 2 – вызовы API для Yarn
Хорошо, интуитивное решение — просто спросить Spark, что он делает, — не сработало. Еще одно предложение, которое я нашел, заключалось в том, чтобы использовать библиотеку запросов и обратиться к конечной точке API, чтобы получить подробную информацию об исполнителях.

Код: Выделить всё

import requests
from pyspark.sql import SparkSession
from pyspark import SparkConf

def get_executor_memory_metrics(app_id, spark_history_server_url):
response = requests.get(f"{spark_history_server_url}/api/v1/applications/{app_id}/executors")
if response.status_code != 200:
raise Exception(f"Failed to fetch executor metrics, status code: {response.status_code}")
executors = response.json()
return executors

def calculate_peak_memory_usage(executors):
# i'm not certain this is the correct dict access, it's what copilot generated, but it's not my current issue
peak_memory_usage = max(executor['peakMemoryMetrics']['JVMHeapMemory'] for executor in executors if 'peakMemoryMetrics' in executor)
return peak_memory_usage

sc = spark.sparkContext
app_id = sc.applicationId

master_url = sc.master
# I'm not convinced this logic is actually sound, but I don't have access to modify the cluster / yarn settings.
yarn_resource_manager_host = socket.getfqdn()
yarn_port = 18080

# At the end of the job
executors = get_executor_memory_metrics(app_id, f"{yarn_resource_manager_host}:{yarn_port}")
peak_memory_usage = calculate_peak_memory_usage(executors)
И очевидно, что приведенный выше код будет упакован в поток для зацикливания и постоянного мониторинга пикового использования - однако, прежде чем перейти к этому фрагменту, я на самом деле не могу получить часть запроса работаю с постоянной ошибкой «Отказ в соединении». Я подтвердил, что он правильно получает IP-адрес кластера и идентификатор приложения и вводит URL-адрес, хотя я не могу поделиться ими здесь — я не уверен, что номер порта правильный, но я не уверен, есть ли лучший способ. чтобы получить к нему доступ во время выполнения.

Подробнее здесь: https://stackoverflow.com/questions/788 ... -to-a-file
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»