【ClickHouse】Writing to ClickHouse from Spark via ClickHouse-Native-JDBC

Date: 2021-04-20 00:46:27

There are currently two JDBC drivers you can use to write to ClickHouse (a quick connection sketch follows below):
The official JDBC driver: port 8123
Implemented over HTTP; overall write performance is unremarkable, and requests can time out.
housepower's ClickHouse-Native-JDBC: port 9000
Implemented over the native TCP protocol; supports high-performance writes, with data organized by column and compressed.
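
For orientation, here is a minimal plain-JDBC sketch of the two connection styles; the hostnames are placeholders, and the round trip assumes the native driver is on the classpath:

import java.sql.DriverManager

// Sketch only: the two URL styles, plus a connectivity check through the native driver.
object ClickHouseConnectionSketch {
  def main(args: Array[String]): Unit = {
    // Official JDBC driver: speaks HTTP, default port 8123
    val httpUrl = "jdbc:clickhouse://localhost:8123/default"
    println(s"HTTP endpoint (official driver): $httpUrl")

    // housepower ClickHouse-Native-JDBC: speaks the native TCP protocol, default port 9000
    val nativeUrl = "jdbc:clickhouse://localhost:9000"

    // Quick "SELECT 1" round trip over the native protocol
    val conn = DriverManager.getConnection(nativeUrl)
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1")
      while (rs.next()) println(rs.getInt(1))
    } finally {
      conn.close()
    }
  }
}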

Here is a record of the process of using ClickHouse-Native-JDBC:
Spark version: 2.1.0
ClickHouse version: 20.2.1.2183, single-node deployment
ClickHouse-Native-JDBC version: 2.1-stable

  1. First, create a local table in ClickHouse:


CREATE TABLE IF NOT EXISTS jdbc_test_table
(
    `name` String,
    `age`  UInt8,
    `job`  String
)
ENGINE = MergeTree()
PARTITION BY age
ORDER BY age
SETTINGS index_granularity = 8192
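
The same DDL can also be issued programmatically through the native driver; a minimal sketch, assuming a local single-node server and the driver on the classpath:

import java.sql.DriverManager

// Sketch: run the CREATE TABLE statement over the native protocol instead of clickhouse-client.
object CreateJdbcTestTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
    try {
      conn.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS jdbc_test_table
          |(
          |    `name` String,
          |    `age`  UInt8,
          |    `job`  String
          |)
          |ENGINE = MergeTree()
          |PARTITION BY age
          |ORDER BY age
          |SETTINGS index_granularity = 8192""".stripMargin)
    } finally {
      conn.close()
    }
  }
}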
  2. Write the Spark code:


<!-- Add the ClickHouse-Native-JDBC dependency to the pom -->
<dependency>
    <groupId>com.github.housepower</groupId>
    <artifactId>clickhouse-native-jdbc</artifactId>
    <version>2.1-stable</version>
</dependency>
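
If the project is built with sbt instead of Maven, the equivalent coordinate would look like this (same artifact and version as the pom snippet above):

// build.sbt (sketch): same Maven coordinates as above
libraryDependencies += "com.github.housepower" % "clickhouse-native-jdbc" % "2.1-stable"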


import java.sql.SQLFeatureNotSupportedException
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkJDBCToClickhouse {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf =
      new SparkConf()
        .setAppName("SparkJDBCToClickhouse")
        .setMaster("local[1]")

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate()

    val filePath = "people.csv"
    val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver"
    val ckUrl = "jdbc:clickhouse://localhost:9000"
    val table = "jdbc_test_table"

    // Read the people.csv test file
    val peopleDFCsv =
      spark.read
        .format("csv")
        .option("sep", ";")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filePath)
    peopleDFCsv.show()

    try {
      val pro = new java.util.Properties
      pro.put("driver", ckDriver)
      peopleDFCsv.write
        .mode(SaveMode.Append)
        .option("batchsize", "20000")
        .option("isolationLevel", "NONE")
        .option("numPartitions", "1")
        .jdbc(ckUrl, table, pro)
    } catch {
      // Note: some of the metadata methods Spark's JDBC data source relies on are not
      // implemented by this driver (e.g. getPrecision, setQueryTimeout), so they throw
      // exceptions -- but the write itself still succeeds.
      case e: SQLFeatureNotSupportedException =>
        println("catch and ignore!")
    }
    spark.close()
  }
}
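
To double-check that the rows landed, a count can be read back over the same connection. Below is a minimal sketch meant to sit after the write inside main, reusing ckUrl and table; plain JDBC is used rather than spark.read.jdbc, because the Spark read path would again hit the unsupported metadata methods discussed below:

// Sketch: verify the write with a plain JDBC count() instead of spark.read.jdbc.
val conn = java.sql.DriverManager.getConnection(ckUrl)
try {
  val rs = conn.createStatement().executeQuery(s"SELECT count() FROM $table")
  while (rs.next()) println(s"rows in $table: ${rs.getLong(1)}")
} finally {
  conn.close()
}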

I was stuck for quite a while on the exception caught above. When Spark goes through the JDBC data source, it first tries to fetch the target table's schema metadata and calls a number of methods along the way; for example, resolveTable calls getSchema:


def getSchema(
    resultSet: ResultSet,
    dialect: JdbcDialect,
    alwaysNullable: Boolean = false): StructType = {
  val rsmd = resultSet.getMetaData
  val ncols = rsmd.getColumnCount
  val fields = new Array[StructField](ncols)
  var i = 0
  while (i < ncols) {
    // The following metadata calls may not be supported by the driver
    val columnName = rsmd.getColumnLabel(i + 1)
    val dataType = rsmd.getColumnType(i + 1)
    val typeName = rsmd.getColumnTypeName(i + 1)
    val fieldSize = rsmd.getPrecision(i + 1)
    val fieldScale = rsmd.getScale(i + 1)
    val isSigned = {
      try {
        rsmd.isSigned(i + 1)
      } catch {
        // Workaround for HIVE-14684:
        case e: SQLException if
            e.getMessage == "Method not supported" &&
            rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
      }
    }
    val nullable = if (alwaysNullable) {
      true
    } else {
      rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
    }
    val metadata = new MetadataBuilder().putLong("scale", fieldScale)
    val columnType =
      dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
        getCatalystType(dataType, fieldSize, fieldScale, isSigned))
    fields(i) = StructField(columnName, columnType, nullable)
    i = i + 1
  }
  new StructType(fields)
}

Some of the methods called here are not supported by ClickHouse-Native-JDBC, for example:


@Override
public int getPrecision(int column) throws SQLException {
    throw new SQLFeatureNotSupportedException();
}

@Override
public int getScale(int column) throws SQLException {
    throw new SQLFeatureNotSupportedException();
}

All of the methods that throw this exception are listed here.
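
The behaviour is easy to reproduce outside of Spark with a small probe that queries the table directly and calls the same ResultSetMetaData methods (URL and table as above; a sketch only):

import java.sql.{DriverManager, SQLFeatureNotSupportedException}

// Sketch: probe which ResultSetMetaData methods the 2.1-stable driver accepts.
object ProbeClickHouseMetadata {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
    try {
      val rsmd = conn.createStatement()
        .executeQuery("SELECT * FROM jdbc_test_table LIMIT 1")
        .getMetaData

      // Report "not supported" instead of letting one failing call abort the probe.
      def probe(name: String)(call: => Any): Unit =
        try println(s"$name = $call")
        catch { case _: SQLFeatureNotSupportedException => println(s"$name -> not supported") }

      for (i <- 1 to rsmd.getColumnCount) {
        probe(s"getColumnLabel($i)")(rsmd.getColumnLabel(i))
        probe(s"getColumnTypeName($i)")(rsmd.getColumnTypeName(i))
        probe(s"getPrecision($i)")(rsmd.getPrecision(i))
        probe(s"getScale($i)")(rsmd.getScale(i))
      }
    } finally {
      conn.close()
    }
  }
}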

one more thing
With Spark 2.4.5 the write fails outright: the driver does not support the setQueryTimeout interface, so the job aborts right in the JdbcRelationProvider stage:


java.sql.SQLFeatureNotSupportedException
at com.github.housepower.jdbc.wrapper.SQLStatement.setQueryTimeout(SQLStatement.java:59)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:862)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
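
If you are stuck on Spark 2.4.x with this driver version, one possible workaround (a sketch only, not something from the driver's docs) is to bypass Spark's JDBC sink altogether and batch-insert through plain JDBC inside foreachPartition, so setQueryTimeout is never called:

// Sketch: manual batch insert through plain JDBC, replacing the peopleDFCsv.write block
// inside main. Column names/order follow the jdbc_test_table schema above; executors
// must have the native driver on their classpath.
import org.apache.spark.sql.Row

peopleDFCsv.rdd.foreachPartition { rows: Iterator[Row] =>
  // One connection per partition
  val conn = java.sql.DriverManager.getConnection("jdbc:clickhouse://localhost:9000")
  try {
    val stmt = conn.prepareStatement(
      "INSERT INTO jdbc_test_table (name, age, job) VALUES (?, ?, ?)")
    var n = 0
    rows.foreach { row =>
      stmt.setString(1, row.getAs[String]("name"))
      stmt.setInt(2, row.getAs[Int]("age"))     // CSV inferSchema is assumed to map age to Int
      stmt.setString(3, row.getAs[String]("job"))
      stmt.addBatch()
      n += 1
      if (n % 20000 == 0) stmt.executeBatch()   // flush roughly every 20k rows
    }
    stmt.executeBatch()                          // flush the remainder
  } finally {
    conn.close()
  }
}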