spark写mysql解决schema不一致问题的简单方法

时间:2022-09-22 18:43:40

参考资料

http://blog.csdn.net/zhong_han_jun/article/details/50855720
http://3iter.com/2015/12/10/Spark-SQL-%E8%AF%BB%E5%86%99MySQL/
https://www.iteblog.com/archives/1290
http://bit1129.iteye.com/blog/2186405

spark写mysql解决schema不一致问题的简单方法

1-问题描述

利用spark自带的方式写mysql DF的schema必须要mysql中的建表语句一致

情况一:要写的DF与mysql建表语句一致,直接调用mode.write写即可

情况二:两者不一致(也就是我们要分析关注的)

2-schema不一致时的处理方法

对于情况二我们遇到的例子如下:mysql中有一个id字段(自增),DF中无该字段

处理方法(一)

将DF中拼接进去一个id,并且固定为0即可,这样写入mysql中的id即为自增

     //connect to mysql
      val user = "用户"
      val passwd = "密码"
      val dbname = "数据库名"
      val host = "ip"
      val port = "端口"
      val dbtable = "表名"

      //write to mysql
      val prop = new Properties()
      prop.setProperty("driver", "com.mysql.jdbc.Driver")
      prop.setProperty("user", user)
      prop.setProperty("password", passwd)
      prop.setProperty("driver", "com.mysql.jdbc.Driver")
      val url = s"jdbc:mysql://${host}:${port}/${dbname}"
      prop.setProperty("url", url)

      //下面tempRDD为人为添加的一个id RDD,  testDF为原本要写入的DF,testDF中原本无id字段,只有其他5个字段具体见下
      val tempRDD = sc.makeRDD(s"""{"id":"0","eid":"$eId"}""".stripMargin:: Nil)
      
      val tempRDD2DF = sqlContext.read.json(tempRDD)   
      val outputDF = testDF.join(tempRDD2DF,Seq("eid")).select(
          "id","jid","ctime","status_id","status","eid")
        outputDF.write.mode(SaveMode.Append).jdbc(url, dbtable, prop)

处理方法(二)

将RDD的分区先设为1,再人为将RDD中添加序号id(先读mysql中最大id),再写入mysql

分区设为1是因为多分区并发写,当一个分区写入时比如id=10,另一个分区再次写入id=10的记录,此时会报错

处理方法(三)

同方法二不过 用java写jdbc的方式

或直接接将RDD collect回客户端 再利用用java写jdbc的方式