Но есть одна проблема.
Я уже просмотрел текст
Это новый файл, который я создал: "JobRestarter.java"
Код: Выделить всё
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.server.nodemanager.HadoopWorkloadPredictor;
public class JobRestarter {

    /**
     * Kills {@code existingJob} and resubmits an equivalent job whose configuration has been
     * tuned for the workload type predicted by {@link HadoopWorkloadPredictor}.
     *
     * @param inputPaths  input paths of the original job, re-added to the new job
     * @param outputPath  output path of the original job; deleted if it already exists
     * @param existingJob the running job to kill and resubmit
     * @throws Exception if killing, configuring, or submitting the job fails outside the
     *                   guarded completion-wait (which only logs)
     */
    public static void restartJobWithOptimizedConfig(Path[] inputPaths, Path outputPath, Job existingJob) throws Exception {
        // Guard: a job produced by this restarter must never be restarted again.
        boolean isAlreadyOptimized = existingJob.getConfiguration().getBoolean("isOptimized", false);
        if (isAlreadyOptimized) {
            System.out.println("Job is already optimized. Skipping restart.");
            return;
        }

        System.out.println("Killing the existing job...");
        existingJob.killJob();

        // Clone the original configuration so user/queue/priority and all other job
        // settings carry over to the resubmitted job.
        Configuration optimizedConf = new Configuration(existingJob.getConfiguration());
        optimizedConf.setBoolean("isOptimized", true);
        // BUG FIX: the monitoring hook also gates on "isRestarted" before triggering a
        // restart, but nothing ever set it; without this flag the restarted job could
        // trigger yet another restart cycle.
        optimizedConf.setBoolean("isRestarted", true);
        optimizedConf.set("mapreduce.job.user.name", existingJob.getUser());
        optimizedConf.set("mapreduce.job.queuename",
                existingJob.getConfiguration().get("mapreduce.job.queuename", "default"));
        optimizedConf.set("mapreduce.job.priority", existingJob.getPriority().name());

        // Predict the workload type and apply the matching tuning profile.
        HadoopWorkloadPredictor predictor = new HadoopWorkloadPredictor();
        String workloadType = predictor.collectDataAndPredict();
        System.out.println("Detected workload type: " + workloadType);
        optimizedConf = DynamicJobConfiguration.configureJob(workloadType, optimizedConf);

        Job optimizedJob = Job.getInstance(optimizedConf, existingJob.getJobName());

        // Re-apply the class wiring from the original job.
        // NOTE(review): setJarByClass locates the job jar via the mapper class; this
        // assumes the mapper lives in the same jar as the rest of the job — confirm.
        optimizedJob.setJarByClass(existingJob.getMapperClass());
        optimizedJob.setMapperClass(existingJob.getMapperClass());
        optimizedJob.setReducerClass(existingJob.getReducerClass());
        optimizedJob.setInputFormatClass(existingJob.getInputFormatClass());
        optimizedJob.setOutputFormatClass(existingJob.getOutputFormatClass());

        // MapReduce refuses to start when the output directory already exists, so remove
        // any partial output left behind by the killed job.
        FileSystem fs = FileSystem.get(optimizedConf);
        if (fs.exists(outputPath)) {
            System.out.println("Output path already exists. Deleting the existing output directory: " + outputPath);
            fs.delete(outputPath, true);
        }

        for (Path inputPath : inputPaths) {
            FileInputFormat.addInputPath(optimizedJob, inputPath);
        }
        FileOutputFormat.setOutputPath(optimizedJob, outputPath);

        System.out.println("Restarting job with optimized parameters...");
        try {
            optimizedJob.submit();
            if (optimizedJob.waitForCompletion(true)) {
                System.out.println("Optimized job completed successfully.");
            } else {
                System.out.println("Optimized job did not complete successfully.");
            }
        } catch (Exception e) {
            // Best-effort: a failed resubmission is reported but not propagated, so the
            // caller's monitor loop can keep running.
            System.err.println("Exception during job completion: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
Код: Выделить всё
// Hook inserted into Job#monitorAndPrintJob: once the map phase has made any
// progress (> 0%), analyze the workload and restart this job with tuned settings.
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
// Restart at most once: "isRestarted"/"isOptimized" mark a job that was already
// resubmitted by JobRestarter.
// NOTE(review): the posted JobRestarter sets only "isOptimized", never
// "isRestarted" — confirm that flag is set somewhere, otherwise this check is moot.
if(mapProgress() > 0.00 && !this.getConfiguration().getBoolean("isRestarted", false)){
if (!this.getConfiguration().getBoolean("isOptimized", false)) {
System.out.println("Initial job submitted successfully. Analyzing workload and restarting with optimized parameters...");
try {
// Carry the original input/output paths over to the restarted job.
Path[] inputPaths = FileInputFormat.getInputPaths(this);
Path outputPath = FileOutputFormat.getOutputPath(this);
// WARNING(review): this kills the very job whose monitor loop is executing here;
// the outer waitForCompletion then observes state KILLED and reports failure —
// this matches the log/stack trace shown below.
JobRestarter.restartJobWithOptimizedConfig(inputPaths, outputPath, this);
} catch (Exception e) {
System.err.println("Failed to restart the job with optimized configuration: " + e.getMessage());
e.printStackTrace();
}
}
}
введите здесь описание изображения
Код: Выделить всё
2024-10-30 11:50:33,972 INFO mapreduce.Job: map 12% reduce 0%
2024-10-30 11:50:37,123 INFO mapreduce.Job: map 0% reduce 0%
//when i forcefully kill a newly started Job
2024-10-30 11:50:37,126 INFO mapreduce.Job: Job job_1730256478452_0002 failed with state KILLED due to: Application application_1730256478452_0002 was killed by user dr.who
2024-10-30 11:50:37,134 INFO mapreduce.Job: Counters: 0
Exception during job completion: Optimized job failed to complete successfully.
java.lang.IllegalStateException: Optimized job failed to complete successfully.
at org.apache.hadoop.mapreduce.util.JobRestarter.restartJobWithOptimizedConfig(JobRestarter.java:80)
at org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1655)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
at org.apache.hadoop.examples.WordCount.main(WordCount.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:152)
at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Closing optimized job...
Optimized job is complete. Proceeding with resource cleanup.
Failed to restart the job with optimized configuration: Optimized job failed to complete successfully.
java.lang.IllegalStateException: Optimized job failed to complete successfully.
at org.apache.hadoop.mapreduce.util.JobRestarter.restartJobWithOptimizedConfig(JobRestarter.java:80)
at org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.java:1655)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
at org.apache.hadoop.examples.WordCount.main(WordCount.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:152)
at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
2024-10-30 11:50:38,137 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:39,137 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:40,138 INFO ipc.Client: Retrying connect to server: snowjeon2/127.0.1.1:35217. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
2024-10-30 11:50:40,240 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=KILLED. Redirecting to job history server
Я проанализировал код отправки задания в Hadoop и попробовал кое-что добавить, но это не сработало.
Мне было интересно, какой код мне добавить.
Подробнее здесь: https://stackoverflow.com/questions/791 ... figuration
Мобильная версия