参考资料
http://blog.csdn.net/zhong_han_jun/article/details/50855720
http://3iter.com/2015/12/10/Spark-SQL-%E8%AF%BB%E5%86%99MySQL/
https://www.iteblog.com/archives/1290
http://bit1129.iteye.com/blog/2186405
spark写mysql解决schema不一致问题的简单方法
1-问题描述
利用spark自带的方式写mysql DF的schema必须要mysql中的建表语句一致
情况一:要写的DF与mysql建表语句一致,直接调用mode.write写即可
情况二:两者不一致(也就是我们要分析关注的)
2-schema不一致时的处理方法
对于情况二我们遇到的例子如下:mysql中有一个id字段(自增),DF中无该字段
处理方法(一)
将DF中拼接进去一个id,并且固定为0即可,这样写入mysql中的id即为自增
//connect to mysql val user = "用户" val passwd = "密码" val dbname = "数据库名" val host = "ip" val port = "端口" val dbtable = "表名" //write to mysql val prop = new Properties() prop.setProperty("driver", "com.mysql.jdbc.Driver") prop.setProperty("user", user) prop.setProperty("password", passwd) prop.setProperty("driver", "com.mysql.jdbc.Driver") val url = s"jdbc:mysql://${host}:${port}/${dbname}" prop.setProperty("url", url) //下面tempRDD为人为添加的一个id RDD, testDF为原本要写入的DF,testDF中原本无id字段,只有其他5个字段具体见下 val tempRDD = sc.makeRDD(s"""{"id":"0","eid":"$eId"}""".stripMargin:: Nil) val tempRDD2DF = sqlContext.read.json(tempRDD) val outputDF = testDF.join(tempRDD2DF,Seq("eid")).select( "id","jid","ctime","status_id","status","eid") outputDF.write.mode(SaveMode.Append).jdbc(url, dbtable, prop)
处理方法(二)
将RDD的分区先设为1,再人为将RDD中添加序号id(先读mysql中最大id),再写入mysql
分区设为1是因为多分区并发写,当一个分区写入时比如id=10,另一个分区再次写入id=10的记录,此时会报错
处理方法(三)
同方法二不过 用java写jdbc的方式
或直接接将RDD collect回客户端 再利用用java写jdbc的方式