sparksql是spark中的一个模块,主要用于进行结构化数据的处理,他提供的最核心的编程抽象,就是DataFrame。同时,sparksql还可以作为分布式的sql查询引擎。 最最重要的功能就是从hive中查询数据。 Dataframe可以理解为:以列的形式组织的,分布式的数据集合。 Dataframe可以通过很多来源进行构建,包括:结构化的数据文件、hive中的表、外部的关系型数据库、以及RDD 使用sparksql 首先需要创建一个sqlContext对象,或者是它的子类的对象(hiveContext的对象) Java版本
package cn.spark.study.sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; /** * 创建dataframe */ public class DataFrameCreate { public static void main (String[] args){ SparkConf conf = new SparkConf() .setAppName("DataFrameCreate") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); sqlContext.read().json("hdfs://spark1:9000/test.json").show(); } } //=======================分隔符====================================== package cn.spark.study.sql; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; /** * dataframe常用操作 */ public class DataFrameOperation { public static void main(String [] args){ // 创建DataFrame SparkConf conf = new SparkConf() .setAppName("DataFrameCreate"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // 创建出来的DataFrame完全可以理解为一张表 Dataset<Row> json = sqlContext.read().json("hdfs://spark1:9000/students.json"); //打印dataframe ;select * from 表名 json.show(); //打印dataframe的元数据信息(schema) json.printSchema(); //查询某一列的数据 json.select("name").show(); //查询多列 name ,age 并对所有的age列的结果值加1 json.select(json.col("name") , json.col("age").plus(1)).show(); //对某一列的值进行过滤;eg:只展示age字段值大于18的数据 json.select(json.col("age").gt(18)).show(); //根据某一列进行分组,并聚合;eg:通过age分组,并求出每组的个数 json.groupBy("age").count().show(); } }
Scala版本
package cn.spark.study.sql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} /** * 创建 dataframe */ object DataFrameCreateScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("dataFramecreate") .setAppName("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) sqlContext.read.json("hdfs://spark1/test.json").show() } } ===================================分隔符======================================== package cn.spark.study.sql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} /** * dataframe的常用操作 */ object DataframeOperation { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("dataframeOperation") .setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.json("hdfs://spark1:9000/text.json") //打印dataframe df.show() //打印dataframe的schema df.printSchema() //查询某一列的数据 df.select("name").show() //查询多列数据并进行计算;eg:查询name,age列,并对age列的值 1 df.select(df("name") , df("age") 1).show() //查询某列并对其过滤;eg:查询age列并且值大于18 df.select(df("age").gt(18)).show() df.select(df("age")>18).show() //对某一列进行分组,并对分组后的结果进行求个数 df.groupBy(df("age")).count().show() } }