Accessing a MySQL table with a text column from Apache Spark

Posted: 2021-10-22 22:59:53

I am trying to read a MySQL table from Apache Spark using JDBC. I am getting the following exception:

17/02/26 09:00:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.sql.SQLException: Value '  5023512432017-02-14 16:25:4654617a68ad457d2c2017-02-14 16:07:280000-00-00 00:00:0282.460741.7354    1024.1963sphoneUTRAN13966003659671810.162.223.143354854252                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     ' can not be represented as java.sql.Timestamp
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:996)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:935)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:924)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:870)
at com.mysql.jdbc.ResultSetRow.getTimestampFast(ResultSetRow.java:928)
at com.mysql.jdbc.BufferRow.getTimestampFast(BufferRow.java:555)
at com.mysql.jdbc.ResultSetImpl.getTimestampInternal(ResultSetImpl.java:5943)
at com.mysql.jdbc.ResultSetImpl.getTimestamp(ResultSetImpl.java:5609)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:378)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:377)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:286)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:268)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The same code works with a different table; I only change the table name. The table that produces the error was imported from a CSV file via Workbench Import, while the table that works was populated by INSERT statements. Here is the table that works:

CREATE TABLE `Events` (
  `time` datetime NOT NULL,
  `userID` varchar(45) NOT NULL,
  `action` varchar(45) DEFAULT NULL,
  `duration` double DEFAULT NULL,
  KEY `userid` (`userID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Here is the table that produces the error:

CREATE TABLE `small_session` (
  `ne_id` int(11) DEFAULT NULL,
  `lastupdatetime` datetime DEFAULT NULL,
  `session_id` text,
  `start_time` datetime DEFAULT NULL,
  `stop_time` datetime DEFAULT NULL,
  `UPLOAD_KB` double DEFAULT NULL,
  `DOWNLOAD_KB` double DEFAULT NULL,
  `TOTAL_KB` double DEFAULT NULL,
  `APN` text,
  `RAT` text,
  `imei` bigint(20) DEFAULT NULL,
  `ip_address` text,
  `CID` int(11) DEFAULT NULL,
  `mcc` int(11) DEFAULT NULL,
  `mnc` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Here is the code I use. This is the part that produces the error:

val tableName = "small_session"
dbDriver.read(sparkSession, tableName).createOrReplaceTempView(tableName)
val sqlDF = sparkSession.sql("SELECT * FROM small_session")
sqlDF.show


def read(spark: SparkSession, table: String): DataFrame = {
  val jdbcDF = spark.read
    .jdbc(mySqlURL, table, connectionProperties)
  jdbcDF
}
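
For completeness, mySqlURL and connectionProperties are referenced above but not shown. A minimal sketch of how they might be defined follows; the host, database, credentials, and driver class are placeholders, and the zeroDateTimeBehavior=convertToNull option is only a commonly used Connector/J setting for MySQL zero dates such as the '0000-00-00 00:00:00' visible in the stack trace, not necessarily what was used here.

import java.util.Properties

// Hypothetical JDBC URL; zeroDateTimeBehavior=convertToNull is a common Connector/J
// option that maps MySQL zero dates ('0000-00-00 00:00:00') to NULL instead of
// failing with "can not be represented as java.sql.Timestamp".
val mySqlURL = "jdbc:mysql://localhost:3306/mydb?zeroDateTimeBehavior=convertToNull"

// Placeholder credentials and driver class.
val connectionProperties = new Properties()
connectionProperties.put("user", "spark_user")
connectionProperties.put("password", "secret")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")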

Is this an exception on a datetime column? Any help would be much appreciated.

Thank you.

Kirill.

1 Answer

#1

OK, the offender was a column of type text.

Apparently this is what MySQL Workbench creates when importing string data from a file. After I changed these columns to varchar, everything worked.
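
For reference, the change described above corresponds to DDL along these lines; the exact columns and varchar lengths are assumptions based on the CREATE TABLE shown in the question:

-- Assumed fix: convert the text columns to varchar; lengths are illustrative.
ALTER TABLE `small_session`
  MODIFY `session_id` varchar(255),
  MODIFY `APN` varchar(45),
  MODIFY `RAT` varchar(45),
  MODIFY `ip_address` varchar(45);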


Thanks to all the contributors for the help.


Kirill.
