spark-sql集合的“条件过滤”,“合并”,“动态类型映射DataFrame”,“存储”

时间:2022-11-21 00:37:04
  List<String> basicList = new ArrayList<String>();
basicList.add("{\"name\": \"zzq\",\"age\": 15}");
basicList.add("{\"name\": \"zzq1\",\"age\": 25}");
basicList.add("{\"name\": \"zzq2\",\"age\": 35}"); List<String> scoreList = new ArrayList<String>();
scoreList.add("{\"name\": \"zzq\",\"sex\": \"男\",\"score\": 110}");
scoreList.add("{\"name\": \"zzq1\",\"sex\": \"女\",\"score\": 90}");
scoreList.add("{\"name\": \"zzq2\",\"sex\": \"男\",\"score\": 70}"); SparkConf sparkConf = new SparkConf()
.setAppName("StudentsScore")
.setMaster("local"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext); JavaRDD<String> rdd_basicList = javaSparkContext.parallelize(basicList);
JavaRDD<String> rdd_scoreList = javaSparkContext.parallelize(scoreList); DataFrame df_scoreList = sqlContext.read().json(rdd_scoreList);
JavaRDD<Row> rdd_filter_score = df_scoreList.filter(df_scoreList.col("score").geq(90)).javaRDD(); //Pair默认返回一个Tuple2,如果更多属性值的话可以在第二个参数下使用TupleX,例子如下
JavaPairRDD<String, Tuple2<String, Long>> rdd_pair_score = rdd_filter_score.mapToPair(new PairFunction<Row, String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Tuple2<String, Long>> call(Row row) throws Exception {
return new Tuple2<String, Tuple2<String, Long>>(row.getString(0), new Tuple2<String, Long>(row.getString(2), row.getLong(1)));
}
}); DataFrame df_basicList = sqlContext.read().json(rdd_basicList);
df_basicList.registerTempTable("df_basicList_table");
StringBuilder sqlStrB = new StringBuilder();
sqlStrB.append("select name,age from df_basicList_table where name in ( ");
List<Tuple2<String, Tuple2<String, Long>>> local_rdd_pair_score = rdd_pair_score.collect();
Iterator<Tuple2<String, Tuple2<String, Long>>> itr = local_rdd_pair_score.iterator();
for (; itr.hasNext(); ) {
Tuple2<String, Tuple2<String, Long>> currItem = itr.next();
sqlStrB.append("\"");
sqlStrB.append(currItem._1());
sqlStrB.append("\"");
if (itr.hasNext())
sqlStrB.append(",");
}
sqlStrB.append(" ) "); DataFrame df_filter_basicList = sqlContext.sql(sqlStrB.toString());
JavaRDD<Row> rdd_filter_basic = df_filter_basicList.javaRDD();
JavaPairRDD<String, Long> rdd_pair_basic = rdd_filter_basic.mapToPair(new PairFunction<Row, String, Long>() {
@Override
public Tuple2<String, Long> call(Row row) throws Exception {
return new Tuple2<String, Long>(row.getString(0), row.getLong(1));
}
}); JavaPairRDD<String, Tuple2<Tuple2<String, Long>, Long>> all_studentsInfo = rdd_pair_score.join(rdd_pair_basic); //存储-------------------------------start----------------------------------
JavaRDD<Row> row_all_studentsInfo = all_studentsInfo.map(new Function<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>, Row>() {
@Override
public Row call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> v1) throws Exception {
return RowFactory.create(v1._1(), v1._2()._1()._1(), v1._2()._1()._2(), v1._2()._2());
}
}); List<StructField> fieldList = new ArrayList<StructField>();
fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("score", DataTypes.LongType, true));
fieldList.add(DataTypes.createStructField("age", DataTypes.LongType, true));
StructType temp = DataTypes.createStructType(fieldList); DataFrame df_save = sqlContext.createDataFrame(row_all_studentsInfo, temp); df_save.write().save("hdfs://xxxx..........parquet");//将文件存储
//存储-------------------------------end---------------------------------- all_studentsInfo.foreach(new VoidFunction<Tuple2<String, Tuple2<Tuple2<String, Long>, Long>>>() {
@Override
public void call(Tuple2<String, Tuple2<Tuple2<String, Long>, Long>> stringTuple2Tuple2) throws Exception {
System.out.println(">>>>>>>>>>>>" + stringTuple2Tuple2._1() + " -- " + stringTuple2Tuple2._2()._1()._1() + " -- " + stringTuple2Tuple2._2()._1()._2() + " -- " + stringTuple2Tuple2._2()._2());
}
});

