Конфигурация задания HadoopJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Конфигурация задания Hadoop

Сообщение Anonymous »

Я хочу создать новую архитектуру для Hadoop.
Но есть одна проблема.
Я уже просмотрел текст
Это новый файл, который я создайте "JobRestarter.java"

Код: Выделить всё

package org.apache.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.server.nodemanager.HadoopWorkloadPredictor;

public class JobRestarter {

public static void restartJobWithOptimizedConfig(Path[] inputPaths, Path outputPath, Job existingJob) throws Exception {
boolean isAlreadyOptimized = existingJob.getConfiguration().getBoolean("isOptimized", false);
if (isAlreadyOptimized) {
System.out.println("Job is already optimized. Skipping restart.");
return;
}

System.out.println("Killing the existing job...");
existingJob.killJob();

Configuration optimizedConf = new Configuration(existingJob.getConfiguration());
optimizedConf.setBoolean("isOptimized", true);

optimizedConf.set("mapreduce.job.user.name", existingJob.getUser());
optimizedConf.set("mapreduce.job.queuename", existingJob.getConfiguration().get("mapreduce.job.queuename", "default"));
optimizedConf.set("mapreduce.job.priority", existingJob.getPriority().name());

HadoopWorkloadPredictor predictor = new HadoopWorkloadPredictor();
String workloadType = predictor.collectDataAndPredict();
System.out.println("Detected workload type: " + workloadType);
optimizedConf = DynamicJobConfiguration.configureJob(workloadType, optimizedConf);

Job optimizedJob = Job.getInstance(optimizedConf, existingJob.getJobName());

// Mapper, Reducer, InputFormat, OutputFormat 설정
optimizedJob.setJarByClass(existingJob.getMapperClass());
optimizedJob.setMapperClass(existingJob.getMapperClass());
optimizedJob.setReducerClass(existingJob.getReducerClass());
optimizedJob.setInputFormatClass(existingJob.getInputFormatClass());
optimizedJob.setOutputFormatClass(existingJob.getOutputFormatClass());

FileSystem fs = FileSystem.get(optimizedConf);
if (fs.exists(outputPath)) {
System.out.println("Output path already exists. Deleting the existing output directory: " + outputPath);
fs.delete(outputPath, true);
}

for (Path inputPath : inputPaths) {
FileInputFormat.addInputPath(optimizedJob, inputPath);
}
FileOutputFormat.setOutputPath(optimizedJob, outputPath);

System.out.println("Restarting job with optimized parameters...");

try {
optimizedJob.submit();
if (optimizedJob.waitForCompletion(true)) {
System.out.println("Optimized job completed successfully.");
} else {
System.out.println("Optimized job did not complete successfully.");
}
} catch (Exception e) {
System.err.println("Exception during job completion: " + e.getMessage());
e.printStackTrace();
}
}
}
Это код, который убивает существующее задание и перезапускает его с новым параметром, но когда оно завершается или его принудительно завершают, он продолжает попытки перезапустить его.Я поместил это в файл Job.java вот так

Код: Выделить всё

if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
if(mapProgress() > 0.00 && !this.getConfiguration().getBoolean("isRestarted", false)){
if (!this.getConfiguration().getBoolean("isOptimized", false)) {
System.out.println("Initial job submitted successfully.  Analyzing workload and restarting with optimized parameters...");
try {
Path[] inputPaths = FileInputFormat.getInputPaths(this);
Path outputPath = FileOutputFormat.getOutputPath(this);
JobRestarter.restartJobWithOptimizedConfig(inputPaths, outputPath, this);
} catch (Exception e) {
System.err.println("Failed to restart the job with optimized configuration: " + e.getMessage());
e.printStackTrace();
}
}
}
И вот что происходит, когда я принудительно завершаю только что начатое задание.
введите здесь описание изображения

Код: Выделить всё

2024-10-30 11:50:33,972 INFO mapreduce.Job:  map 12% reduce 0%
2024-10-30 11:50:37,123 INFO mapreduce.Job:  map 0% reduce 0%
//when i forcefully kill a newly started Job
2024-10-30 11:50:37,126 INFO mapreduce.Job: Job job_1730256478452_0002 failed with state KILLED due to: Application application_1730256478452_0002 was killed by user dr.who
2024-10-30 11:50:37,134 INFO mapreduce.Job: Counters: 0
Exception during job completion: Optimized job failed to complete successfully.
java.lang.IllegalStateException: Optimized job failed to complete successfully.
at org.apache.hadoop.mapreduce.util.JobRestarter.restartJobWithOptimizedConfig(JobRestarter.java:80)
at org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1655)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
at org.apache.hadoop.examples.WordCount.main(WordCount.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:152)
at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Closing optimized job...
Optimized job is complete. Proceeding with resource cleanup.
Failed to restart the job with optimized configuration: Optimized job failed to complete successfully.
java.lang.IllegalStateException: Optimized job failed to complete successfully.
at org.apache.hadoop.mapreduce.util.JobRestarter.restartJobWithOptimizedConfig(JobRestarter.java:80)
at org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1655)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
at org.apache.hadoop.examples.WordCount.main(WordCount.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:152)
at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
2024-10-30 11:50:38,137 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:39,137 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217.  Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:40,138 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:40,240 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=KILLED. Redirecting to job history server

Есть ли какой-нибудь код, который я пропустил при отправке задания??
Я проанализировал код отправки задания в Hadoop и попробовал добавить несколько что-то, но это не сработало.
Мне было интересно, какой код мне добавить.

Подробнее здесь: https://stackoverflow.com/questions/791 ... figuration
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»