I am a newbie in Spark and ML, and I have a task that should be implemented with the Apache Spark API. Some sample rows of my data are:
I need to train a classifier whose model will then predict the cluster of any arbitrary incoming row. For example, the model should predict the '?' in the following row:
So which Spark data type, classifier, and so on should I use? And how should I predict the '?'?
Any help is appreciated!
OK, I solved the issue :-) I'm posting the answer for other interested users. A sample row of the data is `60,236,178,0,0,4,15,16,0,0,575.00,5`: the first eleven columns are the features and the last column is the cluster label.
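To make that row layout explicit, here is a small stdlib-only sketch of the split the Spark code performs on each line: every column except the last becomes a numeric feature and the last column becomes the label. The class `RowParser` and its `parse` method are hypothetical helpers for illustration, not part of Spark.

```java
import java.util.Arrays;

/** Hypothetical helper: splits a CSV row into numeric features and a trailing label. */
public class RowParser {
    final double[] features;
    final double label;

    RowParser(double[] features, double label) {
        this.features = features;
        this.label = label;
    }

    static RowParser parse(String line) {
        String[] parts = line.split(",");
        double[] features = new double[parts.length - 1];
        // Every column except the last is a feature.
        for (int i = 0; i < parts.length - 1; i++) {
            features[i] = Double.parseDouble(parts[i].trim());
        }
        // The last column is the cluster label.
        double label = Double.parseDouble(parts[parts.length - 1].trim());
        return new RowParser(features, label);
    }

    public static void main(String[] args) {
        RowParser row = parse("60,236,178,0,0,4,15,16,0,0,575.00,5");
        System.out.println(Arrays.toString(row.features) + " -> label " + row.label);
    }
}
```

In MLlib, the two parts would then go into a `LabeledPoint(label, featureVector)`; how the feature vector is built (hashed terms or a dense vector) is a separate choice.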
```java
package spark;

import java.text.DecimalFormat;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.regression.LabeledPoint;

import scala.Tuple2;

public class NaiveBayesTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NaiveBayes Example")
                .set("spark.driver.allowMultipleContexts", "true")
                .set("hadoop.version", "hadoop-2.4");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String path = "resources/clustering-Result-without-index-id.csv";
        JavaRDD<String> data = sc.textFile(path);
        final HashingTF tf = new HashingTF(10000);

        // Parse each CSV line: all columns but the last are features, the last is the cluster label.
        JavaRDD<LabeledPoint> mainData = data.map(
            new Function<String, LabeledPoint>() {
                public LabeledPoint call(String line) throws Exception {
                    String[] parts = line.split(",");
                    Double[] v = new Double[parts.length - 1];
                    for (int i = 0; i < parts.length - 1; i++) {
                        v[i] = Double.parseDouble(parts[i]);
                    }
                    return new LabeledPoint(Double.parseDouble(parts[parts.length - 1]),
                            tf.transform(Arrays.asList(v)));
                }
            });

        // Split the initial RDD into two: ~90% training data, the remainder for testing.
        JavaRDD<LabeledPoint> training = mainData.sample(false, 0.9, 111L);
        JavaRDD<LabeledPoint> test = mainData.subtract(training);

        // Train multinomial Naive Bayes with additive smoothing lambda = 23.0.
        final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 23.0);

        JavaPairRDD<Double, Double> predictionAndLabel =
            test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
                @Override
                public Tuple2<Double, Double> call(LabeledPoint p) {
                    double cluster = model.predict(p.features());
                    String b = (cluster == p.label()) ? " ------> correct." : "";
                    System.out.println("predicted : " + cluster + " , actual : " + p.label() + b);
                    return new Tuple2<Double, Double>(cluster, p.label());
                }
            });

        double accuracy = predictionAndLabel.filter(
            new Function<Tuple2<Double, Double>, Boolean>() {
                public Boolean call(Tuple2<Double, Double> pl) {
                    return pl._1().equals(pl._2());
                }
            }).count() / (double) test.count();
        System.out.println("accuracy is " + new DecimalFormat("#.000").format(accuracy * 100) + "%");

        // Predict a new row. It must be featurized exactly like the training rows
        // (parsed into numeric values, then hashed), not passed in as one raw string.
        Double[] newRow = {0.0, 825.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2180.0};
        LabeledPoint point = new LabeledPoint(3, tf.transform(Arrays.asList(newRow)));
        double d = model.predict(point.features());
        System.out.println("predicted : " + d + " , actual : " + point.label());

        // Persist the model and load it back.
        model.save(sc.sc(), "myModelPath");
        NaiveBayesModel sameModel = NaiveBayesModel.load(sc.sc(), "myModelPath");
        sc.stop();
    }
}
```
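Why the new row must be featurized like the training rows: `HashingTF` treats its input as a bag of terms, hashes each term to one of `numFeatures` buckets, and counts occurrences. The stdlib-only sketch below mimics that with a simplified hash (Java `hashCode()` modulo the bucket count, an assumption; Spark's actual hash function differs). `HashingSketch` is a hypothetical class. It shows that passing the whole row as one string produces a single term instead of eleven, and that, because only counts are kept, the column a value came from is lost.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified hashing-trick term frequencies (hypothetical; not Spark's exact hash). */
public class HashingSketch {
    static Map<Integer, Double> transform(List<?> terms, int numFeatures) {
        Map<Integer, Double> tf = new TreeMap<>();
        for (Object term : terms) {
            // Non-negative bucket index for this term.
            int index = Math.floorMod(term.hashCode(), numFeatures);
            tf.merge(index, 1.0, Double::sum);
        }
        return tf;
    }

    public static void main(String[] args) {
        int n = 10000;
        // Featurized like the training rows: eleven numeric terms.
        List<Double> values = Arrays.asList(0.0, 825.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2180.0);
        // The whole row as one string: a single term.
        List<String> wholeRow = Arrays.asList("0,825,0,0,0,0,1,0,0,0,2180");
        System.out.println(transform(values, n));
        System.out.println(transform(wholeRow, n));
        // The same values in different columns give an identical vector.
        List<Double> reordered = Arrays.asList(825.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2180.0);
        System.out.println(transform(values, n).equals(transform(reordered, n))); // prints true
    }
}
```

If column positions matter, as they usually do for tabular data like this, building each feature vector with `org.apache.spark.mllib.linalg.Vectors.dense(...)` is a more natural choice. MLlib's multinomial Naive Bayes requires non-negative feature values, which the sample rows satisfy.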
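The second argument of `NaiveBayes.train(training.rdd(), 23.0)` is the additive smoothing parameter λ (the one-argument overload uses 1.0). In multinomial Naive Bayes, the per-class feature probabilities are estimated as θ_cj = (N_cj + λ) / (N_c + n·λ), where N_cj is the sum of feature j over the rows of class c, N_c is the sum over all features of that class, and n is the number of features. A row x is assigned the class maximizing log P(c) + Σ_j x_j log θ_cj. The stdlib-only sketch below (`ToyNaiveBayes` is hypothetical and only modeled on those formulas; MLlib's implementation details may differ) shows what λ does. A larger λ pulls every θ_cj toward the uniform 1/n, so 23.0 is fairly heavy smoothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy multinomial Naive Bayes with additive smoothing (not MLlib code). */
public class ToyNaiveBayes {
    private final List<Double> labels = new ArrayList<>();
    private final List<Double> logPriors = new ArrayList<>();
    private final List<double[]> logTheta = new ArrayList<>();

    /** rows: non-negative feature vectors; y: class labels; lambda: smoothing parameter. */
    ToyNaiveBayes(double[][] rows, double[] y, double lambda) {
        int n = rows[0].length;
        Map<Double, double[]> featureSums = new TreeMap<>();
        Map<Double, Integer> classCounts = new TreeMap<>();
        for (int i = 0; i < rows.length; i++) {
            double[] sums = featureSums.computeIfAbsent(y[i], label -> new double[n]);
            for (int j = 0; j < n; j++) {
                sums[j] += rows[i][j];
            }
            classCounts.merge(y[i], 1, Integer::sum);
        }
        int numClasses = classCounts.size();
        for (Map.Entry<Double, double[]> e : featureSums.entrySet()) {
            double[] sums = e.getValue();
            double total = 0;
            for (double s : sums) {
                total += s;
            }
            double[] theta = new double[n];
            for (int j = 0; j < n; j++) {
                // Smoothed log P(feature j | class): (N_cj + lambda) / (N_c + n * lambda)
                theta[j] = Math.log((sums[j] + lambda) / (total + n * lambda));
            }
            labels.add(e.getKey());
            // Smoothed log prior: (rows in class + lambda) / (all rows + numClasses * lambda)
            logPriors.add(Math.log((classCounts.get(e.getKey()) + lambda)
                    / (rows.length + numClasses * lambda)));
            logTheta.add(theta);
        }
    }

    /** Returns the label with the highest log-posterior score. */
    double predict(double[] x) {
        double best = Double.NEGATIVE_INFINITY;
        double bestLabel = Double.NaN;
        for (int c = 0; c < labels.size(); c++) {
            double score = logPriors.get(c);
            for (int j = 0; j < x.length; j++) {
                score += x[j] * logTheta.get(c)[j];
            }
            if (score > best) {
                best = score;
                bestLabel = labels.get(c);
            }
        }
        return bestLabel;
    }

    public static void main(String[] args) {
        // Made-up toy data: class 0 is heavy in feature 0, class 1 in feature 2.
        double[][] rows = {{5, 1, 0}, {4, 0, 1}, {0, 1, 6}, {1, 0, 5}};
        double[] y = {0, 0, 1, 1};
        ToyNaiveBayes model = new ToyNaiveBayes(rows, y, 1.0);
        System.out.println(model.predict(new double[]{3, 0, 0})); // prints 0.0
        System.out.println(model.predict(new double[]{0, 0, 3})); // prints 1.0
    }
}
```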