- Requirements
When building the inverted index with term-frequency information, in addition to outputting the inverted index itself, also compute and output the "average occurrence count" of each word (defined below).
The "average occurrence count" is defined here as:
average occurrence count = total number of occurrences of the word in all documents / number of documents that contain the word
For example, suppose the document collection contains four novels: A, B, C and D. The word 江湖 appears 100 times in document A, 200 times in document B, 300 times in document C, and does not appear in document D. Its average occurrence count over this collection is therefore (100 + 200 + 300) / 3 = 200.
Note: both computations must be completed in a single MapReduce Job.
Output format
For each word, output one key-value pair in the following format:
[word] \TAB average occurrence count, novel1:count; novel2:count; novel3:count; …; novelN:count
The novel names in the output must have the ".txt.segmented" file-name suffix removed.
The figure below shows a fragment of the output file (the content in the figure is only a format illustration):
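Since the original screenshot is not reproduced here, a hypothetical output line consistent with the 江湖 example above (and with the format produced by the code in Section 2 below) would look like:

江湖 \TAB 200.0, A:100; B:200; C:300;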
Optional tasks
This part is not required; it is offered as extra practice for interested students with spare capacity.
- Use a second MapReduce Job to globally sort the words by their average occurrence count and output the sorted result.
- For each author, compute the TF-IDF of every word. TF is defined as the total number of occurrences of the word across all of that author's works. IDF is defined (consistently with the implementation in Section 2) as:
IDF = ln(total number of documents / (number of documents containing the word + 1))
Output format: author name, word, the word's TF-IDF (a worked example follows below).
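As a purely hypothetical worked example (all numbers are made up for illustration; the natural logarithm is used, matching Math.log in the code below): if the collection holds 219 documents and a word appears in 9 of them, then IDF = ln(219 / (9 + 1)) ≈ 3.09; if that word occurs 50 times in total in one author's works, the author's TF-IDF for the word is about 50 × 3.09 ≈ 154.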
This lab provides the complete works of five novelists, including Jin Yong (金庸) and Liang Yusheng (梁羽生). Each novel corresponds to one text file.
All text files use UTF-8 encoding and have already been word-segmented; adjacent Chinese words are separated by a space.
2. Code Implementation
InvertIndex.java
package com.hadoop.InvertIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertIndex {
    // total number of input files (set in main(); not referenced by the MapReduce tasks)
    public static int FileNum = 0;
    public static class InvertedIndexMap extends Mapper<Object,Text,Text,Text>{
        private Text valueInfo = new Text();
        private Text keyInfo = new Text();
        private FileSplit split;
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // get the FileSplit (and hence the file) this record belongs to
            split = (FileSplit) context.getInputSplit();
            StringTokenizer stk = new StringTokenizer(value.toString());
            while (stk.hasMoreTokens()) {
                // the key is (word:file URI)
                keyInfo.set(stk.nextToken() + ":" + split.getPath().toString());
                // the value is the term count contributed by this occurrence
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }
    public static class InvertedIndexCombiner extends Reducer<Text,Text,Text,Text>{
        Text info = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            // reset the value to (file URI:term count);
            // note that the reducer's parsing depends on this combiner having run
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // reset the key to the word alone
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }
    public static class InvertedIndexReduce extends Reducer<Text,Text,Text,Text>{
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // build the document list
            String fileList = "";
            int sum = 0;    // total occurrences of the word over all documents
            int times = 0;  // number of documents containing the word
            for (Text value : values) {
                String temp = value.toString();      // format: file URI:count
                String[] str = temp.split("/");      // HDFS URIs always use "/"
                String[] str2 = temp.split(":");
                String[] str1 = str[str.length - 1].split("\\.");
                sum += Integer.parseInt(str2[str2.length - 1]);
                // strip the ".txt.segmented" suffix from the file name
                String name = "";
                for (int i = 0; i < str1.length - 2; i++) {
                    name += str1[i];
                }
                fileList += name + ":" + str2[str2.length - 1] + "; ";
                times++;
            }
            double average = (double) sum / (double) times;
            String atemp = Double.toString(average) + ", " + fileList;
            result.set(atemp);
            context.write(key, result);
        }
    }
    public static class InvertIndexSortMapper extends Mapper<Object,Text,DoubleWritable,Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line has the form: word \TAB average, book1:count; book2:count; ...
            StringTokenizer stk = new StringTokenizer(value.toString());
            String name = stk.nextToken() + " ";     // the word
            String temp = stk.nextToken();           // "average," (comma attached)
            String[] a = temp.split(",");
            while (stk.hasMoreTokens()) {
                name = name + stk.nextToken() + " "; // append the book:count entries
            }
            double count = Double.parseDouble(a[0]);
            // emit the average as the key so the framework sorts by it
            context.write(new DoubleWritable(count), new Text(name));
        }
    }
    // inverts the natural ordering of DoubleWritable so that the sort job
    // emits words in decreasing order of average occurrence count
    public static class DoubleWritableDecreasingComparator extends DoubleWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] inputFiles = fs.listStatus(new Path(args[0]));
        FileNum = inputFiles.length;
        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertIndex.class);
        // temporary directory that holds the inverted index before sorting
        Path tempDir = new Path("count-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        try {
            job.setMapperClass(InvertedIndexMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, tempDir);
            if (job.waitForCompletion(true)) {
                // second job: sort the words by average occurrence count in decreasing order
                Job sortJob = new Job(conf, "sort");
                sortJob.setJarByClass(InvertIndex.class);
                FileInputFormat.addInputPath(sortJob, tempDir);
                FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
                sortJob.setSortComparatorClass(DoubleWritableDecreasingComparator.class);
                sortJob.setMapperClass(InvertIndexSortMapper.class);
                sortJob.setMapOutputKeyClass(DoubleWritable.class);
                sortJob.setMapOutputValueClass(Text.class);
                sortJob.setOutputKeyClass(DoubleWritable.class);
                sortJob.setOutputValueClass(Text.class);
                System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
            }
        }
        finally {
            // the temporary directory could be cleaned up here, e.g.:
            //FileSystem.get(conf).deleteOnExit(tempDir);
        }
    }
}
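A typical way to run this driver might look like the following (the jar name and the input/output paths are placeholders, not part of the original code):

hadoop jar invertindex.jar com.hadoop.InvertIndex.InvertIndex /input/novels /output/sorted

Here args[0] is the directory of segmented novels and args[1] receives the sorted result; the unsorted inverted index is written to the intermediate count-temp-* directory.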
TFIDF.java
package com.hadoop.InvertIndex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
/**
* Created by jiangchenzhou on 16/4/26.
*/
public class TFIDF {
    // configuration key used to pass the total number of documents from the driver to the map tasks
    public static final String FILE_NUM_KEY = "tfidf.file.num";
    // default document count, used if the value is missing from the configuration
    public static double FileNum = 219;
    public static class TFIDFMapper extends Mapper<Object,Text,Text,Text> {
        private double fileNum = FileNum;

        protected void setup(Context context) {
            // read the document count stored by the driver; a plain static field set in
            // main() would not reach the map tasks when the job runs distributed
            fileNum = Double.parseDouble(context.getConfiguration().get(FILE_NUM_KEY, Double.toString(FileNum)));
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line comes from the sort job and has the form:
            // average \TAB word book1:count; book2:count; ...
            StringTokenizer stk = new StringTokenizer(value.toString());
            stk.nextToken();                    // skip the average occurrence count
            String word = stk.nextToken();
            ArrayList<String> wordInfo = new ArrayList<String>();
            while (stk.hasMoreTokens()) {
                wordInfo.add(stk.nextToken());  // one "book:count;" entry per token
            }
            // wordInfo.size() is the number of books that contain the word
            double IDF = Math.log(fileNum / (double) (wordInfo.size() + 1));
            String IDFString = Double.toString(IDF);
            for (String info : wordInfo) {
                String[] temp = info.split(";");
                String[] temp1 = temp[0].split(":");
                String times = temp1[1];
                // book names are assumed to look like author + two-digit index + title
                String[] temp2 = temp1[0].split("[0-9]");
                String author = temp2[0];
                String bookName = temp2[2];
                Text keyInfo = new Text(author + " " + word + " " + bookName + " ");
                Text valueInfo = new Text(times + " " + IDFString);
                context.write(keyInfo, valueInfo);
            }
        }
    }
    public static class TFIDFCombiner extends Reducer<Text,Text,Text,Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // rewrite the key to (author, word), dropping the book name;
            // the final per-author grouping relies on this combiner running
            StringTokenizer keyStk = new StringTokenizer(key.toString());
            Text keyInfo = new Text(keyStk.nextToken() + " " + keyStk.nextToken());
            for (Text value : values) {
                context.write(keyInfo, value);
            }
        }
    }
    public static class TFIDFReduce extends Reducer<Text,Text,Text,DoubleWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int TF = 0;
            double IDF = 0;
            for (Text value : values) {
                // each value holds "count IDF"; sum the counts to get the author-level TF
                StringTokenizer stk = new StringTokenizer(value.toString());
                TF += Integer.parseInt(stk.nextToken());
                IDF = Double.parseDouble(stk.nextToken());
            }
            StringTokenizer stk = new StringTokenizer(key.toString());
            context.write(new Text(stk.nextToken() + " " + stk.nextToken() + " "), new DoubleWritable(TF * IDF));
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // args[0] is the directory of original (segmented) novels, used only to count the documents
        FileStatus[] inputFiles = fs.listStatus(new Path(args[0]));
        FileNum = inputFiles.length;
        // pass the document count to the map tasks through the job configuration
        conf.set(FILE_NUM_KEY, Double.toString(FileNum));
        Job TFIDFJob = new Job(conf, "TFIDF");
        TFIDFJob.setJarByClass(TFIDF.class);
        // args[1] is the output directory of the preceding sort job
        FileInputFormat.addInputPath(TFIDFJob, new Path(args[1]));
        FileOutputFormat.setOutputPath(TFIDFJob, new Path("TF-IDF"));
        TFIDFJob.setMapperClass(TFIDFMapper.class);
        TFIDFJob.setMapOutputKeyClass(Text.class);
        TFIDFJob.setMapOutputValueClass(Text.class);
        TFIDFJob.setCombinerClass(TFIDFCombiner.class);
        TFIDFJob.setReducerClass(TFIDFReduce.class);
        TFIDFJob.setOutputKeyClass(Text.class);
        TFIDFJob.setOutputValueClass(DoubleWritable.class);
        System.exit(TFIDFJob.waitForCompletion(true) ? 0 : 1);
    }
}
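A hypothetical invocation of the TF-IDF driver (again with placeholder jar name and paths) might be:

hadoop jar invertindex.jar com.hadoop.InvertIndex.TFIDF /input/novels /output/sorted

In this driver args[0] is only listed to count the documents, args[1] must point at the output of the previous sort job, and the results are written to a directory named TF-IDF.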
3. Report
1 Design of Map and Reduce (including the key/value types of the Map and Reduce stages)
1.1 Basic requirement and sorting
Because the two jobs are closely related in code, they are described together.
For the basic requirement, the Map function splits each input line into words and emits, for every word, a key of the form word:file-URI (Text) with the value "1" (Text) as its term count.
The Combiner then sums the values of each (word:file-URI) key into sum, strips the file URI out of the key and combines it with sum as the new value (Text), while the word alone becomes the new key (Text).
Finally, the Reduce function splits each value on ":" and "." to extract the per-file term counts and the file names (removing the ".txt.segmented" suffix), counts how many files contain the word, computes the average occurrence count, and prepends it to the file list as the new value; the key remains the word. The result is written to a temporary directory.
A second sorting job takes that temporary output as input. Its Map splits each line, emits the average occurrence count as the key (DoubleWritable) and the rest (the word and the file list) as the value (Text); the overridden DoubleWritableDecreasingComparator makes the framework sort the keys in descending order before the result is written out. The key/value flow is sketched below.
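Using the 江湖 example from the requirements (the paths, book names and counts are hypothetical illustrations, not actual outputs), the key/value pairs move through the stages roughly as follows:

Map output:      key = "江湖:hdfs://…/A.txt.segmented", value = "1"
Combiner output: key = "江湖", value = "hdfs://…/A.txt.segmented:100"
Reduce output:   key = "江湖", value = "200.0, A:100; B:200; C:300; "
Sort-job Map:    key = 200.0 (DoubleWritable), value = "江湖 A:100; B:200; C:300;"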
1.2 Computing TF-IDF
For TF-IDF we feed the output of the previous sort job back in as input. In Map, each line is split on spaces; the number of book entries following the word tells us how many books contain it, and together with the known total number of books this yields the word's IDF (double). For every book entry we emit a key consisting of the author, the word and the book name (space-separated, Text), and a value consisting of the per-book count and the IDF just computed (Text).
In the Combiner we rewrite the key to just the author and the word, dropping the book name.
In Reduce we sum the per-book counts carried in the values of each (author, word) key to obtain TF, then write the author and the word as the key (Text) and
TF * IDF as the value (double). The key/value flow is sketched below.
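Again with 江湖 as a hypothetical example (the parsing code assumes file names of the form author + index + title, e.g. 金庸01书名; the counts and IDF values are illustrative only):

TFIDF Map output:      key = "金庸 江湖 书名 ", value = "100 3.09"   (one record per book)
TFIDF Combiner output: key = "金庸 江湖", value = "100 3.09"
TFIDF Reduce output:   key = "金庸 江湖 ", value = TF * IDF, where TF is the sum of the per-book counts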
For further details and test data, see: https://github.com/BlackKD/hadoop_invertedindex_sort_TF_TDF