Карта уменьшить ошибку программы для структуры Top-KJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Карта уменьшить ошибку программы для структуры Top-K

Сообщение Anonymous »

Я получаю ошибку в своей программе Maven на основе карты, так что я не могу получить ничего в моем редукторе, который имеет только один экземпляр для структуры Top-K. Оператор печати в Mapper отлично работает, но из Reducer ничего не напечатано.

Код: Выделить всё

package it.my.bigdata.hadoop.lab;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Counter;

/**
* MapReduce program
*/
public class DriverBigData extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {

int exitCode = 0;

//Change the following part of the code

Path inputPath;
Path outputDir;
int numberOfReducers;

// Parse the parameters
numberOfReducers = 1;
inputPath = new Path(args[0]);
outputDir = new Path(args[1]);

Configuration conf = this.getConf();

// Define a new job
Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);
// Delete the output folder if it exists
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);  // true to delete recursively
System.out.println("Deleted existing output directory: " + outputDir);
}

// Assign a name to the job
job.setJobName("Lab - Skeleton");

// Set path of the input file/folder (if it is a folder, the job reads all the files in the specified folder) for this job
FileInputFormat.addInputPath(job, inputPath);

// Set path of the output folder for this job
FileOutputFormat.setOutputPath(job, outputDir);

// Specify the class of the Driver for this job
job.setJarByClass(DriverBigData.class);

// Set job input format
job.setInputFormatClass(TextInputFormat.class);

// Set job output format
job.setOutputFormatClass(TextOutputFormat.class);

// Set map class
job.setMapperClass(MapperBigData.class);

// Set map output key and value classes
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// Set reduce class
job.setReducerClass(ReducerBigData.class);

// Set reduce output key and value classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// Set number of reducers
job.setNumReduceTasks(numberOfReducers);

// Execute the job and wait for completion
if (job.waitForCompletion(true)==true)
exitCode=0;
else
exitCode=1;

return exitCode;

}

/** Main of the driver
*/

public static void main(String args[]) throws Exception {
// Exploit the ToolRunner class to "configure" and run the Hadoop application
int res = ToolRunner.run(new Configuration(), new DriverBigData(), args);

System.exit(res);
}
}

mapper

Код: Выделить всё

package it.my.bigdata.hadoop.lab;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

import com.google.common.collect.Comparators;

/**
* Lab  - Mapper
*/

/* Set the proper data types for the (key,value) pairs */
class MapperBigData extends Mapper<
LongWritable, // Input key type
Text,         // Input value type
Text,         // Output key type
Text> {// Output value type

private int k = 5;
private PriorityQueue
  localTopk;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
localTopk = new PriorityQueue(k, new Comparator() {
@Override
public int compare(Pair p1, Pair p2) {
// Compare based on the count value, this will sort the Pair in ascending order
return Integer.compare(p1.count, p2.count); // For min-heap behavior
}
});

}

protected void map(
LongWritable key,   // Input key type
Text value,         // Input value type
Context context) throws IOException, InterruptedException {

String[] parts = value.toString().split("\t");
String word = parts[0].trim();
int count = Integer.parseInt(parts[1].trim());
// System.out.println(word + ": " + count);

localTopk.offer(new Pair(word, count));

if (localTopk.size() > k) {
localTopk.poll();
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException  {
// for (Pair pair : localTopk){
//
// }
for (Pair pair : localTopk){
context.write(new Text("1"), new Text(pair.word + "\t" + pair.count));
System.out.println(pair.word + "->  " + pair.count);
}
}

// Helper class to store word and its count
public static class Pair {
String word;
int count;

Pair(String word, int count) {
this.word = word;
this.count = count;
}
}
}

Reducer
package it.my.bigdata.hadoop.lab;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

import java.io.IOException;
import java.util.*;
/**
* Lab - Reducer
*/

/* Set the proper data types for the (key,value) pairs */
class ReducerBigData extends Reducer<
Text, // Input key type
Text, // Input value type
Text, // Output key type
IntWritable> { // Output value type
private int K = 10; // Define top K
private PriorityQueue
globalTopK; // Min-heap for global top K

@Override
protected void setup(Context context) throws IOException, InterruptedException {
// Initialize the priority queue (min-heap) for top K with a custom comparator
globalTopK = new PriorityQueue(K, new Comparator() {
@Override
public int compare(Pair p1, Pair p2) {
// Min-heap, compare based on the count field
return Integer.compare(p1.count, p2.count);
}
});
}

@Override
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
// Merge all values from all mappers (local top K) and compute the global top K
System.out.println("Receiving in reducer. ");
for (Text value : values) {

String[] parts = value.toString().split("\t");
String word = parts[0].trim();
int count = Integer.parseInt(parts[1].trim());
System.out.println(word + ": " + count);
// Add the (word, count) pair to the global top K
globalTopK.offer(new Pair(word, count));

// If the heap exceeds K size, remove the smallest element
if (globalTopK.size() > K) {
globalTopK.poll();
}
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// Emit the global top K results
List result = new ArrayList();
while (!globalTopK.isEmpty()) {
result.add(globalTopK.poll());
}

// Sort in descending order (because the min-heap gives the smallest element first)
Collections.reverse(result);

// Write the top K results
for (Pair pair : result) {
context.write(new Text(pair.word), new IntWritable(pair.count));
}
}

// Helper class to store word and its count
public static class Pair {
String word;
int count;

Pair(String word, int count) {
this.word = word;
this.count = count;
}
}
}

< /code>
Я пробовал все, что мог, изменив ключ на Nullwrable или на 1 (постоянный). но я ничего не вижу в редукторе.


Подробнее здесь: https://stackoverflow.com/questions/794 ... -structure
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Карта уменьшить ошибку программы для структуры Top-K
    Anonymous » » в форуме JAVA
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • Почему window.top.parent не является нулевым и почему window.top.parent - это window.top?
    Anonymous » » в форуме Javascript
    0 Ответы
    60 Просмотры
    Последнее сообщение Anonymous
  • Как на самом деле работают свойства CSS «scroll-margin-top» и «scroll-padding-top»
    Гость » » в форуме CSS
    0 Ответы
    81 Просмотры
    Последнее сообщение Гость
  • Как на самом деле работают свойства CSS «scroll-margin-top» и «scroll-padding-top»
    Anonymous » » в форуме CSS
    0 Ответы
    48 Просмотры
    Последнее сообщение Anonymous
  • Unique_Ptr структуры push_back в вектор другой структуры создает ошибку нарушения чтения
    Anonymous » » в форуме C++
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»