spark中处理dataframe往往遇到”...cannot be cast to ...”这种数据不匹配的问题

时间:2024-05-23 15:22:42

    spark处理dataframe数据时,往往遇到"...cannot be cast to ..."这种数据不匹配的问题,主要是因为我们代码中spark指定的数据类型和数据源类型不一致。这里以MySQL为数据源为例。


一、读取数据格式匹配

MySQL表的创建语句指定的数据类型有decimal、varchar、datetime等。

CREATE TABLE `customfieldvalue` (
  `ID` decimal(18,0) NOT NULL,
  `ISSUE` decimal(18,0) DEFAULT NULL,
  `CUSTOMFIELD` decimal(18,0) DEFAULT NULL,
  `PARENTKEY` varchar(255) DEFAULT NULL,
  `STRINGVALUE` varchar(255) DEFAULT NULL,
  `NUMBERVALUE` decimal(18,6) DEFAULT NULL,
  `TEXTVALUE` longtext,
  `DATEVALUE` datetime DEFAULT NULL,
  `VALUETYPE` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `cfvalue_issue` (`ISSUE`,`CUSTOMFIELD`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("ETL")
  conf.setMaster("local")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  //从MySQL中获取数据
  val url = "jdbc:mysql://192.168.xxx.xxx:3306/jira?user=xxxx&password=xxxx"
  val prop = new Properties()
  //加载整个表
  val customfieldvalue = sqlContext.read.jdbc(url, "customfieldvalue", prop)

  val customfieldvalue2 = customfieldvalue.map { line =>
    val ID = line.getAs[Double]("ID")    //问题所在
    val ISSUE = line.getAs[BigDecimal]("ISSUE")
    val CUSTOMFIELD = line.getAs[BigDecimal]("CUSTOMFIELD")
    val PARENTKEY = line.getAs[String]("PARENTKEY")
    val STRINGVALUE = line.getAs[String]("STRINGVALUE")
    val NUMBERVALUE = line.getAs[BigDecimal]("NUMBERVALUE")
    val TEXTVALUE = line.getAs[String]("TEXTVALUE")
    val DATEVALUE = line.getAs[Date]("DATEVALUE")
    val VALUETYPE = line.getAs[String]("VALUETYPE")
    (ID, ISSUE, CUSTOMFIELD, PARENTKEY, STRINGVALUE, NUMBERVALUE, TEXTVALUE, DATEVALUE, VALUETYPE)
  }.filter(line =>
    line._5 != null &&
  line._5.matches("^\\d+$")).map { line =>
    val STRINGVALUE = new BigDecimal(line._5)
    Row.apply(line._1, line._2, line._3, line._4, STRINGVALUE, line._6, line._7, line._8, line._9)
  }

以上代码处理数据时会出现如下报错:

java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double

at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at com.xbn.amazon.analysis.jira.MySql2Hive$$anonfun$2.apply(MySql2Hive.scala:40)
at com.xbn.amazon.analysis.jira.MySql2Hive$$anonfun$2.apply(MySql2Hive.scala:39)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

MySQL中ID字段的数据类型为decimal,getAs方法不能隐式装换为double,因此这里必须指定为BigDecimal类型:line.getAs[BigDecimal]("ID"),或者直接用getDecimal()方法,如下图

spark中处理dataframe往往遇到”...cannot be cast to ...”这种数据不匹配的问题


二、保存数据格式匹配

上面的dataframe处理后的数据保存为csv格式的文件,因此也得匹配。

Spark SQL和DataFrames支持的数据格式如下:

数值类型 
ByteType: 代表1字节有符号整数. 数值范围: -128 到 127. 
ShortType: 代表2字节有符号整数. 数值范围: -32768 到 32767. 
IntegerType: 代表4字节有符号整数. 数值范围: -2147483648 t到 2147483647. 
LongType: 代表8字节有符号整数. 数值范围: -9223372036854775808 到 9223372036854775807. 
FloatType: 代表4字节单精度浮点数。 
DoubleType: 代表8字节双精度浮点数。 
DecimalType: 表示任意精度的有符号十进制数。内部使用java.math.BigDecimal.A实现。 
BigDecimal由一个任意精度的整数非标度值和一个32位的整数组成。 
String类型 
StringType: 表示字符串值。 
Binary类型 
BinaryType: 代表字节序列值。 
Boolean类型 
BooleanType: 代表布尔值。 
Datetime类型 
TimestampType: 代表包含的年、月、日、时、分和秒的时间值 
DateType: 代表包含的年、月、日的日期值 
复杂类型 
ArrayType(elementType, containsNull): 代表包含一系列类型为elementType的元素。如果在一个将ArrayType值的元素可以为空值,containsNull指示是否允许为空。 
MapType(keyType, valueType, valueContainsNull): 代表一系列键值对的集合。key不允许为空,valueContainsNull指示value是否允许为空 
StructType(fields): 代表带有一个StructFields(列)描述结构数据。 
StructField(name, dataType, nullable): 表示StructType中的一个字段。name表示列名、dataType表示数据类型、nullable指示是否允许为空。 
Spark SQL所有的数据类型在 org.apache.spark.sql.types 包内。不同语言访问或创建数据类型方法不一样:

Scala 代码中添加 import org.apache.spark.sql.types._,再进行数据类型访问或创建操作。 
scalaAccessDataTypes

Java 可以使用 org.apache.spark.sql.types.DataTypes 中的工厂方法,如下表: 
javaAccessDataTypes



上述处理后的dataframe的格式依次为:BigDecimal,BigDecimal,BigDecimal,String,String,BigDecimal,String,Date,String 

因此在构建schema的时候也得遵循上述格式。

val schema = StructType(
  Array(StructField("ID",  DecimalType(18,0), true),
    StructField("ISSUE", DecimalType(18,0), true),
    StructField("CUSTOMFIELD", DecimalType(18,0), true),
    StructField("PARENTKEY", StringType, true),
    StructField("STRINGVALUE", DecimalType(18,0), true),
    StructField("NUMBERVALUE", DecimalType(18,6), true),
    StructField("TEXTVALUE", StringType, true),
    StructField("DATEVALUE", DateType, true),
    StructField("VALUETYPE", StringType, true)
  ))



val schema = StructType(
  Array(StructField("ID",  DoubleType, true),        //问题所在
    StructField("ISSUE", DecimalType(18,0), true),
    StructField("CUSTOMFIELD", DecimalType(18,0), true),
    StructField("PARENTKEY", StringType, true),
    StructField("STRINGVALUE", DecimalType(18,0), true),
    StructField("NUMBERVALUE", DecimalType(18,6), true),
    StructField("TEXTVALUE", StringType, true),
    StructField("DATEVALUE", DateType, true),
    StructField("VALUETYPE", StringType, true)
  ))

否则会报如下错误:

java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)