result源码

时间:2022-05-06 20:45:29

CREATE TABLE `result` (
`id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`thetime` CHAR(100) ,
`category` CHAR(100) ,
`weight` decimal(24,4),
PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;
ALTER TABLE `result` ADD INDEX(`category`)

/**
* Created by lkl on 2017/7/31.
*/
/**
* Created by lkl on 2017/6/26.
*///spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.math.BigDecimal
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date
object result {
val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"
classOf[com.mysql.jdbc.Driver]
val conn = DriverManager.getConnection(rl) def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc) val titlesplit1 = sqlContext.jdbc("jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456", "middle")
val titlesplit = titlesplit1.toDF().registerTempTable("middle") val format = new java.text.SimpleDateFormat("yyyyMMdd")
val date = format.format(new java.util.Date().getTime())
import sqlContext.implicits._ val value = sqlContext.sql("SELECT SUBSTR(middle.`times`,1,13) as thetime,middle.`category`,SUM(middle.`svalue`)/COUNT(middle.`innserSessionid`) AS weight FROM middle WHERE middle.`svalue` IS NOT NULL GROUP BY SUBSTR(middle.`times`,1,13),middle.`category`") //val jo = value.toDF("innserSessionid", "times", "category", "svalue", "wordscount", "categoryscount", "rank")
val a=value.count()
print(a)
val p1 = value.map(p => {
val v0 = p.getString(0)
val v1 = p.getString(1)
val v2 = p.getDecimal(2)
(v0,v1,v2) })
p1.foreach(p => {
val v1=p._1
val v2=p._2
val v3=p._3
insert(v1,v2,v3)
})
conn.close() } def insert (value0:String,value1:String,value2:BigDecimal): Unit ={ val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)
// CREATE TABLE words2(innersessionId VARCHAR(100),words VARCHAR(100), VARCHAR(100),posit VARCHAR(100),va VARCHAR(100))
try {
val prep = conn.prepareStatement("INSERT INTO result(thetime,category,weight) VALUES (?,?,?) ")
prep.setString(1,value0)
prep.setString(2,value1)
prep.setBigDecimal(3,value2)
prep.executeUpdate
} catch{
case e:Exception =>e.printStackTrace
}
finally {
} } }