Если это так имеет значение, мы используем режим кластера с пряжей в качестве менеджера кластера. Мы также отправляем эти задания в виде контейнеров Docker.
Я пробовал следующие два подхода, но очень готов предположить, что я слишком много думаю. это... не кажется, что это должно быть так уж сложно сделать:
Код: Выделить всё
spark = init_spark()
do_some_stuff()
spark.metrics.peakMemoryUtilization()
Попытка 1 – пользовательский класс, расширяющий прослушиватель Spark JVM< /p>
Поигравшись с некоторыми предложениями Github CoPilot (я знаю), мне посоветовали использовать SparkListeners для этой задачи, однако я не добился успеха с этим подходом. Предлагаемый подход заключается в создании специального класса, который использует доступ JVM к задачам.
Код: Выделить всё
from pyspark.sql import SparkSession
from pyspark import SparkConf
from py4j.java_gateway import java_import
class MemoryUsageListener:
def __init__(self):
self.peak_memory_usage = 0
def on_task_end(self, task_end):
metrics = task_end.taskMetrics()
memory_used = metrics.peakExecutionMemory()
if memory_used > self.peak_memory_usage:
self.peak_memory_usage = memory_used
# Getter function for adding this to the SparkContext's Listeners
def get_listener(self, sc):
gw = sc._gateway
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListener")
java_import(gw.jvm, "org.apache.spark.scheduler.SparkListenerTaskEnd")
class JavaListener(gw.jvm.SparkListener):
def __init__(self, parent):
super(gw.jvm.SparkListener, self).__init__()
self.parent = parent
def onTaskEnd(self, taskEnd):
self.parent.on_task_end(taskEnd)
return JavaListener(self)
def get_peak_memory_usage(self):
return self.peak_memory_usage
Код: Выделить всё
conf = SparkConf().setAppName("MemoryUsageTracker")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Create and register the listener
memory_listener = MemoryUsageListener()
java_listener = memory_listener.get_listener(sc)
sc._jsc.sc().addSparkListener(java_listener)
... do some spark stuff here (run the actual job) ...
peak_memory_usage = memory_listener.get_peak_memory_usage()
write_to_file(peak_memory_usage)
Попытка 2 – вызовы API для Yarn
Хорошо, интуитивное решение — просто спросить Spark, что он делает, — не сработало. Еще одно предложение, которое я нашел, заключалось в том, чтобы использовать библиотеку запросов и обратиться к конечной точке API, чтобы получить подробную информацию об исполнителях.
Код: Выделить всё
import requests
from pyspark.sql import SparkSession
from pyspark import SparkConf
def get_executor_memory_metrics(app_id, spark_history_server_url):
response = requests.get(f"{spark_history_server_url}/api/v1/applications/{app_id}/executors")
if response.status_code != 200:
raise Exception(f"Failed to fetch executor metrics, status code: {response.status_code}")
executors = response.json()
return executors
def calculate_peak_memory_usage(executors):
# i'm not certain this is the correct dict access, it's what copilot generated, but it's not my current issue
peak_memory_usage = max(executor['peakMemoryMetrics']['JVMHeapMemory'] for executor in executors if 'peakMemoryMetrics' in executor)
return peak_memory_usage
sc = spark.sparkContext
app_id = sc.applicationId
master_url = sc.master
# I'm not convinced this logic is actually sound, but I don't have access to modify the cluster / yarn settings.
yarn_resource_manager_host = socket.getfqdn()
yarn_port = 18080
# At the end of the job
executors = get_executor_memory_metrics(app_id, f"{yarn_resource_manager_host}:{yarn_port}")
peak_memory_usage = calculate_peak_memory_usage(executors)
Подробнее здесь: https://stackoverflow.com/questions/788 ... -to-a-file
Мобильная версия