spark处理dataframe数据时,往往遇到"...cannot be cast to ..."这种数据不匹配的问题,主要是因为我们代码中spark指定的数据类型和数据源类型不一致。这里以MySQL为数据源为例。
一、读取数据格式匹配
MySQL表的创建语句指定的数据类型有decimal、varchar、datetime等。
CREATE TABLE `customfieldvalue` (
`ID` decimal(18,0) NOT NULL,
`ISSUE` decimal(18,0) DEFAULT NULL,
`CUSTOMFIELD` decimal(18,0) DEFAULT NULL,
`PARENTKEY` varchar(255) DEFAULT NULL,
`STRINGVALUE` varchar(255) DEFAULT NULL,
`NUMBERVALUE` decimal(18,6) DEFAULT NULL,
`TEXTVALUE` longtext,
`DATEVALUE` datetime DEFAULT NULL,
`VALUETYPE` varchar(255) DEFAULT NULL,
PRIMARY KEY (`ID`),
KEY `cfvalue_issue` (`ISSUE`,`CUSTOMFIELD`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ETL") conf.setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //从MySQL中获取数据 val url = "jdbc:mysql://192.168.xxx.xxx:3306/jira?user=xxxx&password=xxxx" val prop = new Properties() //加载整个表 val customfieldvalue = sqlContext.read.jdbc(url, "customfieldvalue", prop) val customfieldvalue2 = customfieldvalue.map { line => val ID = line.getAs[Double]("ID") //问题所在 val ISSUE = line.getAs[BigDecimal]("ISSUE") val CUSTOMFIELD = line.getAs[BigDecimal]("CUSTOMFIELD") val PARENTKEY = line.getAs[String]("PARENTKEY") val STRINGVALUE = line.getAs[String]("STRINGVALUE") val NUMBERVALUE = line.getAs[BigDecimal]("NUMBERVALUE") val TEXTVALUE = line.getAs[String]("TEXTVALUE") val DATEVALUE = line.getAs[Date]("DATEVALUE") val VALUETYPE = line.getAs[String]("VALUETYPE") (ID, ISSUE, CUSTOMFIELD, PARENTKEY, STRINGVALUE, NUMBERVALUE, TEXTVALUE, DATEVALUE, VALUETYPE) }.filter(line => line._5 != null && line._5.matches("^\\d+$")).map { line => val STRINGVALUE = new BigDecimal(line._5) Row.apply(line._1, line._2, line._3, line._4, STRINGVALUE, line._6, line._7, line._8, line._9) }
以上代码处理数据时会出现如下报错:
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)at com.xbn.amazon.analysis.jira.MySql2Hive$$anonfun$2.apply(MySql2Hive.scala:40)
at com.xbn.amazon.analysis.jira.MySql2Hive$$anonfun$2.apply(MySql2Hive.scala:39)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
MySQL中ID字段的数据类型为decimal,getAs方法不能隐式装换为double,因此这里必须指定为BigDecimal类型:line.getAs[BigDecimal]("ID"),或者直接用getDecimal()方法,如下图
二、保存数据格式匹配
上面的dataframe处理后的数据保存为csv格式的文件,因此也得匹配。
Spark SQL和DataFrames支持的数据格式如下:
数值类型
ByteType: 代表1字节有符号整数. 数值范围: -128 到 127.
ShortType: 代表2字节有符号整数. 数值范围: -32768 到 32767.
IntegerType: 代表4字节有符号整数. 数值范围: -2147483648 t到 2147483647.
LongType: 代表8字节有符号整数. 数值范围: -9223372036854775808 到 9223372036854775807.
FloatType: 代表4字节单精度浮点数。
DoubleType: 代表8字节双精度浮点数。
DecimalType: 表示任意精度的有符号十进制数。内部使用java.math.BigDecimal.A实现。
BigDecimal由一个任意精度的整数非标度值和一个32位的整数组成。
String类型
StringType: 表示字符串值。
Binary类型
BinaryType: 代表字节序列值。
Boolean类型
BooleanType: 代表布尔值。
Datetime类型
TimestampType: 代表包含的年、月、日、时、分和秒的时间值
DateType: 代表包含的年、月、日的日期值
复杂类型
ArrayType(elementType, containsNull): 代表包含一系列类型为elementType的元素。如果在一个将ArrayType值的元素可以为空值,containsNull指示是否允许为空。
MapType(keyType, valueType, valueContainsNull): 代表一系列键值对的集合。key不允许为空,valueContainsNull指示value是否允许为空
StructType(fields): 代表带有一个StructFields(列)描述结构数据。
StructField(name, dataType, nullable): 表示StructType中的一个字段。name表示列名、dataType表示数据类型、nullable指示是否允许为空。
Spark SQL所有的数据类型在 org.apache.spark.sql.types 包内。不同语言访问或创建数据类型方法不一样:
Scala 代码中添加 import org.apache.spark.sql.types._,再进行数据类型访问或创建操作。
scalaAccessDataTypes
Java 可以使用 org.apache.spark.sql.types.DataTypes 中的工厂方法,如下表:
javaAccessDataTypes
上述处理后的dataframe的格式依次为:BigDecimal,BigDecimal,BigDecimal,String,String,BigDecimal,String,Date,String
因此在构建schema的时候也得遵循上述格式。
val schema = StructType( Array(StructField("ID", DecimalType(18,0), true), StructField("ISSUE", DecimalType(18,0), true), StructField("CUSTOMFIELD", DecimalType(18,0), true), StructField("PARENTKEY", StringType, true), StructField("STRINGVALUE", DecimalType(18,0), true), StructField("NUMBERVALUE", DecimalType(18,6), true), StructField("TEXTVALUE", StringType, true), StructField("DATEVALUE", DateType, true), StructField("VALUETYPE", StringType, true) ))
val schema = StructType( Array(StructField("ID", DoubleType, true), //问题所在 StructField("ISSUE", DecimalType(18,0), true), StructField("CUSTOMFIELD", DecimalType(18,0), true), StructField("PARENTKEY", StringType, true), StructField("STRINGVALUE", DecimalType(18,0), true), StructField("NUMBERVALUE", DecimalType(18,6), true), StructField("TEXTVALUE", StringType, true), StructField("DATEVALUE", DateType, true), StructField("VALUETYPE", StringType, true) ))
否则会报如下错误:
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getDouble(rows.scala:44)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDouble(rows.scala:221)