已知文本有三列,整理数据,并导入mysql
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
scala> val gitrdd=sc.textFile("/tmp/git.txt")
gitrdd: org.apache.spark.rdd.RDD[String] = /tmp/git.txt MapPartitionsRDD[1] at textFile at <console>:25
scala> gitrdd.count
res2: Long = 548
分割符空格不定
scala> gitrdd.map(_.split(" | ")).filter(_.length<3).count
res3: Long = 8
scala> gitrdd.map(_.split(" | ")).filter(_.length<3).collect
res1: Array[Array[String]] = Array(Array(""), Array(""), Array(" "), Array(""))
scala> val gitDF=gitrdd.map(_.split(" | ")).filter(_.length==3).map(x=>(x(0),x(1),x(2))).toDF
gitDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
scala> gitDF.registerTempTable("tb_git")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> sqlcon.sql("select * from tb_git").show
+--------------------+--------------------+----------------+
| _1| _2| _3|
+--------------------+--------------------+----------------+
...................................................
.................................................
+--------------------+--------------------+----------------+
only showing top 20 rows
scala> val gitDF=gitrdd.map(_.split(" | ")).filter(_.length==3).map(x=>(x(0),x(1),x(2))).toDF("name","email","else")
gitDF: org.apache.spark.sql.DataFrame = [name: string, email: string ... 1 more field]
scala> gitDF.registerTempTable("tb_git")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> sqlcon.sql("select * from tb_git").show
+--------------------+--------------------+----------------+
| name| email| else|
+--------------------+--------------------+----------------+
.........................
+--------------------+--------------------+----------------+
only showing top 20 rows
mysql> create table tb_git(name varchar(50),email varchar(80),else1 varchar(50));
Query OK, 0 rows affected (0.04 sec)
scala> import java.sql.{Connection,DriverManager, PreparedStatement,Date}
import java.sql.{Connection, DriverManager, PreparedStatement, Date}
scala> def rddtodb(iter:Iterator[(String,String,String)]){var con:Connection=null;var ps:PreparedStatement=null;val sql="insert into tb_git(name,email,else1)values(?,?,?)" ;try{con=DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root");iter.foreach(line=>{ps=con.prepareStatement(sql);ps.setString(1,line._1.toString);ps.setString(2,line._2.toString);ps.setString(3,line._3.toString);ps.executeUpdate()}) }catch{case e:Exception=>println(e.toString)} finally{if(con!=null)con.close;if(ps!=null)ps.close}}
rddtodb: (iter: Iterator[(String, String, String)])Unit
scala> gitrdd.map(_.split(" | ")).filter(_.length==3).map(x=>(x(0).trim,x(1).trim,x(2).trim)).foreachPartition(rddtodb)
mysql> select count(1) from tb_git;
+----------+
| count(1) |
+----------+
| 534 |
+----------+
1 row in set (0.03 sec)