Implementing the K-means Data Mining Algorithm on Hadoop

Posted: 2022-08-31 14:55:30

K-means is a very common data mining algorithm: its logic is fairly simple and it applies to a wide range of problems. Searching Baidu for Java implementations of K-means turns up plenty of versions worth referencing, for example:
http://blog.csdn.net/jdplus/article/details/23960127
and:
http://www.cnblogs.com/chaoku/p/3748456.html

Although both authors say their code worked fine in their own tests, in practice everyone's environment differs, especially the Hadoop version, so one problem or another always comes up. Their implementations are still good references, though, and following their logic it is perfectly feasible to adapt them.
My Hadoop version is rather old, and its most prominent limitation is that the old Hadoop does not provide

org.apache.hadoop.mapreduce.Job;

so the new-API code from the experts cannot simply be copied over and used directly. I therefore reimplemented the K-means algorithm with reference to the examples on the official Hadoop site; the code borrows heavily from "潇洒子弦" but also incorporates my own thinking, and differs in three main respects:

  1. The target Hadoop version differs: this implementation is written against the older, low-level mapred API.

  2. The distance calculation changed: Euclidean distance is used, because with the original scheme the records could not be clustered effectively into the desired number of groups (a minimal sketch of the distance computation follows this list).

  3. The statement that writes the new centers into a new file also changed; with the original code the centers apparently got overwritten.
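
As a standalone illustration of point 2, here is a minimal sketch of the Euclidean distance between one record and one center. The helper name and signature are hypothetical and not part of the job code below; the actual mapper writes the same loop inline inside map():

    // Minimal illustrative sketch; the helper name is hypothetical and not part of the job code below.
    // Computes the Euclidean distance between one record and one center, both given as lists of doubles.
    public static double euclideanDistance(java.util.List<Double> record, java.util.List<Double> center) {
        double sum = 0;
        for (int j = 0; j < record.size(); j++) {
            double diff = record.get(j) - center.get(j);
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }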

The main code follows:

package mykmeans;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class CopyOfUtils {

    //Read the center points from the centers file (or directory) on HDFS
    public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath,boolean isDirectory) throws IOException{

        ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>();

        Path path = new Path(centersPath);

        Configuration conf = new Configuration();

        FileSystem fileSystem = path.getFileSystem(conf);

        if(isDirectory){    
            FileStatus[] listFile = fileSystem.listStatus(path);
            for (int i = 0; i < listFile.length; i++) {
                result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(),false));
            }
            return result;
        }

        FSDataInputStream fsis = fileSystem.open(path);
        LineReader lineReader = new LineReader(fsis, conf);

        Text line = new Text();

        while(lineReader.readLine(line) > 0){
            //ArrayList<Double> tempList = textToArray(line);
            ArrayList<Double> tempList = new ArrayList<Double>();
            String[] fields = line.toString().replaceAll("\t", "").split(",");
            for(int i=0;i<fields.length;i++){
                tempList.add(Double.parseDouble(fields[i]));
            }
            result.add(tempList);
        }
        lineReader.close();
        return result;
    }

    //Delete a path on HDFS
    public static void deletePath(String pathStr) throws IOException{
        Configuration conf = new Configuration();
        Path path = new Path(pathStr);
        FileSystem hdfs = path.getFileSystem(conf);
        hdfs.delete(path ,true);
    }

    public static ArrayList<Double> textToArray(Text text){
        ArrayList<Double> list = new ArrayList<Double>();
        String[] fileds = text.toString().replaceAll("\t", "").split(",");
        for(int i=0;i<fileds.length;i++){
            list.add(Double.parseDouble(fileds[i]));
        }
        return list;
    }

    public static boolean compareCenters(String centerPath,String newPath) throws IOException{
        System.out.println("比较两个中心点是否相等");
        List<ArrayList<Double>> oldCenters = CopyOfUtils.getCentersFromHDFS(centerPath,false);
        List<ArrayList<Double>> newCenters = CopyOfUtils.getCentersFromHDFS(newPath,true);


        int size = oldCenters.size();
        int fildSize = oldCenters.get(0).size();

        //Convergence measure: sum of squared relative differences between old and new centers
        double distance = 0;
        for(int i=0;i<size;i++){
            for(int j=0;j<fildSize;j++){
                double t1 = Math.abs(oldCenters.get(i).get(j));
                double t2 = Math.abs(newCenters.get(i).get(j));
                distance += Math.pow((t1 - t2) / (t1 + t2), 2);
            }
        }

        if(distance <= 0.00001){
            //Delete the new centers file so that the final pass can output the cluster assignments
            CopyOfUtils.deletePath(newPath);
            return true;
        }else{
            //Clear the old centers file, copy the new centers into it, then delete the new centers file

            CopyOfUtils.deletePath(centerPath);
            Configuration conf = new Configuration();
            Path outPath = new Path(centerPath);
            FileSystem fileSystem = outPath.getFileSystem(conf);            
            FSDataOutputStream out = fileSystem.create(outPath);
            //Write the contents of the new centers into the (old) centers file



            Path inPath = new Path(newPath);

            FileStatus[] listFiles = fileSystem.listStatus(inPath);
            for (int i = 0; i < listFiles.length; i++) {                

                FSDataInputStream in = fileSystem.open(listFiles[i].getPath());

                int byteRead = 0;
                byte[] buffer = new byte[256];
                while ((byteRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, byteRead);
                }
                in.close();
            }
            out.close();

            //Delete the new centers file so the next iteration can write its output there
            CopyOfUtils.deletePath(newPath);
        }

        return false;
    }
}

package mykmeans;

import java.io.*;
import java.text.DecimalFormat;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;



public class CopyOfNewMapReduce extends Configured implements Tool{

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text>{


        // private String centerpath;
         public ArrayList<ArrayList<Double>> centers = null;
         public int k = 3;
         public void configure(JobConf job){
             String center = job.get("map.center.file");
             try {
                centers = CopyOfUtils.getCentersFromHDFS(center,false);
                k = centers.size();
                System.out.println("centers point is: "+centers.toString());

            } catch (IOException e) {
                System.err.println("cannot find the map center file!");
                e.printStackTrace();
            }

         }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> output, Reporter report)
                throws IOException {
            //Read one line of data (comma-separated fields)

            ArrayList<Double> fileds = new ArrayList<Double>();
            String[] temp = value.toString().replaceAll("\t", "").split(",");
            for(int i = 0; i<temp.length;i++){
                fileds.add(Double.parseDouble(temp[i]));
            }

            int sizeOfFileds = fileds.size();

            double minDistance = 99999999;
            int centerIndex = 0;

            //Compare the current record against each of the k centers in turn
            for(int i=0;i<k;i++){
                double currentDistance = 0;
                for(int j=0;j<sizeOfFileds;j++){

                    double centerPoint = centers.get(i).get(j);
                    double filed = fileds.get(j);
                    currentDistance += (centerPoint-filed)*(centerPoint-filed);

                }
                currentDistance = Math.sqrt(currentDistance);
                //Keep track of the ID of the center closest to this record
                if(currentDistance<minDistance){
                    minDistance = currentDistance;
                    centerIndex = i;
                }
            }
            //Emit the record unchanged, keyed by its closest center (1-based index)
            output.collect(new IntWritable(centerIndex+1), value);

        }

    }


     public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, Text, Text> {

        @Override
        public void reduce(IntWritable key, Iterator<Text> value,
                OutputCollector<Text, Text> output, Reporter report)
                throws IOException {
ArrayList<ArrayList<Double>> filedsList = new ArrayList<ArrayList<Double>>();
DecimalFormat df0 = new DecimalFormat("###.000000");

            //依次读取记录集,每行为一个ArrayList<Double>
            System.out.println(key+": "+value.toString());
            while(value.hasNext()){

                ArrayList<Double> tempList = new ArrayList<Double>();
                String[] temp0 = value.next().toString().replaceAll("\t", "").split(",");
                for(int i = 0; i< temp0.length; i++){
                    tempList.add(Double.parseDouble(df0.format(Double.parseDouble(temp0[i]))));
                }
                filedsList.add(tempList);
            }

            //Compute the new center for this cluster
            //Number of fields (dimensions) per record
            int filedSize = filedsList.get(0).size();
            double[] avg = new double[filedSize];
            for(int i=0;i<filedSize;i++){
                //Average each column (dimension)
                double sum = 0;
                int size = filedsList.size();
                for(int j=0;j<size;j++){
                    sum += filedsList.get(j).get(i);
                }
                avg[i] = sum / size;
                avg[i] = Double.parseDouble(df0.format(avg[i]));
            }
            output.collect(new Text("") , new Text(Arrays.toString(avg).replace("[", "").replace("]", "").replaceAll("\t", "")));

        }


     }



    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CopyOfNewMapReduce.class);
         conf.setJobName("kmeans");

         conf.setMapperClass(Map.class);
         conf.setMapOutputKeyClass(IntWritable.class);
         conf.setMapOutputValueClass(Text.class);



         if(!"false".equals(args[3])||"true".equals(args[3])){
             conf.setReducerClass(Reduce.class);
             conf.setOutputKeyClass(Text.class);
             conf.setOutputValueClass(Text.class);
         }

         FileInputFormat.setInputPaths(conf, new Path(args[0]));
         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
         conf.set("map.center.file", args[2]);


         JobClient.runJob(conf);
         return 0;
    }
    public static void main(String[] args)throws Exception{
        int count = 0;
        int res = 0;
        while(true){
            res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), args);
            System.out.println(" 第 " + ++count + " 次计算 ");
            //When the centers have converged, run one final pass with the last argument set to "false"
            //so the job outputs every record grouped by its final cluster, then stop iterating
            if(CopyOfUtils.compareCenters(args[2], args[1])){
                String lastarg[] = new String[args.length];
                for(int i=0; i < args.length-1; i++){
                    lastarg[i] = args[i];
                }
                lastarg[args.length-1] = "false";
                res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), lastarg);
                break;
            }
        }


        System.exit(res);
    }


}

After compiling, package the classes into a jar, taking care that the Java version used for compilation matches the one on the cluster.
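For reference, a compile-and-package sequence might look roughly like the following; the hadoop-core jar name and version, its location under $HADOOP_HOME, and the directory layout are all assumptions that depend on your installation:

# Hypothetical example; adjust the hadoop-core jar path and version to your installation.
mkdir -p classes
javac -classpath $HADOOP_HOME/hadoop-core-1.2.1.jar -d classes mykmeans/*.java
jar cvf MyKmeans.jar -C classes .
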
Then run it from the Hadoop client:

~hadoop>bin/hadoop jar MyKmeans.jar mykmeans.CopyOfNewMapReduce /xxxx/kmeans/input*.file /xxx/output /xxx/kmeans/cluster.file true 

The input data must be comma-separated. The initial centers are not chosen at random; they need to be set by hand, and they are likewise comma-separated.
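For illustration only, with made-up two-dimensional values, a data file might look like this (one comma-separated record per line):

1.0,2.0
1.5,1.8
8.0,8.0
9.1,7.9
0.5,1.1

and a hypothetical cluster.file with three hand-picked initial centers, one per line:

1.0,2.0
8.5,8.0
0.6,1.0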
The final results end up under the output directory, one file per cluster; a screenshot of the output format follows:
(screenshot of the per-cluster output omitted)

With that, K-means is implemented on an older version of Hadoop. Questions and comments are welcome.

Reference: MapReduce K-means clustering algorithm. http://www.cnblogs.com/chaoku/p/3748456.html