Text Mining: Text Clustering (MapReduce)

Date: 2021-06-02 06:32:05

Liu Yong (刘勇)  Email: lyssym@sina.com

Introduction

  For large volumes of text data, single-threaded processing takes a long time, the I/O on that much data is also slow, and memory consumption is high. This post therefore adopts the MapReduce computing model and processes the text data in a distributed fashion in order to raise throughput. The algorithm presented here combines and improves on K-means and DBSCAN: it borrows from K-means the fact that the number of clusters is fixed in advance, borrows the density-based idea from DBSCAN, and uses several Reducers to merge the intermediate results. The tests show that the algorithm is feasible on 457 texts over 50 iterations. When the data set is small its throughput is somewhat worse than single-threaded processing, but once the data volume grows to a certain scale the speed advantage of the distributed algorithm becomes more and more apparent.

Related Models

  From the viewpoint of practical engineering, the mathematical models involved are described only briefly below; for more detail please refer to the earlier posts in this series:

  1) Cosine similarity

  To judge the similarity of two Web texts, this post uses the cosine measure: the two texts are first segmented (Chinese word segmentation) and turned into term-frequency vectors, and the cosine of the angle between the two vectors is taken as their similarity. To make the result more accurate, a synonym dictionary (同义词词林) is loaded so that synonymous terms are matched as well. The details are not repeated here; see the post Text Mining: Text Similarity Determination in this series.
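  For reference, given two term-frequency vectors A = (a1, ..., an) and B = (b1, ..., bn) built over the merged term list, the similarity used throughout this post is

    cos(A, B) = (a1*b1 + ... + an*bn) / ( sqrt(a1^2 + ... + an^2) * sqrt(b1^2 + ... + bn^2) )

  which is exactly what countCosSimilarity in the TextCosine class listed below computes.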

  2) DBSCAN

  DBSCAN involves two important parameters: the ε-neighborhood (roughly, a radius) and the minimum count minPts (a fixed value that represents density). If the number of objects (here, Web texts) inside the ε-neighborhood of some object is greater than minPts, that object is a core object. Starting from a core object, take an object i in its ε-neighborhood, then take the core or border objects in i's ε-neighborhood, and keep recursing; all objects reached in this way are put into one cluster. The aim of DBSCAN is to find the maximal set of density-connected objects. Put simply, if B can be reached from A and C can be reached from B, then A, B and C belong to the same cluster. The details are not repeated here; see the post Text Mining: Text Clustering (DBSCAN) in this series.
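  As a small illustration of the two parameters, the sketch below tests whether a single text is a core object. It is written against the TextCosine class listed later in this post; the class and method names (DbscanSketch, isCoreObject) are illustrative only, and the MapReduce code itself only needs the neighbor count, which DensityCenter.getNeighbors computes.

import java.util.List;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;

public class DbscanSketch {
    // A text is a core object if at least minPts texts fall inside its ε-neighborhood,
    // where "inside the ε-neighborhood" means cosine similarity >= eps.
    public static boolean isCoreObject(String candidate, List<String> corpus,
                                       TextCosine cosine, double eps, int minPts) {
        int neighbors = 0;
        List<ElementDict> vec1 = cosine.tokenizer(candidate);
        for (String doc : corpus) {
            List<ElementDict> vec2 = cosine.tokenizer(doc);
            if (cosine.analysisText(vec1, vec2) >= eps) {
                neighbors++;
            }
        }
        return neighbors >= minPts;
    }
}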

  3) K-means

  K-means first initializes K cluster centers (centroids); K is fixed before clustering starts (note that with DBSCAN the number of clusters cannot be known in advance). Each text is then compared with the centroids and assigned to the centroid that fits it best (measured here by cosine similarity). After each round the centroids are recomputed and the process iterates, until it converges or the maximum number of iterations is reached. The centroid computation borrows the density idea of DBSCAN: within a cluster, the member with the highest density becomes the new centroid, which is a change from the usual distance-based update used by other algorithms. The details are not repeated here.
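  A single-threaded sketch of one such iteration may make the combination clearer. It is written against the TextCosine and DensityCenter classes listed later in this post; the wrapper class IterationSketch is illustrative only, and the distributed version is the KMeansProcess job below.

import java.util.ArrayList;
import java.util.List;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;
import com.gta.util.DensityCenter;

public class IterationSketch {
    // One iteration: assign every text to the most similar centroid,
    // then let the densest member of each cluster become the new centroid.
    public static List<String> iterate(List<String> corpus, List<String> centers,
                                       TextCosine cosine, DensityCenter center) {
        List<List<String>> clusters = new ArrayList<List<String>>();
        for (int k = 0; k < centers.size(); k++) {
            clusters.add(new ArrayList<String>());
        }

        for (String text : corpus) {                        // assignment step
            List<ElementDict> vec1 = cosine.tokenizer(text);
            int best = 0;
            double bestScore = 0;
            for (int k = 0; k < centers.size(); k++) {
                double score = cosine.analysisText(vec1, cosine.tokenizer(centers.get(k)));
                if (score > bestScore) {
                    bestScore = score;
                    best = k;
                }
            }
            clusters.get(best).add(text);
        }

        List<String> newCenters = new ArrayList<String>();  // density-based centroid update
        for (List<String> cluster : clusters) {
            if (!cluster.isEmpty()) {
                newCenters.add(center.getDensityCenter(cluster));
            }
        }
        return newCenters;
    }
}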

Implementation of the Improved Algorithm on MapReduce


Figure 1. Framework of the improved algorithm based on MapReduce

  Figure 1 shows the framework of one pass of the improved MapReduce-based algorithm. Its core parts are explained as follows:

  1) On the Mapper side, the K clusters and their initial centroids are determined as in K-means; every text is then clustered once against these centroids, and a text belongs to the cluster of the centroid it is most similar to.

  2) On the Reducer side, following DBSCAN, the number of texts inside the ε-neighborhood of each member of a cluster is counted, and the member with the largest count, i.e. the highest density, becomes the new centroid.

  3) On the Reducer side, 5 Reducers are used to recompute the cluster centroids in order to speed up processing (a minimal driver sketch follows this list).

  4) On the Mapper side, the new centroids needed by each iteration are obtained by reading cache files.
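  A minimal sketch of how points 3) and 4) are wired into a job follows; it assumes the same constants as the Cluster driver listed at the end of this post, and the wrapper class JobConfigSketch is illustrative only.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class JobConfigSketch {
    // task is the number of Reducers (5 here) and centerDir is the HDFS directory
    // holding the centroid files written by the previous iteration.
    public static Job configure(Configuration conf, int task, String centerDir) throws Exception {
        Job job = Job.getInstance(conf, "KMeans Process");
        job.setNumReduceTasks(task);          // point 3): several Reducers recompute the centroids
        for (int i = 0; i < task; i++) {      // point 4): every Mapper reads the centroids from cache files
            job.addCacheFile(URI.create(centerDir + "/part-r-0000" + i));
        }
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        return job;
    }
}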

  The inputs and outputs of the Mapper and the Reducer in this design are as follows:

  Mapper:  <Object, Text> --> <IntWritable, Text>

  Input: the key is unused; the value is a Web text;

  Output: the key is the cluster ID; the value is the Web text.

  The goal of the Mapper: compute, for every text, the cluster it belongs to, i.e. its cluster ID.

  Reducer:  <IntWritable, Text> --> <NullWritable, Text>

  Input: the key is the cluster ID; the value is a Web text;

  Output: the key is Null; the value is the Web text that becomes the new centroid.

  The goal of the Reducer: determine the new centroid of every cluster.

Test Results and Performance Analysis

  Since the aim of this test is only to verify the feasibility of the MapReduce-based text clustering algorithm, the data set is not large: 457 Web page titles crawled at random from the Internet, clustered over 50 iterations. The purpose of iterating is to let the centroid of every cluster converge.

Table 1. Test results of the improved K-means/DBSCAN text clustering algorithm


  Table 1 shows that when the data set is small, single-threaded processing is clearly faster than MapReduce. The main reason is that under the MapReduce framework every iteration has to reload the dictionary and read/write the cache files in order to fetch or update the centroids; with a small data set the computation time does not even cover this file I/O, so the framework's advantage cannot show. The author tried loading the data objects through Java reflection to work around this, but with little success.

  However, using several Reducers to compute the new centroids under the MapReduce framework clearly improves the speed of the reduction step; compared with single-threaded processing it also saves memory and keeps the processing simple and convenient. Considering that text collections keep growing, processing massive text data with a distributed framework has become the trend, so the algorithm proposed here has practical value.

  Program source code:

public class ElementDict {
    private String term;
    private int freq;

    public ElementDict(String term, int freq) {
        this.term = term;
        this.freq = freq;
    }

    public void setFreq(int freq) {
        this.freq = freq;
    }

    public String getTerm() {
        return term;
    }

    public int getFreq() {
        return freq;
    }

    public boolean equals(ElementDict e) {
        return term.equals(e.getTerm()) && freq == e.getFreq();
    }
}
Class ElementDict
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.wltea.analyzer.lucene.IKAnalyzer;

public class TextCosine {
    private Map<String, String> map = null;   // synonym dictionary: word -> synonym
    private double common;                    // threshold for texts of similar length
    private double special;                   // threshold for texts of very different length
    private static final String PATH = "hdfs://10.1.130.10:9000";
    private static Logger logger = LogManager.getLogger(TextCosine.class);

    public TextCosine() {
        map = new HashMap<String, String>();
        loadSynonyms();
    }

    public TextCosine(double common, double special) {
        map = new HashMap<String, String>();
        loadSynonyms();
        this.common = common;
        this.special = special;
    }

    // load the synonym dictionary from HDFS; each line has the form "word→synonym"
    private void loadSynonyms() {
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(PATH), conf);
            Path path = new Path("/user/hadoop/doc/synonyms.dict");
            FSDataInputStream is = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            String s = null;
            while ((s = br.readLine()) != null) {
                String[] synonymsEnum = s.split("→");
                map.put(synonymsEnum[0], synonymsEnum[1]);
            }
            br.close();
        } catch (IOException e) {
            logger.error("TextCosine IOException!");
        }
    }

    public void setCommon(double common) {
        this.common = common;
    }

    public void setSpecial(double special) {
        this.special = special;
    }

    // segment the text with the IK Analyzer and count term frequencies
    public List<ElementDict> tokenizer(String str) {
        List<ElementDict> list = new ArrayList<ElementDict>();
        IKAnalyzer analyzer = new IKAnalyzer(true);
        try {
            TokenStream stream = analyzer.tokenStream("", str);
            CharTermAttribute cta = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            int index = -1;
            while (stream.incrementToken()) {
                if ((index = isContain(cta.toString(), list)) >= 0) {
                    list.get(index).setFreq(list.get(index).getFreq() + 1);
                } else {
                    list.add(new ElementDict(cta.toString(), 1));
                }
            }
            analyzer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return list;
    }

    // return the index of the term (or of its synonym) in the list, or -1 if absent
    public int isContain(String str, List<ElementDict> list) {
        for (ElementDict ed : list) {
            if (ed.getTerm().equals(str)) {
                return list.indexOf(ed);
            } else if (map.get(ed.getTerm()) != null && map.get(ed.getTerm()).equals(str)) {
                return list.indexOf(ed);
            }
        }
        return -1;
    }

    // merge the terms of the two lists into one term list, so the vectors can be aligned
    public List<String> mergeTerms(List<ElementDict> list1, List<ElementDict> list2) {
        List<String> list = new ArrayList<String>();
        for (ElementDict ed : list1) {
            // add a term only if neither it nor its synonym is already present
            if (!list.contains(ed.getTerm()) && !list.contains(map.get(ed.getTerm()))) {
                list.add(ed.getTerm());
            }
        }
        for (ElementDict ed : list2) {
            if (!list.contains(ed.getTerm()) && !list.contains(map.get(ed.getTerm()))) {
                list.add(ed.getTerm());
            }
        }
        return list;
    }

    // for texts of very different length, slide a window over the longer one
    // and return the maximum cosine over all windows
    public double analysisText(List<ElementDict> list1, List<ElementDict> list2) {
        int len1 = list1.size();
        int len2 = list2.size();
        double ret = 0;
        if (len2 >= len1 * 1.5) {
            for (int i = 0; i + len1 <= len2; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len1; j++) {
                    newList.add(list2.get(i + j));
                }
                newList = adjustList(newList, list2, len2, len1, i);
                double tmp = analysis(list1, newList);
                if (tmp > ret) {
                    ret = tmp;
                }
            }
        } else if (len1 >= len2 * 1.5) {
            for (int i = 0; i + len2 <= len1; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len2; j++) {
                    newList.add(list1.get(i + j));
                }
                newList = adjustList(newList, list1, len1, len2, i);
                double tmp = analysis(list2, newList);
                if (tmp > ret) {
                    ret = tmp;
                }
            }
        } else {
            ret = analysis(list1, list2);
        }
        return ret;
    }

    // pad the window with a few extra terms taken from the longer list
    public List<ElementDict> adjustList(List<ElementDict> newList, List<ElementDict> list,
                                        int lenBig, int lenSmall, int index) {
        int gap = lenBig - lenSmall;
        int size = (gap / 2 > 2) ? 2 : gap / 2;
        if (index < gap / 2) {
            for (int i = 0; i < size; i++) {
                newList.add(list.get(lenSmall + index + i));
            }
        } else {
            for (int i = 0; i < size; i++) {
                newList.add(list.get(lenBig - index - i));
            }
        }
        return newList;
    }

    // cosine of the two term-frequency vectors built on the merged term list
    public double analysis(List<ElementDict> list1, List<ElementDict> list2) {
        List<String> list = mergeTerms(list1, list2);
        List<Integer> weightList1 = assignWeight(list, list1);
        List<Integer> weightList2 = assignWeight(list, list2);
        return countCosSimilarity(weightList1, weightList2);
    }

    // assign each merged term the frequency it has in the given text (0 if absent)
    public List<Integer> assignWeight(List<String> list, List<ElementDict> list1) {
        List<Integer> vecList = new ArrayList<Integer>(list.size());
        for (String str : list) {
            boolean isEqual = false;
            for (ElementDict ed : list1) {
                if (ed.getTerm().equals(str)
                        || (map.get(ed.getTerm()) != null && map.get(ed.getTerm()).equals(str))) {
                    isEqual = true;
                    vecList.add(Integer.valueOf(ed.getFreq()));
                    break;   // one weight per merged term keeps the two vectors aligned
                }
            }
            if (!isEqual) {
                vecList.add(Integer.valueOf(0));
            }
        }
        return vecList;
    }

    // cosine similarity of two integer weight vectors
    public double countCosSimilarity(List<Integer> list1, List<Integer> list2) {
        double countScores = 0;
        int element = 0;
        int denominator1 = 0;
        int denominator2 = 0;
        int index = -1;
        for (Integer it : list1) {
            index++;
            int left = it.intValue();
            int right = list2.get(index).intValue();
            element += left * right;
            denominator1 += left * left;
            denominator2 += right * right;
        }
        try {
            countScores = (double) element / Math.sqrt(denominator1 * denominator2);
        } catch (ArithmeticException e) {
            e.printStackTrace();
        }
        return countScores;
    }

    public boolean isSimilarity(double param, double score) {
        return score >= param;
    }

    // decide whether two texts are similar, using the same window strategy as analysisText
    public boolean assertSimilarity(List<ElementDict> list1, List<ElementDict> list2) {
        int len1 = list1.size();
        int len2 = list2.size();
        if (len2 >= len1 * 1.5) {
            for (int i = 0; i + len1 <= len2; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len1; j++) {
                    newList.add(list2.get(i + j));
                }
                newList = adjustList(newList, list2, len2, len1, i);
                if (isSimilarity(special, analysis(list1, newList))) {
                    return true;
                }
            }
        } else if (len1 >= len2 * 1.5) {
            for (int i = 0; i + len2 <= len1; i++) {
                List<ElementDict> newList = new ArrayList<ElementDict>();
                for (int j = 0; j < len2; j++) {
                    newList.add(list1.get(i + j));
                }
                newList = adjustList(newList, list1, len1, len2, i);
                if (isSimilarity(special, analysis(list2, newList))) {
                    return true;
                }
            }
        } else {
            if (isSimilarity(common, analysis(list1, list2))) {
                return true;
            }
        }
        return false;
    }
}
Class TextCosine
import java.util.Collections;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;

public class DensityCenter {
    private Logger logger = LogManager.getLogger(DensityCenter.class);
    private double eps;          // ε-neighborhood threshold on cosine similarity
    private TextCosine cosine;

    public DensityCenter(double eps, TextCosine cosine) {
        this.eps = eps;
        this.cosine = cosine;
    }

    // cosine similarity between two raw texts
    public double cosineDistance(String src, String dst) {
        List<ElementDict> vec1 = cosine.tokenizer(src);
        List<ElementDict> vec2 = cosine.tokenizer(dst);
        return cosine.analysisText(vec1, vec2);
    }

    // number of texts in dst that fall inside the ε-neighborhood of src
    public int getNeighbors(String src, List<String> dst) {
        int ret = 0;
        double score = 0;
        for (String s : dst) {
            score = cosineDistance(src, s);
            if (score >= eps)
                ret++;
        }
        return ret;
    }

    // the member with the most neighbors, i.e. the densest one, becomes the new centroid
    public String getDensityCenter(List<String> text) {
        int max = 0;
        int i = 0;
        int index = 0;
        for (String s : text) {
            int ret = getNeighbors(s, text);
            if (ret > max) {
                index = i;
                max = ret;
            }
            i++;
        }
        return text.get(index);
    }

    // true when the sorted old and new centroid lists coincide, i.e. the iteration has converged
    public boolean compareCenters(List<String> oldCenters, List<String> newCenters) {
        boolean ret = false;
        Collections.sort(oldCenters);
        Collections.sort(newCenters);
        int oldSize = oldCenters.size();
        int newSize = newCenters.size();
        logger.info("oldSize : " + oldSize);
        logger.info("newSize : " + newSize);
        int size = oldSize > newSize ? newSize : oldSize;
        int index = 0;
        int count = 0;
        for (String s : oldCenters) {
            if (s.equals(newCenters.get(index)))
                count++;
            index++;
            if (index >= size)   // the two lists may differ in length
                break;
        }
        logger.info("count : " + count);
        if (count == index)
            ret = true;
        return ret;
    }
}
Class DensityCenter
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;
import com.gta.util.DensityCenter;

public class KMeansProcess {

    public static class TextMapper extends Mapper<Object, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(TextMapper.class);
        // static so the centroid list and the cosine helper are built only once
        // and can be updated by the driver between iterations
        public static List<String> centersList = new ArrayList<String>();
        public static TextCosine cosine = new TextCosine();

        @Override
        public void setup(Context context) {
            int iteration = context.getConfiguration().getInt("ITERATION", 100);
            if (iteration == 0) {   // only the first iteration reads the initial centroids from the cache files
                int task = context.getConfiguration().getInt("TASK", 0);
                try {
                    URI[] caches = context.getCacheFiles();
                    if (caches == null || caches.length <= 0) {
                        System.exit(1);
                    }
                    for (int i = 0; i < task; i++) {
                        FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
                        FSDataInputStream is = fs.open(new Path(caches[i].toString()));
                        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                        String s = null;
                        while ((s = br.readLine()) != null) {
                            centersList.add(s);
                        }
                        br.close();
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage());
                }
            }
        }

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                String str = value.toString();
                double score = 0;
                double countTmp = 0;
                int clusterID = 0;
                int index = 0;
                List<ElementDict> vec1 = cosine.tokenizer(str);
                for (String s : centersList) {   // assign the text to the most similar centroid
                    List<ElementDict> vec2 = cosine.tokenizer(s);
                    countTmp = cosine.analysisText(vec1, vec2);
                    if (countTmp > score) {
                        clusterID = index;
                        score = countTmp;
                    }
                    index++;
                }
                context.write(new IntWritable(clusterID), new Text(str));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }

    public static class TextReducer extends Reducer<IntWritable, Text, NullWritable, Text> {
        private static Logger logger = LogManager.getLogger(TextReducer.class);
        public static DensityCenter center = new DensityCenter(0.75, KMeansProcess.TextMapper.cosine);

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) {
            try {
                List<String> list = new ArrayList<String>();
                for (Text val : values) {
                    list.add(val.toString());
                }
                // the densest member of the cluster becomes the new centroid
                context.write(NullWritable.get(), new Text(center.getDensityCenter(list)));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }
}
Class KMeansProcess
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.gta.cosine.ElementDict;
import com.gta.cosine.TextCosine;

public class KMeans {

    public static class KMeansMapper extends Mapper<Object, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(KMeans.KMeansMapper.class);
        // reuse the centroid list and the cosine helper already held by KMeansProcess.TextMapper
        private List<String> centersList = KMeansProcess.TextMapper.centersList;
        private TextCosine cosine = KMeansProcess.TextMapper.cosine;

        @Override
        public void setup(Context context) {
            int task = context.getConfiguration().getInt("TASK", 0);
            try {
                URI[] caches = context.getCacheFiles();
                if (caches == null || caches.length <= 0) {
                    System.exit(1);
                }
                for (int i = 0; i < task; i++) {   // load the converged centroids from the cache files
                    FileSystem fs = FileSystem.get(caches[i], context.getConfiguration());
                    FSDataInputStream is = fs.open(new Path(caches[i].toString()));
                    BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                    String s = null;
                    while ((s = br.readLine()) != null)
                        centersList.add(s);
                    br.close();
                }
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                String str = value.toString();
                double score = 0;
                double countTmp = 0;
                int clusterID = 0;
                int index = 0;
                List<ElementDict> vec1 = cosine.tokenizer(str);
                for (String s : centersList) {   // label the text with its final cluster ID
                    List<ElementDict> vec2 = cosine.tokenizer(s);
                    countTmp = cosine.analysisText(vec1, vec2);
                    if (countTmp > score) {
                        clusterID = index;
                        score = countTmp;
                    }
                    index++;
                }
                context.write(new IntWritable(clusterID), new Text(str));
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }

        @Override
        public void cleanup(Context context) {
            centersList.clear();
        }
    }

    public static class KMeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static Logger logger = LogManager.getLogger(KMeans.KMeansReducer.class);

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) {
            try {
                for (Text val : values) {   // emit <cluster ID, text> pairs as the final result
                    context.write(key, val);
                }
            } catch (IOException e) {
                logger.error(e.getMessage());
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }
}
Class KMeans
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.gta.cluster.KMeans.KMeansMapper;
import com.gta.cluster.KMeans.KMeansReducer;
import com.gta.cluster.KMeansProcess.TextMapper;
import com.gta.cluster.KMeansProcess.TextReducer;

public class Cluster {
    public static final int MAX = 50;    // maximum number of iterations
    public static final String INPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/input/";
    public static final String OUTPUT_PATH = "hdfs://10.1.130.10:9000/user/hadoop/output/";
    public static final String TMP_PATH = "hdfs://10.1.130.10:9000/user/hadoop/tmp/";
    public static final int TASK = 5;    // number of Reducers
    public static Logger logger = LogManager.getLogger(Cluster.class);
    private Configuration conf;
    private int iteration = 0;

    public Cluster() {
        this.conf = new Configuration();
        conf.setInt("TASK", TASK);
    }

    // iterate the KMeansProcess job until the centroids converge or MAX is reached
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        while (iteration < MAX) {
            logger.info("iteration : " + (iteration + 1));
            conf.setInt("ITERATION", iteration);
            Job job = Job.getInstance(conf, "KMeans Process");
            if (iteration == 0) {   // the initial centroids are distributed as cache files
                String cacheFile = TMP_PATH + iteration + "/part-r-0000";
                for (int i = 0; i < TASK; i++)
                    job.addCacheFile(URI.create(cacheFile + i));
            }
            job.setJarByClass(KMeansProcess.class);
            job.setMapperClass(TextMapper.class);
            job.setNumReduceTasks(TASK);
            job.setReducerClass(TextReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            iteration++;
            String outFile = TMP_PATH + iteration;
            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, new Path(outFile));
            job.waitForCompletion(true);
            conf.unset("ITERATION");
            List<String> tmpList = getCenterList(outFile);
            // stop as soon as the new centroids equal the old ones
            if (KMeansProcess.TextReducer.center.compareCenters(KMeansProcess.TextMapper.centersList, tmpList))
                break;
            else {
                KMeansProcess.TextMapper.centersList.clear();
                for (String s : tmpList) {
                    KMeansProcess.TextMapper.centersList.add(s);
                }
            }
        }
    }

    // one final job that labels every text with the cluster ID of the converged centroids
    public void lastRun() throws IOException, InterruptedException, ClassNotFoundException {
        String cacheFile = TMP_PATH + iteration + "/part-r-0000";
        Job job = Job.getInstance(conf, "KMeans");
        for (int i = 0; i < TASK; i++)
            job.addCacheFile(URI.create(cacheFile + i));
        job.setJarByClass(KMeans.class);
        job.setMapperClass(KMeansMapper.class);
        job.setReducerClass(KMeansReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.waitForCompletion(true);
    }

    // read the centroids written by the TASK Reducers of the previous job
    public List<String> getCenterList(String outFile) {
        List<String> centerList = new ArrayList<String>();
        String fileName = outFile + "/part-r-0000";
        try {
            for (int i = 0; i < TASK; i++) {
                FileSystem fs = FileSystem.get(URI.create(fileName + i), conf);
                FSDataInputStream is = fs.open(new Path(fileName + i));
                BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
                String s = null;
                while ((s = br.readLine()) != null)
                    centerList.add(s);
                br.close();
            }
        } catch (IOException e) {
            logger.info(e.getMessage());
        }
        return centerList;
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        try {
            long start = System.currentTimeMillis();
            cluster.run();
            cluster.lastRun();
            long end = System.currentTimeMillis();
            Cluster.logger.info(end - start);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Class Cluster

  Since every iteration in the distributed environment has to read the cache files again, static variables are introduced here to avoid re-initializing TextCosine and the other helpers, so as to raise the processing speed. The author has repeatedly tried to pass object instances into the Job directly, but every attempt failed; if you have a better solution, please contact me.

 

 


  Author: 志青云集
  Source: http://www.cnblogs.com/lyssym
  The copyright of this article belongs to the author and 博客园 (cnblogs). Reposting is welcome, but unless the author agrees otherwise this notice must be retained and a clear link to the original article must be given in a prominent place on the page.