小记--------sparksql和DataFrame的小小案例java、scala版本

时间:2022-01-18 06:56:39

sparksql是spark中的一个模块,主要用于进行结构化数据的处理,他提供的最核心的编程抽象,就是DataFrame。同时,sparksql还可以作为分布式的sql查询引擎。 最最重要的功能就是从hive中查询数据。     Dataframe可以理解为:以列的形式组织的,分布式的数据集合。     Dataframe可以通过很多来源进行构建,包括:结构化的数据文件、hive中的表、外部的关系型数据库、以及RDD   使用sparksql 首先需要创建一个sqlContext对象,或者是它的子类的对象(hiveContext的对象)   Java版本

package cn.spark.study.sql;
 
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
 
 
/**
* 创建dataframe
*/
public class DataFrameCreate {
    public static void main (String[] args){
        SparkConf conf = new SparkConf()
                .setAppName("DataFrameCreate")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        sqlContext.read().json("hdfs://spark1:9000/test.json").show();
    }
}
 
//=======================分隔符======================================
 
 
 
package cn.spark.study.sql;
 
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* dataframe常用操作
*/
public class DataFrameOperation {
    public static void main(String [] args){
        // 创建DataFrame
        SparkConf conf = new SparkConf()
                .setAppName("DataFrameCreate");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
 
 
        // 创建出来的DataFrame完全可以理解为一张表
        Dataset<Row> json = sqlContext.read().json("hdfs://spark1:9000/students.json");
        //打印dataframe ;select * from 表名
        json.show();
        //打印dataframe的元数据信息(schema)
        json.printSchema();
        //查询某一列的数据
        json.select("name").show();
        //查询多列 name ,age 并对所有的age列的结果值加1
        json.select(json.col("name") , json.col("age").plus(1)).show();
        //对某一列的值进行过滤;eg:只展示age字段值大于18的数据
        json.select(json.col("age").gt(18)).show();
        //根据某一列进行分组,并聚合;eg:通过age分组,并求出每组的个数
        json.groupBy("age").count().show();
    }
}

 

  Scala版本  
package cn.spark.study.sql
 
 
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
 
 
/**
  * 创建 dataframe
  */
object DataFrameCreateScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("dataFramecreate")
      .setAppName("local")
 
 
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
 
 
    sqlContext.read.json("hdfs://spark1/test.json").show()
  }
}
 
===================================分隔符========================================
package cn.spark.study.sql
 
 
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
 
 
/**
  * dataframe的常用操作
  */
object DataframeOperation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dataframeOperation")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("hdfs://spark1:9000/text.json")
 
 
    //打印dataframe
    df.show()
    //打印dataframe的schema
    df.printSchema()
    //查询某一列的数据
    df.select("name").show()
    //查询多列数据并进行计算;eg:查询name,age列,并对age列的值 1
    df.select(df("name") , df("age") 1).show()
    //查询某列并对其过滤;eg:查询age列并且值大于18
    df.select(df("age").gt(18)).show()
    df.select(df("age")>18).show()
    //对某一列进行分组,并对分组后的结果进行求个数
    df.groupBy(df("age")).count().show()
  }
}