import java.io.IOException;
import java.util.Iterator;
import java.util.SortedMap;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class Map
extends Mapper<Object, Text, Text, Text> {
private Text keyWord = new Text();
private Text valueDocCount = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
keyWord.set(itr.nextToken() + ":" + fileName);
valueDocCount.set("1");
context.write(keyWord, valueDocCount);
}
}
}
public static class InvertedIndexCombiner
extends Reducer<Text, Text, Text, Text> {
private Text wordCount = new Text();
private Text wordDoc = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
int splitFileName = key.toString().indexOf(".txt.segmented");
wordDoc.set(key.toString().substring(0, splitIndex));
wordCount.set(key.toString().substring(splitIndex + 1, splitFileName) + ":" + sum);
context.write(wordDoc, wordCount);
}
}
public static class Reduce
extends Reducer<Text, Text, Text, Text> {
private Text temp = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator<Text> it = values.iterator();
StringBuilder all = new StringBuilder();
for(;it.hasNext();) {
count++;
temp.set(it.next());
all.append(temp.toString());
int splitIndex = temp.toString().indexOf(":");
temp.set(temp.toString().substring(splitIndex + 1));
sum += Integer.parseInt(temp.toString());
if (it.hasNext()) {
all.append(";");
}
}
float averageCount = (float)sum / (float)count;
FloatWritable average = new FloatWritable(averageCount);
all.insert(0, average.toString() + ",");
context.write(key, new Text(all.toString()));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Inverted Index");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(Map.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}