spark-sql集合的“条件过滤”,“合并”,“动态类型映射DataFrame”,“存储”的更多相关文章

  1. PL&sol;SQL集合(一):记录类型(TYPE 类型名称 IS RECORD)

    记录类型 利用记录类型可以实现复合数据类型的定义: 记录类型允许嵌套: 可以直接利用记录类型更新数据. 传统操作的问题 对于Oracle数据类型,主要使用的是VARCHAR2.NUMBER.DATE等 ...

  2. 大数据技术之&lowbar;19&lowbar;Spark学习&lowbar;03&lowbar;Spark SQL 应用解析 &plus; Spark SQL 概述、解析 、数据源、实战 &plus; 执行 Spark SQL 查询 &plus; JDBC&sol;ODBC 服务器

    第1章 Spark SQL 概述1.1 什么是 Spark SQL1.2 RDD vs DataFrames vs DataSet1.2.1 RDD1.2.2 DataFrame1.2.3 DataS ...

  3. Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

    Spark版本:1.6.2 概览 Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完 ...

  4. 平易近人、兼容并蓄——Spark SQL 1&period;3&period;0概览

    自2013年3月面世以来,Spark SQL已经成为除Spark Core以外最大的Spark组件.除了接过Shark的接力棒,继续为Spark用户提供高性能的SQL on Hadoop解决方案之外, ...

  5. 【转载】Spark SQL 1&period;3&period;0 DataFrame介绍、使用

    http://www.aboutyun.com/forum.php?mod=viewthread&tid=12358&page=1 1.DataFrame是什么?2.如何创建DataF ...

  6. Apache Spark 2&period;2&period;0 中文文档 - Spark SQL&comma; DataFrames and Datasets Guide &vert; ApacheCN

    Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession ...

  7. &lbrack;转&rsqb; Spark sql 内置配置(V2&period;2)

    [From] https://blog.csdn.net/u010990043/article/details/82842995 最近整理了一下spark SQL内置配.加粗配置项是对sparkSQL ...

  8. Apache Spark 2&period;2&period;0 中文文档 - Spark SQL&comma; DataFrames and Datasets

    Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession ...

  9. Spark SQL 官方文档-中文翻译

    Spark SQL 官方文档-中文翻译 Spark版本:Spark 1.5.2 转载请注明出处:http://www.cnblogs.com/BYRans/ 1 概述(Overview) 2 Data ...

随机推荐

  1. 编译安装 Centos 7 x64 &plus; tengine&period;2&period;0&period;3 &lpar;实测&plus;笔记&rpar;

    系统硬件:vmware vsphere (CPU:2*4核,内存2G) 系统版本:CentOS Linux release 7.0.1406 安装步骤: 1.系统环境 1.1 更新系统 [root@c ...

  2. HSLA颜色

    CSS2中色彩模式只有RGB色彩模式(RGB即RED.Green.BLue)和十六进制模式,为了能支持 透明opacity 的Alpha值,CSS3又增加了RGBA色彩模式(RGBA即RED.Gree ...

  3. 后缀数组 POJ 3974 Palindrome &amp&semi;&amp&semi; URAL 1297 Palindrome

    题目链接 题意:求给定的字符串的最长回文子串 分析:做法是构造一个新的字符串是原字符串+反转后的原字符串(这样方便求两边回文的后缀的最长前缀),即newS = S + '$' + revS,枚举回文串 ...

  4. pull方法解析xml

    private void getData() {        new Thread() { public void run() {                try {              ...

  5. Jordan Lecture Note-9&colon; Principal Components Analysis &lpar;PCA&rpar;&period;

    Principal Components Analysis (一)引入PCA    当我们对某个系统或指标进行研究时往往会发现,影响这些系统和指标的因素或变量的数量非常的多.多变量无疑会为科学研究带来 ...

  6. Balanced Lineup&lpar;最简单的线段树题目&rpar;

    Time Limit: 5000MS   Memory Limit: 65536K Total Submissions: 33389   Accepted: 15665 Case Time Limit ...

  7. 一个漂亮的DIV搜索条

    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/ ...

  8. 增加几个entity framework 的函数 &lpar;记录备忘&rpar;&lbrack;转&rsqb;

    public static class DatabaseExtensions { public static DataTable SqlQueryForDataTatable(this Databas ...

  9. Linux 基础(3)

    Linux 基础(三) rpm与yum学习 本篇分享一下自己学习rpm和yum过程中的一些心得,自己在使用yum过程中由于自己的虚拟机网络的问题在学习这一块品尝到不少苦头,还望学习这块的盆友先检查一下 ...

  10. 初识 systemd

    从 init 系统说起 linux 操作系统的启动首先从 BIOS 开始,接下来进入 boot loader,由 bootloader 载入内核,进行内核初始化.内核初始化的最后一步就是启动 PID ...