- Dataset转为DataFrame
必须要导入隐式转换否则会报错(隐式转换在我上一篇中有讲到)
第一种情况:若数据字段不多的话可以直接利用toDF()可以直接转换为DataFrame,如下:
import spark.implicits._
val ds: Dataset[String] = spark.read.textFile("hdfs://192.192.192.24/tmp/tb_order/")
ds.map(x => x.split("\\|")).map(x => (x(0),x(1),x(2),x(3),x(4))).toDF("aa","ss","dd","rr","hh").show()
结果如下:
第二种情况,如果你的数据字段较多的情况下,可以使用样例类的方法(因为在元组中字段最多只能有22个,超过22个就不能使用上面的方法)进行转换,如下,我这个有60个字段:
import spark.implicits._
val ds_ls: Dataset[String] = spark.read.textFile("hdfs://192.192.192.24/inceptor1/user/hive/warehouse/ods.db/hive/mz_rkk_ls/")
//将ds_ls转换为dataframe
val frame_ls: DataFrame = ds_ls.map(x => x.split("\\|")).map(x => mz_rkk_ls(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11), x(12), x(13), x(14), x(15), x(16), x(17), x(18), x(19), x(20), x(21).toInt, x(22).toInt, x(23), x(24), x(25), x(26), x(27), x(28), x(29).toDouble, x(30), x(31), x(32), x(33), x(34), x(35), x(36), x(37), x(38), x(39), x(40), x(41), x(42), x(43), x(44), x(45), x(46), x(47), x(48), x(49), x(50).toInt, x(51).toInt, x(52).toInt, x(53).toInt, x(54).toInt, x(55).toInt, x(56).toInt, x(57).toInt, x(58).toInt, x(59).toInt)).toDF()
frame_ls.show(5)
//创建临时表样例类
case class mz_rkk_ls(pk_id: String, fk_main_id: String, xzqh: String, qhmc: String, dhjdz: String, dryxxlb: String, ndryxxlb: String, dywlb: String, ndywlb: String, ddjlx: String, nddjlx: String, dgx: String, ndgx: String, dxm: String, dsfz: String, dxb: String, ndxb: String, dlxdh: String, dcsrq: String, dhkxz: String, ndhkxz: String, dnl: Int, dnld: Int, ndnld: String, djkzk: String, ndjkzk: String, dgzdw: String, dzw: String, ndzw: String, dysr: Double, dwhcd: String, ndwhcd: String, dhyzk: String, ndhyzk: String, dzzmm: String, ndzzmm: String, dcjlb: String, ndcjlb: String, dcjdj: String, ndcjdj: String, dcjzh: String, dswzk: String, dswsj: String, xfrq: String, clbj: String, b_str1: String, b_str2: String, b_str3: String, b_str4: String, b_str5: String, b_int1: Int, b_int2: Int, b_int3: Int, b_int4: Int, b_int5: Int, b_db1: Int, b_db2: Int, b_db3: Int, b_db4: Int, b_db5: Int)
创建样例类和Java中一样,不要写在主方法里面。而且,要注意字段类型的问题,如果知道字段类型的话要进行转换,默认是字符串,比如我上面有些字段是Int类型的,使用toInt做一个转换。
结果就不粘出来了,和上面是一样的效果。
- RDD转为DataFrame
那如果没有办法转换为所需要的类型怎么办呢?比如RDD[Array[String]],这时候就需要用到下面的方法了。下面讲一下RDD利用这种方法转换DataFrame应该怎么做。
从原始RDD中生成RDD的Rows
2. step1中的Rows的structure,生成相应的StructType,表示成schema
3. 将schema对应到Rows上,通过SparkSession提供的createDataFrame函数生成DataFrame
Row代表RDD中的一行数据
//StructType and convert RDD to DataFrame
def rddToDF(sparkSession : SparkSession):DataFrame = {
//设置schema结构
val schema = StructType(
Seq(
StructField("name",StringType,true)
,StructField("age",IntegerType,true)
)
)
val rowRDD = sparkSession.sparkContext
.textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
.map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
sparkSession.createDataFrame(rowRDD,schema)
}
- Dataset、DataFrame转为RDD
利用.rdd