Best classifier model for my dataset in Spark

Time: 2021-08-29 23:11:57

I am a newbie in Spark and ML, and I have a task that should be implemented with the Apache Spark API. Some sample rows of my data are:

298,217756,468,0,363,0,0,14,0,11,0,0,894,cluster3
299,219413,25,1364,261,15,0,1,11,5,1,0,1760.5,cluster5
300,223153,1650,8673,2215,282,0,43,120,37,7,0,12853,cluster1

and I need to train a classifier whose model will then predict the cluster for any arbitrary incoming row. For example, the model should predict the '?' in the following row:

318,240747,875,0,0,0,0,8,0,0,0,0,875,?

So I need to know which Spark data types, classifier, and so on I should use. How should I predict the '?'?

Any help is appreciated!

1 Solution

#1


OK, I solved the issue :-) and am just posting the answer for other interested users. Sample rows of the training data look like this (the last column is the cluster label, encoded as a number):

60,236,178,0,0,4,15,16,0,0,575.00,5

1500,0,0,0,0,5,0,0,0,0,1500.00,5

50,2072,248,0,0,1,56,7,0,0,2658.50,5

package spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.regression.LabeledPoint;
import scala.Tuple2;
import java.util.Arrays;

import java.text.DecimalFormat;


/**
 * Trains a Naive Bayes classifier on the clustered CSV data, evaluates it on a
 * held-out split, predicts a new row, and saves/reloads the model.
 */
public class NaiveBayesTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NaiveBayes Example").set("spark.driver.allowMultipleContexts", "true").set("hadoop.version","hadoop-2.4");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "resources/clustering-Result-without-index-id.csv";
        JavaRDD<String> data = sc.textFile(path);
        // Hash each row's numeric values into a 10,000-dimensional feature vector.
        final HashingTF tf = new HashingTF(10000);


        // Parse each CSV line into a LabeledPoint: the last column is the label,
        // the remaining columns are hashed into the feature vector.
        JavaRDD<LabeledPoint> mainData = data.map(
                new Function<String , LabeledPoint>() {
                    @Override
                    public LabeledPoint call( String line) throws Exception {

                        String[] parts = line.split(",");

                        Double[] v = new Double[parts.length - 1];
                        for (int i = 0; i < parts.length - 1 ; i++){
                            v[i] = Double.parseDouble(parts[i]);
                        }
                        return new LabeledPoint(Double.parseDouble(parts[parts.length-1]),tf.transform(Arrays.asList(v)));
                    }
        });
        // Split the data into ~90% training and ~10% test sets.
        JavaRDD<LabeledPoint> training = mainData.sample(false, 0.9, 111L);
        training.cache();

        JavaRDD<LabeledPoint> test = mainData.subtract(training);
        test.cache();

        // Train Naive Bayes with additive-smoothing parameter lambda = 23.0.
        final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 23.0);

        JavaPairRDD<Double, Double> predictionAndLabel =
                test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
                    @Override public Tuple2<Double, Double> call(LabeledPoint p) {
                        double cluster = model.predict(p.features());
                        String b = (cluster == p.label()) ? "  ------> correct." : "";
                        System.out.println("predicted : "+cluster+ " , actual : " + p.label() + b);
                        return new Tuple2<Double, Double>(cluster, p.label());
                    }
                });
        double accuracy = predictionAndLabel.filter(
                new Function<Tuple2<Double, Double>, Boolean>() {
            @Override
            public Boolean call(Tuple2<Double, Double> pl) {
                return pl._1().equals(pl._2());
            }
        }).count() / (double) test.count();

        System.out.println("accuracy is " + new DecimalFormat("#.000").format(accuracy * 100) + "%");
        // Predict the cluster for a new, unseen row (features only, no label),
        // parsed and hashed exactly like the training data.
        String[] newRow = "0,825,0,0,0,0,1,0,0,0,2180".split(",");
        Double[] nv = new Double[newRow.length];
        for (int i = 0; i < newRow.length; i++) nv[i] = Double.parseDouble(newRow[i]);
        double predicted = model.predict(tf.transform(Arrays.asList(nv)));
        System.out.println("predicted cluster : " + predicted);

        model.save(sc.sc(), "myModelPath");

        // Reload the saved model to verify that it can be restored later.
        NaiveBayesModel sameModel = NaiveBayesModel.load(sc.sc(), "myModelPath");
        sameModel.labels();

    }
}
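
A note on the featurization: HashingTF is really intended for term-frequency features (bags of tokens), so hashing the raw numeric columns works but does not use their actual values directly. Since the columns are already numeric, a more direct option would be to build the feature vector with Vectors.dense. The snippet below is only a sketch of that alternative, not part of the original answer; the class and method names are just illustrative, and it assumes the same CSV layout with the cluster label encoded as a number in the last column. (MLlib's Naive Bayes requires non-negative feature values, which the sample data satisfies.)

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

public class DenseFeatureParser {
    // Sketch: parse one CSV line into a LabeledPoint, using the raw numeric
    // columns as a dense vector instead of hashing them with HashingTF.
    public static LabeledPoint parseLine(String line) {
        String[] parts = line.split(",");
        double[] features = new double[parts.length - 1];
        for (int i = 0; i < parts.length - 1; i++) {
            features[i] = Double.parseDouble(parts[i]);
        }
        // The last column is the numeric cluster label.
        return new LabeledPoint(Double.parseDouble(parts[parts.length - 1]),
                Vectors.dense(features));
    }
}

With a parse function like this plugged into the map step above, the rest of the training, evaluation and prediction flow stays the same; only the mapping from CSV lines to LabeledPoints changes.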
