Spark SQL 自定义函数类型
一、spark读取数据
前段时间一直在研究GeoMesa下的Spark JTS,Spark JTS支持用户自定义函数,然后有一份数据,读取文件:
package com.geomesa.spark.SparkCore
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataTypes, StringType, StructField, StructType}
object test {
def main(args: Array[String]): Unit = {
import org.locationtech.geomesa.spark.jts._
//spark
val spark: SparkSession = {
SparkSession.builder()
.appName("test")
.master("local[*]")
.getOrCreate()
//需注入spark.jts._包
.withJTS
}
val dataFile = this.getClass.getClassLoader.getResource("gsmc.txt").getPath
val df = spark.read
.schema(schema)
.json(dataFile)
//.show(5, false)
//.printSchema()
}
}
二、自定义函数结构
然后打印出来的数据结构如下,通过spark sql的自定义函数构建这个结构的数据,主要构建features下的相关数据结构,之前耗时N久,各种不会构建以及构建错误,后,皇天不负有心人,搞就是了,搞出来了。
root
|-- crs: struct (nullable = true)
| |-- properties: struct (nullable = true)
| | |-- name: string (nullable = true)
| |-- type: string (nullable = true)
|-- features: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- geometry: struct (nullable = true)
| | | |-- coordinates: array (nullable = true)
| | | | |-- element: array (containsNull = true)
| | | | | |-- element: array (containsNull = true)
| | | | | | |-- element: double (containsNull = true)
| | | |-- type: string (nullable = true)
| | |-- geometry_name: string (nullable = true)
| | |-- id: string (nullable = true)
自定义格式如下:
val schema = StructType(Array(
StructField("crs", StringType),
StructField("features", ArrayType(
StructType(Array(StructField("geometry",
StructType(Array(StructField("coordinates",
ArrayType(DataTypes.createArrayType(ArrayType((DataTypes.DoubleType)))))
)))))))
))
经过printSchema()方法测试,结构如上面的features结构一模一样,nice。
三、附上长长的各种pom
<properties>
<geospark.version>1.2.0</geospark.version>
<geotools.version>14.1</geotools.version>
<spark.version>2.3.1</spark.version>
<encoding>UTF-8</encoding>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geospark</artifactId>
<version>${geospark.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-geometry</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180813</version>
</dependency>
<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-geojson</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-api</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-jts_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-epsg-hsql</artifactId>
<version>${geotools.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts.io</groupId>
<artifactId>jts-io-common</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.locationtech.spatial4j</groupId>
<artifactId>spatial4j</artifactId>
<version>0.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!--redis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>