import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;
public class InvertedIndexHbase {
/**
 * (Re)creates the HBase table {@code tablename} with a single column family named
 * {@code "content"}.
 *
 * <p>If a table with that name already exists it is disabled and deleted first, so any data it
 * holds is discarded.
 *
 * @param conf      configuration used to connect to HBase
 * @param tablename name of the table to drop (if present) and create
 * @throws IOException if any of the admin operations, or closing the admin handle, fails
 */
public static void createHBaseTable(Configuration conf, String tablename) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
        if (admin.tableExists(tablename)) {
            System.out.println("table exits, Trying recreate table!");
            // A table has to be disabled before it can be deleted.
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor col = new HColumnDescriptor("content");
        htd.addFamily(col);
        System.out.println("Create new table: " + tablename);
        admin.createTable(htd);
    } finally {
        // Release the admin handle even when one of the operations above throws.
        admin.close();
    }
}
public static class Map
extends Mapper<Object, Text, Text, Text> {
private Text keyWord = new Text();
private Text valueDocCount = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
keyWord.set(itr.nextToken() + ":" + fileName);
valueDocCount.set("1");
context.write(keyWord, valueDocCount);
}
}
}
public static class InvertedIndexCombiner
extends Reducer<Text, Text, Text, Text> {
private Text wordCount = new Text();
private Text wordDoc = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
wordDoc.set(key.toString().substring(0, splitIndex));
wordCount.set(sum + "");
context.write(wordDoc, wordCount);
}
}
public static class Reduce
extends TableReducer<Text, Text, NullWritable> {
private Text temp = new Text();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator<Text> it = values.iterator();
for(;it.hasNext();) {
count++;
temp.set(it.next());
sum += Integer.parseInt(temp.toString());
}
float averageCount = (float)sum / (float)count;
FloatWritable average = new FloatWritable(averageCount);
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(Bytes.toBytes("content"), Bytes.toBytes("average"), Bytes.toBytes(average.toString()));
context.write(NullWritable.get(), put);
}
}
/**
 * Job driver: recreates the HBase table {@code "Wuxia"}, then runs the map / combine / reduce
 * pipeline over the input path and writes the results to that table.
 *
 * <p>The process exits with status 0 when the job succeeds, 1 when it fails, and 2 when the
 * command line is incomplete.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if table creation or job setup/execution fails
 */
public static void main(String[] args) throws Exception {
    // Validate the command line BEFORE touching HBase: createHBaseTable drops an existing
    // table, so failing on a missing argument afterwards would already have destroyed data.
    if (args.length < 2) {
        System.err.println("Usage: InvertedIndexHbase <input path> <output path>");
        System.exit(2);
    }

    String tablename = "Wuxia";
    Configuration conf = HBaseConfiguration.create();
    // Tell TableOutputFormat which table the reducer's Puts go to.
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
    createHBaseTable(conf, tablename);

    Job job = Job.getInstance(conf, "Wuxia");
    job.setJarByClass(InvertedIndexHbase.class);
    job.setMapperClass(Map.class);
    job.setCombinerClass(InvertedIndexCombiner.class);
    job.setReducerClass(Reduce.class);
    // Mapper and combiner both emit Text/Text.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TableOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    // NOTE(review): the output format is TableOutputFormat, so this file output path is
    // presumably unused; kept so the existing two-argument command line keeps working.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}