Hadoop Inverted Index and TF-IDF Statistics

  1. Requirements

When building the inverted index of words, in addition to outputting the frequency-annotated inverted index, also compute each word's "average occurrence count" (defined below) and include it in the output.

Here, "average occurrence count" is defined as:

average occurrence count = total number of occurrences of the word across all documents / number of documents containing the word

For example, suppose the document collection contains four novels: A, B, C, and D. The word "江湖" appears 100 times in document A, 200 times in document B, 300 times in document C, and not at all in document D. Its "average occurrence count" over this collection is therefore (100 + 200 + 300) / 3 = 200.

Note: both computations must be completed within a single MapReduce job.

 

Output format

For each word, output one key-value pair in the following format:

[word] \TAB average occurrence count, novel1:frequency;novel2:frequency;novel3:frequency;…;novelN:frequency

Novel names in the output must have the ".txt.segmented" filename suffix removed.

(Figure: a fragment of the output file, shown only to illustrate the format.)
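As a substitute for the missing figure, here is a hypothetical output line in this format, reusing the "江湖" numbers from the example above (the novel names A, B, C are placeholders, and spacing is simplified):

江湖 \TAB 200.0, A:100;B:200;C:300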

Optional tasks

This part is not required; it is offered as extra practice for interested students with time to spare.

  • Use a second MapReduce job to globally sort the words by their average occurrence count and output the sorted result.
  • For each author, compute the TF-IDF of every word. TF is defined as the total number of occurrences of the word across all of that author's works. IDF is defined (matching the implementation below) as follows; a small worked example is given after the output format line:

IDF = ln(total number of documents / (number of documents containing the word + 1))

Output format: author name, word, the word's TF-IDF
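A small worked example with hypothetical counts: if a word occurs 600 times in total across one author's works, then TF = 600; if it appears in 50 of N = 219 documents, then IDF = ln(219 / (50 + 1)) ≈ 1.457, giving a TF-IDF of roughly 600 × 1.457 ≈ 874 for that author.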

This lab provides the complete works of five novelists, including 金庸 (Jin Yong) and 梁羽生 (Liang Yusheng). Each novel corresponds to one text file.

All text files use UTF-8 encoding and have already been word-segmented, with adjacent Chinese words separated by a space.

2. Implementation
InvertIndex.java

package com.hadoop.InvertIndex;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertIndex {

    public static int FileNum = 0;

    public static class InvertedIndexMap extends Mapper<Object, Text, Text, Text> {

        private Text valueInfo = new Text();
        private Text keyInfo = new Text();
        private FileSplit split;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the FileSplit this record belongs to
            split = (FileSplit) context.getInputSplit();
            StringTokenizer stk = new StringTokenizer(value.toString());
            while (stk.hasMoreElements()) {
                // The key is (word:URI)
                keyInfo.set(stk.nextToken() + ":" + split.getPath().toString());
                // The value is the term frequency (1 per occurrence)
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

        Text info = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            // Reset the value to (URI:frequency)
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            // Reset the key to the word itself
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }

    public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Build the document list
            String fileList = "";
            int sum = 0;
            int times = 0;
            for (Text value : values) {
                String temp = value.toString();
                String[] str = temp.split(System.getProperty("file.separator"));
                String[] str2 = temp.split(":");
                String[] str1 = str[str.length - 1].split("\\.");
                sum += Integer.parseInt(str2[str2.length - 1]);
                // Strip the ".txt.segmented" suffix from the file name
                String name = "";
                for (int i = 0; i < str1.length - 2; i++) {
                    name += str1[i];
                }
                fileList += name + ":" + str2[str2.length - 1] + ";  ";
                times++;
            }
            double average = (double) sum / (double) times;
            String atemp = Double.toString(average) + ", " + fileList;
            result.set(atemp);
            context.write(key, result);
        }
    }

    public static class InvertIndexSortMapper extends Mapper<Object, Text, DoubleWritable, Text> {

        private FileSplit split;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            split = (FileSplit) context.getInputSplit();
            StringTokenizer stk = new StringTokenizer(value.toString());
            String name = stk.nextToken() + " ";
            String temp = stk.nextToken();
            String[] a = temp.split(",");
            while (stk.hasMoreTokens()) {
                name = name + stk.nextToken() + " ";
            }
            // The average occurrence count becomes the sort key
            double count = Double.parseDouble(a[0]);
            context.write(new DoubleWritable(count), new Text(name));
        }
    }

    public static class DoubleWritableDecreasingComparator extends DoubleWritable.Comparator {

        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] inputFiles = fs.listStatus(new Path(args[0]));
        FileNum = inputFiles.length;
        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertIndex.class);
        // Temporary directory for the inverted-index output, consumed by the sort job
        Path tempDir = new Path("count-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        try {
            job.setMapperClass(InvertedIndexMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, tempDir);
            if (job.waitForCompletion(true)) {
                Job sortJob = new Job(conf, "sort");
                sortJob.setJarByClass(InvertIndex.class);
                FileInputFormat.addInputPath(sortJob, tempDir);
                FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
                // Sort the average occurrence counts in descending order
                sortJob.setSortComparatorClass(DoubleWritableDecreasingComparator.class);
                sortJob.setMapperClass(InvertIndexSortMapper.class);
                sortJob.setMapOutputKeyClass(DoubleWritable.class);
                sortJob.setMapOutputValueClass(Text.class);
                sortJob.setOutputKeyClass(DoubleWritable.class);
                sortJob.setOutputValueClass(Text.class);
                System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
            }
        } finally {
            //FileSystem.get(conf).deleteOnExit(tempDir);
        }
    }
}
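A minimal way to launch this job, assuming the classes are packaged into a jar named invertindex.jar (the jar name and HDFS paths here are placeholders):

hadoop jar invertindex.jar com.hadoop.InvertIndex.InvertIndex /user/hadoop/novels /user/hadoop/inverted-sorted

args[0] is the directory of segmented novels and args[1] receives the globally sorted result; the unsorted inverted index with average occurrence counts is left in the temporary count-temp-* directory.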

TFIDF.java

package com.hadoop.InvertIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

/**
 * Created by jiangchenzhou on 16/4/26.
 */
public class TFIDF {

    public static double FileNum = 219;

    public static class TFIDFMapper extends Mapper<Object, Text, Text, Text> {

        private FileSplit split;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            split = (FileSplit) context.getInputSplit();
            // Input lines come from the sorted inverted index:
            // <average count> TAB <word> <book1>:<freq1>; <book2>:<freq2>; ...
            StringTokenizer stk = new StringTokenizer(value.toString());
            stk.nextToken();                   // skip the average occurrence count
            String word = stk.nextToken();
            ArrayList<String> wordInfo = new ArrayList<String>();
            while (stk.hasMoreTokens()) {
                wordInfo.add(stk.nextToken()); // one "<book>:<freq>;" entry per document
            }
            // IDF = ln(total number of documents / (number of documents containing the word + 1))
            double IDF = Math.log(FileNum / (double) (wordInfo.size() + 1));
            String IDFString = Double.toString(IDF);
            for (String info : wordInfo) {
                String[] temp = info.split(";");
                String[] temp1 = temp[0].split(":");
                String times = temp1[1];
                // File names are assumed to look like <author><book number><book name>
                String[] temp2 = temp1[0].split("[0-9]");
                String author = temp2[0];
                String bookName = temp2[2];
                Text keyInfo = new Text(author + " " + word + " " + bookName + " ");
                Text valueInfo = new Text(times + " " + IDFString);
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class TFIDFCombiner extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Drop the book name so the key becomes (author, word)
            StringTokenizer keyStk = new StringTokenizer(key.toString());
            Text keyInfo = new Text(keyStk.nextToken() + " " + keyStk.nextToken());
            for (Text value : values) {
                context.write(keyInfo, value);
            }
        }
    }

    public static class TFIDFReduce extends Reducer<Text, Text, Text, DoubleWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int TF = 0;
            double IDF = 0;
            for (Text value : values) {
                StringTokenizer stk = new StringTokenizer(value.toString());
                TF += Integer.parseInt(stk.nextToken());   // sum the frequencies over this author's books
                IDF = Double.parseDouble(stk.nextToken()); // same IDF in every record for this word
            }
            StringTokenizer stk = new StringTokenizer(key.toString());
            context.write(new Text(stk.nextToken() + " " + stk.nextToken() + " "), new DoubleWritable(TF * IDF));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // args[0]: the original document directory (used only to count documents)
        FileStatus[] inputFiles = fs.listStatus(new Path(args[0]));
        FileNum = inputFiles.length;
        Job TFIDFJob = new Job(conf, "TFIDF");
        TFIDFJob.setJarByClass(TFIDF.class);
        // args[1]: the sorted inverted-index output produced by InvertIndex
        FileInputFormat.addInputPath(TFIDFJob, new Path(args[1]));
        FileOutputFormat.setOutputPath(TFIDFJob, new Path("TF-IDF"));
        TFIDFJob.setMapperClass(TFIDFMapper.class);
        TFIDFJob.setMapOutputKeyClass(Text.class);
        TFIDFJob.setMapOutputValueClass(Text.class);
        TFIDFJob.setCombinerClass(TFIDFCombiner.class);
        TFIDFJob.setReducerClass(TFIDFReduce.class);
        TFIDFJob.setOutputKeyClass(Text.class);
        TFIDFJob.setOutputValueClass(DoubleWritable.class);
        System.exit(TFIDFJob.waitForCompletion(true) ? 0 : 1);
    }
}
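A corresponding launch sketch, reusing the placeholder jar name and paths from above:

hadoop jar invertindex.jar com.hadoop.InvertIndex.TFIDF /user/hadoop/novels /user/hadoop/inverted-sorted

args[0] is only listed to obtain the document count, args[1] is the sorted inverted index produced by the previous job, and the results are written to the fixed output directory "TF-IDF".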

3. Report

1 Design of Map and Reduce (including the key/value types of the Map and Reduce stages)

1.1 Basic requirement and sorting

Because the two pieces of code are closely related, they are described together.

For the basic requirement, the map stage tokenizes every line of the input file and emits (word:file name) as the key (Text), with a term frequency of 1 as the value (Text).

In the combiner, the values of each (word:file) key are summed into sum; the file name is then stripped from the original key and joined with sum to form the new value (Text), while the word itself becomes the key (Text).

In the reduce stage, each value is split on ":" and "." to extract the per-file term frequency and to count how many files the word appears in; the average occurrence count is computed and prepended to the value to form the new value, while the key stays unchanged. The result is written to a temporary directory.

A second sort job takes that temporary output as its input. Its map stage splits each line, using the average occurrence count as the key (DoubleWritable) and the rest (word, file list, etc.) as the value; the overriding DoubleWritableDecreasingComparator sorts the keys in descending order before the result is written out.
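To make the key/value flow concrete, here is a sketch using the hypothetical "江湖" counts from the requirements (HDFS paths abbreviated, spacing simplified):

map output:      (江湖:hdfs://.../A.txt.segmented, 1)   (emitted once per occurrence)
combiner output: (江湖, hdfs://.../A.txt.segmented:100)
reducer output:  (江湖, 200.0, A:100; B:200; C:300;)
sort-job output: (200.0, 江湖 A:100; B:200; C:300;)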

1.2 Computing TF-IDF

The output of the previous step is used as the input. In the map stage, each line is split on spaces to determine how many books the word appears in; combined with the known total number of books, this gives the word's IDF (double). For each book, (author, word, book name), separated by spaces, is emitted as the key (Text), with the term frequency and the computed IDF as the value (Text).

In the combiner, the key is reduced to (author, word), dropping the book name.

In the reduce stage, the per-book frequencies in the values are summed for each (author, word) pair to obtain TF; (author, word) is used as the key (Text) and TF * IDF as the value (DoubleWritable). The result is then written out.

For further details and test data, see: https://github.com/BlackKD/hadoop_invertedindex_sort_TF_TDF
