Spark SQL: parsing a timestamp without seconds

Time: 2021-02-14 23:13:04

Some data I don't own comes with a field that's supposed to be a timestamp, but sometimes doesn't seem to comply with the ISO 8601 standard.

In my code I define a schema, and when Spark SQL parses my JSON data with it, I get the following error:

java.lang.IllegalArgumentException: 2016-10-07T11:15Z

The source data has the following:

"transaction_date_time": "2016-10-07T11:15Z"

And my schema is defined as follows:

import org.apache.spark.sql.types.{StructType, TimestampType}

val schema = (new StructType)
      .add("transaction_date_time", TimestampType)

I believe this happens because the value is missing the seconds. How can I parse the timestamp correctly?

Edit: For example, reading the data with

spark.read.schema(schema).json(rdd).show()

triggers the following error:

16/10/24 13:06:27 ERROR Executor: Exception in task 6.0 in stage 5.0 (TID 23)
java.lang.IllegalArgumentException: 2016-10-07T11:15Z
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
    at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
    at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
    at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
    at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/10/24 13:06:27 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
    at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
    at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
    at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
    at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/10/24 13:06:27 ERROR TaskSetManager: Task 6 in stage 5.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 5.0 failed 1 times, most recent failure: Lost task 6.0 in stage 5.0 (TID 23, localhost): java.lang.IllegalArgumentException: 2016-10-07T11:15Z
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
    at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
    at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
    at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
    at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
    at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
    at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 54 elided
Caused by: java.lang.IllegalArgumentException: 2016-10-07T11:15Z
  at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.skip(Unknown Source)
  at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl$Parser.parse(Unknown Source)
  at org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl.<init>(Unknown Source)
  at org.apache.xerces.jaxp.datatype.DatatypeFactoryImpl.newXMLGregorianCalendar(Unknown Source)
  at javax.xml.bind.DatatypeConverterImpl._parseDateTime(DatatypeConverterImpl.java:422)
  at javax.xml.bind.DatatypeConverterImpl.parseDateTime(DatatypeConverterImpl.java:417)
  at javax.xml.bind.DatatypeConverter.parseDateTime(DatatypeConverter.java:327)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:140)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:114)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertObject(JacksonParser.scala:215)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:182)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertRootField(JacksonParser.scala:73)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:288)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$2.apply(JacksonParser.scala:285)
  at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2366)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:285)
  at org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1.apply(JacksonParser.scala:280)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

2 answers

#1


2  

You can change

val schema = (new StructType)
      .add("transaction_date_time", TimestampType)

to

val schema = (new StructType)
      .add("transaction_date_time", StringType)

and then convert the string column yourself:

df.withColumn("columnTimeWithOutSec", unix_timestamp($"transaction_date_time", format).cast("timestamp"))

where format is a pattern without seconds, e.g. "yyyy-MM-dd'T'HH:mmXXX". Note that unix_timestamp returns seconds since the epoch as a long, hence the cast back to a timestamp.

Also, have a look at DateTimeUtils.scala to stay in line with how Spark itself converts Date and Timestamp values.

#2


0  

It looks like Spark's TimestampType is backed by java.sql.Timestamp. At least, that's what I have been led to believe.

Accordingly, we can parse a date using SimpleDateFormat and extract the milliseconds, passing that to the Timestamp constructor.

You could preprocess the data with something like this:

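import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Test {
    public static void main(String[] args) throws ParseException {
        String timestamp = "2016-10-07T11:15Z";
        // No seconds field; XXX accepts ISO 8601 zones such as "Z" or "+01:00".
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mmXXX");
        // Let a ParseException propagate instead of swallowing it, which would
        // leave parsedDate null and fail later with a NullPointerException.
        Date parsedDate = df.parse(timestamp);
        // java.sql.Timestamp is constructed from milliseconds since the epoch.
        Timestamp ts = new Timestamp(parsedDate.getTime());
        System.out.println(parsedDate);
        System.out.println(ts);
    }
}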

which outputs (in a JVM whose default time zone is US Pacific time):

Fri Oct 07 04:15:00 PDT 2016
2016-10-07 04:15:00.0

I searched a bit for 'optional parts in a date format' and found a Stack Overflow answer saying you should just create two SimpleDateFormats, one for each variant.
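The two-SimpleDateFormat idea could look roughly like the sketch below. This is not code from the linked answer: the class name LenientTimestampParser, the parse helper, and the two patterns are assumptions chosen for illustration. It tries the pattern with seconds first, falls back to the one without, and returns null when neither matches.

```java
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class LenientTimestampParser {
    // Hypothetical pattern list, tried in order: with seconds, then without.
    private static final String[] PATTERNS = {
        "yyyy-MM-dd'T'HH:mm:ssXXX",
        "yyyy-MM-dd'T'HH:mmXXX"
    };

    /** Returns the parsed Timestamp, or null if no pattern matches. */
    public static Timestamp parse(String text) {
        for (String pattern : PATTERNS) {
            // SimpleDateFormat is not thread-safe, so build a fresh one per call.
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            format.setLenient(false);
            try {
                return new Timestamp(format.parse(text).getTime());
            } catch (ParseException e) {
                // This pattern did not match; try the next one.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Print epoch milliseconds so the output does not depend on the
        // JVM's default time zone.
        System.out.println(parse("2016-10-07T11:15Z").getTime());
        System.out.println(parse("2016-10-07T11:15:30Z").getTime());
        System.out.println(parse("not a timestamp"));
    }
}
```

Returning null for unparseable input is only one option; depending on how the data is cleaned afterwards, throwing or collecting the bad values may be preferable.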
