K-means is a very common data-mining algorithm: its logic is fairly simple and it is widely applicable. A Baidu search for Java implementations of K-means turns up plenty of versions to learn from, for example:
http://blog.csdn.net/jdplus/article/details/23960127
and also:
http://www.cnblogs.com/chaoku/p/3748456.html
Although the authors all say the code is tested and works without any problems, in practice everyone's environment is different, the Hadoop version above all, so one issue or another always comes up. Their implementations are still a good reference, though, and following their logic step by step is perfectly workable.
My Hadoop version is rather old, and the most prominent problem is that the old releases do not ship
org.apache.hadoop.mapreduce.Job;
This missing class means the experts' code cannot simply be copied over and used as-is. I therefore re-implemented the K-means algorithm by following the examples on the official Hadoop site; the code borrows heavily from "潇洒子弦" but also contains my own thinking. There are three main differences:
- The Hadoop version is different: the implementation targets the older, lower-level API (see the short sketch after this list).
- The distance calculation changed: Euclidean distance is used, because the original scheme did not reliably produce the required number of clusters.
- The statement that writes the centers into the new file also changed; with the original code the centers appeared to get overwritten.
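To make the first point concrete, here is a minimal driver skeleton for the old org.apache.hadoop.mapred API, the style used throughout the code below. It is only a sketch with placeholder class and path names: the job is wired up through JobConf and submitted with JobClient.runJob(), instead of building an org.apache.hadoop.mapreduce.Job as the newer examples do.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class OldApiDriverSketch {
    public static void main(String[] args) throws Exception {
        // in the old API everything hangs off JobConf
        JobConf conf = new JobConf(OldApiDriverSketch.class);
        conf.setJobName("old-api-sketch");
        // the real mapper/reducer would be registered here with
        // conf.setMapperClass(...) and conf.setReducerClass(...)
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf); // blocks until the job finishes
    }
}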
The main code follows:
package mykmeans;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
public class CopyOfUtils {

    // Read the cluster centers from HDFS: either a single file, or every file in a directory
    public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath, boolean isDirectory) throws IOException {
        ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>();
        Path path = new Path(centersPath);
        Configuration conf = new Configuration();
        FileSystem fileSystem = path.getFileSystem(conf);
        if (isDirectory) {
            FileStatus[] listFile = fileSystem.listStatus(path);
            for (int i = 0; i < listFile.length; i++) {
                result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(), false));
            }
            return result;
        }
        FSDataInputStream fsis = fileSystem.open(path);
        LineReader lineReader = new LineReader(fsis, conf);
        Text line = new Text();
        while (lineReader.readLine(line) > 0) {
            // each line holds one center as comma-separated coordinates
            ArrayList<Double> tempList = new ArrayList<Double>();
            String[] fields = line.toString().replaceAll("\t", "").split(",");
            for (int i = 0; i < fields.length; i++) {
                tempList.add(Double.parseDouble(fields[i]));
            }
            result.add(tempList);
        }
        lineReader.close();
        return result;
    }
    // Delete a path (recursively) from HDFS
    public static void deletePath(String pathStr) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(pathStr);
        FileSystem hdfs = path.getFileSystem(conf);
        hdfs.delete(path, true);
    }

    // Parse one comma-separated line of text into a list of doubles
    public static ArrayList<Double> textToArray(Text text) {
        ArrayList<Double> list = new ArrayList<Double>();
        String[] fileds = text.toString().replaceAll("\t", "").split(",");
        for (int i = 0; i < fileds.length; i++) {
            list.add(Double.parseDouble(fileds[i]));
        }
        return list;
    }
    // Compare the old and new centers; returns true once they are close enough (converged)
    public static boolean compareCenters(String centerPath, String newPath) throws IOException {
        System.out.println("comparing whether the old and new centers are equal");
        List<ArrayList<Double>> oldCenters = CopyOfUtils.getCentersFromHDFS(centerPath, false);
        List<ArrayList<Double>> newCenters = CopyOfUtils.getCentersFromHDFS(newPath, true);
        int size = oldCenters.size();
        int fildSize = oldCenters.get(0).size();
        double distance = 0;
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < fildSize; j++) {
                double t1 = Math.abs(oldCenters.get(i).get(j));
                double t2 = Math.abs(newCenters.get(i).get(j));
                distance += Math.pow((t1 - t2) / (t1 + t2), 2);
            }
        }
        if (distance <= 0.00001) {
            // converged: delete the new-center output so the final classification job can run
            CopyOfUtils.deletePath(newPath);
            return true;
        } else {
            // not converged: clear the center file, copy the new centers into it,
            // then delete the new-center output directory
            CopyOfUtils.deletePath(centerPath);
            Configuration conf = new Configuration();
            Path outPath = new Path(centerPath);
            FileSystem fileSystem = outPath.getFileSystem(conf);
            FSDataOutputStream out = fileSystem.create(outPath);
            // copy the contents of every file under newPath into the center file
            Path inPath = new Path(newPath);
            FileStatus[] listFiles = fileSystem.listStatus(inPath);
            for (int i = 0; i < listFiles.length; i++) {
                FSDataInputStream in = fileSystem.open(listFiles[i].getPath());
                int byteRead = 0;
                byte[] buffer = new byte[256];
                while ((byteRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, byteRead);
                }
                in.close();
            }
            out.close();
            // delete the new-center output so the next job run can write to it again
            CopyOfUtils.deletePath(newPath);
        }
        return false;
    }
}
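For reference, getCentersFromHDFS expects one center per line, with the coordinates separated by commas (tab characters are stripped before parsing). A hypothetical initial center file for k = 3 clusters of 2-dimensional points, with made-up values, could simply look like:
1.0,1.0
5.0,5.0
9.0,9.0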
package mykmeans;
import java.io.*;
import java.text.DecimalFormat;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class CopyOfNewMapReduce extends Configured implements Tool {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {

        public ArrayList<ArrayList<Double>> centers = null;
        public int k = 3;

        // Load the current cluster centers from the file passed in through "map.center.file"
        public void configure(JobConf job) {
            String center = job.get("map.center.file");
            try {
                centers = CopyOfUtils.getCentersFromHDFS(center, false);
                k = centers.size();
                System.out.println("centers point is: " + centers.toString());
            } catch (IOException e) {
                System.err.println("cannot find the map center file!");
                e.printStackTrace();
            }
        }
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> output, Reporter report)
                throws IOException {
            // parse one input record: comma-separated coordinates
            ArrayList<Double> fileds = new ArrayList<Double>();
            String[] temp = value.toString().replaceAll("\t", "").split(",");
            for (int i = 0; i < temp.length; i++) {
                fileds.add(Double.parseDouble(temp[i]));
            }
            int sizeOfFileds = fileds.size();
            double minDistance = Double.MAX_VALUE;
            int centerIndex = 0;
            // compute the Euclidean distance from this record to each of the k centers
            for (int i = 0; i < k; i++) {
                double currentDistance = 0;
                for (int j = 0; j < sizeOfFileds; j++) {
                    double centerPoint = centers.get(i).get(j);
                    double filed = fileds.get(j);
                    currentDistance += (centerPoint - filed) * (centerPoint - filed);
                }
                currentDistance = Math.sqrt(currentDistance);
                // remember the ID of the closest center
                if (currentDistance < minDistance) {
                    minDistance = currentDistance;
                    centerIndex = i;
                }
            }
            // emit the record unchanged, keyed by the ID of its closest center
            output.collect(new IntWritable(centerIndex + 1), value);
        }
    }
    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, Text, Text> {

        @Override
        public void reduce(IntWritable key, Iterator<Text> value,
                OutputCollector<Text, Text> output, Reporter report)
                throws IOException {
            ArrayList<ArrayList<Double>> filedsList = new ArrayList<ArrayList<Double>>();
            DecimalFormat df0 = new DecimalFormat("###.000000");
            // collect all records assigned to this center, one ArrayList<Double> per record
            System.out.println(key + ": " + value.toString());
            while (value.hasNext()) {
                ArrayList<Double> tempList = new ArrayList<Double>();
                String[] temp0 = value.next().toString().replaceAll("\t", "").split(",");
                for (int i = 0; i < temp0.length; i++) {
                    tempList.add(Double.parseDouble(df0.format(Double.parseDouble(temp0[i]))));
                }
                filedsList.add(tempList);
            }
            // compute the new center: the average of each column over all records in this cluster
            int filedSize = filedsList.get(0).size();
            double[] avg = new double[filedSize];
            for (int i = 0; i < filedSize; i++) {
                double sum = 0;
                int size = filedsList.size();
                for (int j = 0; j < size; j++) {
                    sum += filedsList.get(j).get(i);
                }
                avg[i] = sum / size;
                avg[i] = Double.parseDouble(df0.format(avg[i]));
            }
            // write the new center out as one comma-separated line
            output.collect(new Text(""), new Text(Arrays.toString(avg).replace("[", "").replace("]", "").replaceAll("\t", "")));
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CopyOfNewMapReduce.class);
        conf.setJobName("kmeans");
        conf.setMapperClass(Map.class);
        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);
        // args[3] == "true": set the reducer so that new centers are computed;
        // args[3] == "false": final pass, keep the cluster assignments as the output
        if (!"false".equals(args[3])) {
            conf.setReducerClass(Reduce.class);
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);
        }
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.set("map.center.file", args[2]);
        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int count = 0;
        int res = 0;
        while (true) {
            res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), args);
            System.out.println("finished iteration " + ++count);
            if (CopyOfUtils.compareCenters(args[2], args[1])) {
                // centers have converged: run one final job without the reducer
                // so every record is written out together with its cluster ID
                String lastarg[] = new String[args.length];
                for (int i = 0; i < args.length - 1; i++) {
                    lastarg[i] = args[i];
                }
                lastarg[args.length - 1] = "false";
                res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), lastarg);
                break;
            }
        }
        System.exit(res);
    }
}
After compiling, package the classes into a jar, and make sure the Java version used to build it matches the one on the cluster.
Then run it from the Hadoop client:
~hadoop>bin/hadoop jar MyKmeans.jar mykmeans.CopyOfNewMapReduce /xxxx/kmeans/input*.file /xxx/output /xxx/kmeans/cluster.file true
The input records must be comma-separated, and the initial centers are set by hand rather than chosen at random, also one comma-separated center per line; a small helper for uploading them is sketched below.
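Since the initial centers are written by hand, one way to put them on HDFS is a helper along these lines. This is only a sketch: the class name, the coordinates and the target path are made up for illustration, and it simply writes one comma-separated center per line to the cluster file that is later passed as the third argument.

package mykmeans;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class WriteInitialCenters {
    public static void main(String[] args) throws Exception {
        // hand-picked initial centers: k = 3 clusters of 2-dimensional points (made-up values)
        String[] centers = { "1.0,1.0", "5.0,5.0", "9.0,9.0" };
        // target path, e.g. /xxx/kmeans/cluster.file
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(new Configuration());
        FSDataOutputStream out = fs.create(path, true); // overwrite if the file already exists
        for (String center : centers) {
            out.write((center + "\n").getBytes("UTF-8"));
        }
        out.close();
    }
}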
The final result ends up under the output directory, with each cluster written to its own file.
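Because the final pass keeps the mapper output (cluster ID as the key, the original record as the value) and the default TextOutputFormat separates key and value with a tab, each output line should look roughly like the following; the values here are made up for illustration:
1	0.9,1.1
1	1.2,0.8
2	5.3,4.9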
That wraps up the K-means implementation on an older version of Hadoop; questions and feedback are welcome.
Reference: MapReduce K-means clustering algorithm. http://www.cnblogs.com/chaoku/p/3748456.html