I am a newbie to Spark and ML, and I have a task that should be implemented with the Apache Spark API. Some sample rows of my data are:
298,217756,468,0,363,0,0,14,0,11,0,0,894,cluster3
299,219413,25,1364,261,15,0,1,11,5,1,0,1760.5,cluster5
300,223153,1650,8673,2215,282,0,43,120,37,7,0,12853,cluster1
and I need to train a classifier whose model will then predict the cluster for any arbitrary incoming row. For example, the model should predict the '?' in the following row:
318,240747,875,0,0,0,0,8,0,0,0,0,875,?
So I need to know what Spark datatype, classifier, and so on I should use. How should I predict the '?'?
Any help is appreciated!
1 solution
#1
OK, I solved the issue :-) and am just posting the answer for other interested users. The sample data is:
60,236,178,0,0,4,15,16,0,0,575.00,5
1500,0,0,0,0,5,0,0,0,0,1500.00,5
50,2072,248,0,0,1,56,7,0,0,2658.50,5
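Each row holds the numeric feature columns followed by the cluster label as the last column (here already encoded as a numeric id, e.g. 5). The program below parses such rows, hashes the features with HashingTF, trains a Naive Bayes model, and reports its accuracy on a held-out split.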
package spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.regression.LabeledPoint;
import scala.Tuple2;

import java.text.DecimalFormat;
import java.util.Arrays;

/**
 * Trains a Naive Bayes classifier on pre-clustered CSV rows and
 * evaluates its accuracy on a held-out test split.
 */
public class NaiveBayesTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("NaiveBayes Example")
                .set("spark.driver.allowMultipleContexts", "true")
                .set("hadoop.version", "hadoop-2.4");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "resources/clustering-Result-without-index-id.csv";
        JavaRDD<String> data = sc.textFile(path);

        // Hash each row's numeric values into a fixed-size feature vector.
        final HashingTF tf = new HashingTF(10000);

        // Parse every CSV line into a LabeledPoint: the last column is the
        // cluster label, all preceding columns are features.
        JavaRDD<LabeledPoint> mainData = data.map(
                new Function<String, LabeledPoint>() {
                    @Override
                    public LabeledPoint call(String line) throws Exception {
                        String[] parts = line.split(",");
                        Double[] v = new Double[parts.length - 1];
                        for (int i = 0; i < parts.length - 1; i++) {
                            v[i] = Double.parseDouble(parts[i]);
                        }
                        return new LabeledPoint(
                                Double.parseDouble(parts[parts.length - 1]),
                                tf.transform(Arrays.asList(v)));
                    }
                });

        // Split the RDD into roughly 90% training data and 10% test data.
        JavaRDD<LabeledPoint> training = mainData.sample(false, 0.9, 111L);
        training.cache();
        JavaRDD<LabeledPoint> test = mainData.subtract(training);
        test.cache();

        // Train Naive Bayes with an additive-smoothing parameter of 23.0.
        final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 23.0);

        // Predict each test point and pair the prediction with the true label.
        JavaPairRDD<Double, Double> predictionAndLabel =
                test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
                    @Override
                    public Tuple2<Double, Double> call(LabeledPoint p) {
                        double cluster = model.predict(p.features());
                        String b = (cluster == p.label()) ? " ------> correct." : "";
                        System.out.println("predicted : " + cluster + " , actual : " + p.label() + b);
                        return new Tuple2<Double, Double>(cluster, p.label());
                    }
                });

        // Accuracy = fraction of test points whose prediction matches the label.
        double accuracy = predictionAndLabel.filter(
                new Function<Tuple2<Double, Double>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<Double, Double> pl) {
                        return pl._1().equals(pl._2());
                    }
                }).count() / (double) test.count();
        System.out.println("accuracy is " + new DecimalFormat("#.000").format(accuracy * 100) + "%");

        // Classify a single new row: parse its values into doubles so they
        // land in the same hashed feature space used for training.
        String newRow = "0,825,0,0,0,0,1,0,0,0,2180";
        String[] rawFeatures = newRow.split(",");
        Double[] features = new Double[rawFeatures.length];
        for (int i = 0; i < rawFeatures.length; i++) {
            features[i] = Double.parseDouble(rawFeatures[i]);
        }
        LabeledPoint point = new LabeledPoint(3, tf.transform(Arrays.asList(features)));
        double d = model.predict(point.features());
        System.out.println("predicted : " + d + " , actual : " + point.label());

        // Persist the model and reload it to verify round-tripping.
        model.save(sc.sc(), "myModelPath");
        NaiveBayesModel sameModel = NaiveBayesModel.load(sc.sc(), "myModelPath");
        sameModel.labels();
    }
}
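To answer the original question's '?' directly: an unlabeled row must go through the same feature pipeline as the training data (parse the numeric columns, hash them with the same HashingTF), after which model.predict returns the cluster id. A minimal sketch, assuming it runs inside the main method above after training, using the row from the question with the '?' column dropped and the cluster names encoded as numeric ids:

// Predict the cluster for an unlabeled row; the trailing '?' column is
// omitted because the label is exactly what we are predicting.
String unlabeledRow = "318,240747,875,0,0,0,0,8,0,0,0,0,875";
String[] cols = unlabeledRow.split(",");
Double[] vals = new Double[cols.length];
for (int i = 0; i < cols.length; i++) {
    vals[i] = Double.parseDouble(cols[i]);
}
double predictedCluster = model.predict(tf.transform(Arrays.asList(vals)));
System.out.println("predicted cluster id: " + predictedCluster);

The returned double is one of the numeric label ids seen during training, so mapping it back to a name like "cluster3" is just a lookup in whatever encoding produced those ids.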