Part 1: Prepare the data source
Create a file named student.txt in the project directory with the following content:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
Part 2: Implementation
Java version:
1. First, create a Student bean class that implements Serializable and overrides toString(); the code is as follows:
package com.cxd.sql;

import java.io.Serializable;

// JavaBean describing one student record; Spark derives the schema from its getters.
@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
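Spark's reflection-based conversion derives the DataFrame columns from this bean's getters (getSid, getSname, getSage become sid, sname, sage). As a quick sanity check, a small sketch like the following (not part of the original article; the class name StudentSchemaCheck is just illustrative) prints the schema Spark infers from the bean:

package com.cxd.sql;

import org.apache.spark.sql.Encoders;

public class StudentSchemaCheck {
    public static void main(String[] args) {
        // Print the schema Spark derives from the Student bean via its getters.
        // The column order comes from bean introspection rather than field
        // declaration order, which is why the conversion code below selects
        // the columns explicitly with select("sid", "sname", "sage").
        Encoders.bean(Student.class).schema().printTreeString();
    }
}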
2. The conversion itself; the code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);   // via Java reflection
        dynamicTransform(spark);   // via a dynamically built schema
    }

    /**
     * Conversion via Java reflection.
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Conversion via a dynamically built schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
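Both conversions write their output, coalesced to a single partition, into the directories parquet.res and parquet.res1. A minimal way to verify the result (not part of the original article; the class name ParquetCheck is just illustrative) is to read the Parquet output back and display it:

package com.cxd.sql;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("ParquetCheck").getOrCreate();

        // Output of the reflection-based conversion.
        Dataset<Row> reflectDF = spark.read().parquet("parquet.res");
        reflectDF.printSchema();
        reflectDF.show();

        // Output of the dynamic-schema conversion.
        spark.read().parquet("parquet.res1").show();

        spark.stop();
    }
}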
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class).
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is an implicit conversion brought in by spark.implicits._
    val stuDF = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    //stuDF.select("id", "name", "age").write.text("result")   // write selected columns to a file
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age < 20")
    //nameDF.write.text("result")   // write the query result to a file
    nameDF.show()
  }

  /**
   * Conversion via a dynamically built schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDF = spark.createDataFrame(rowRDD, schema)
    stuDF.printSchema()
    stuDF.createOrReplaceTempView("student")
    val nameDF = spark.sql("select name from student where age < 20")
    //nameDF.write.text("result")   // write the query result to a file
    nameDF.show()
  }
}
Notes:
1. All of the code above has been tested; the test environment was Spark 2.1.0 with JDK 1.8.
2. This code does not work on Spark versions earlier than 2.0.
This summary of two ways to convert a Spark RDD into a DataFrame in Java and Scala is all of the content shared here; hopefully it serves as a useful reference, and we hope you will continue to support 服务器之家.
Original article: https://blog.csdn.net/u010592112/article/details/73730796