本文主要记录 一些基础 数据源 转换 成为 RDD 的案例 仅供参考
1.集合类转换为RDD
import ;
import ;
import ;
import ;
import ;
import .Function2;
public class ParallelizeCollection {
public static void main(String[] args) {
// 创建SparkConf
SparkConf conf = new SparkConf()
.setAppName("ParallelizeCollection")
.setMaster("local");
// 创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 要通过并行化集合的方式创建RDD,那么就调用SparkContext以及其子类,的parallelize()方法
List<Integer> numbers = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numberRDD = (numbers);
// 执行reduce算子操作
// 相当于,先进行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10。。。以此类推
int sum = (new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer num1, Integer num2) throws Exception {
return num1 + num2;
}
});
// 输出累加的和
("1到10的累加和:" + sum);
// 关闭JavaSparkContext
();
}
}
2.读取本地文件 (转换部分)
// 创建SparkConf
SparkConf conf = new SparkConf()
.setAppName("LocalFile")
.setMaster("local");
// 创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子类的textFile()方法,针对本地文件创建RDD
JavaRDD<String> lines =("C://Users//Administrator//Desktop//");
3.读取 hdfs系统文件 (转换部分)
SparkConf conf = new SparkConf()
.setAppName("HDFSFile");
// 创建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子类的textFile()方法,针对HDFS文件创建RDD
// 只要把textFile()内的路径修改为hdfs文件路径即可
JavaRDD<String> lines = ("hdfs://spark1:9000/");