Java UDF 的设计与使用介绍,兼容 Hive UDF 实现数据快速迁移

时间:2023-01-16 11:09:08

作者介绍: 李仕杨,SelectDB 生态研发工程师,Apache Doris Contributor。

我们在使用各个 SQL 引擎时,会遇到纷繁复杂的查询需求。一部分可以通过引擎自带的内置函数去解决,但内置函数往往具有一定通用性,在部分特殊场景下内置函数可能无法满足需求,所以一般 SQL 引擎会提供 UDF 功能,方便用户通过自己写逻辑来满足特定的需求,Apache Doris 也不例外。

在 Java UDF 之前,Apache Doris 提供了原生 UDF 。由于是使用 C++ 来编写的,执行效率高、速度更快,但是在实际使用中也会存在一些问题:

  • 跟 Doris 代码耦合度高,需要自己打包编译 Doris 源码
  • 只支持 C++ 语言并且 UDF 代码出错会影响 Doris 集群稳定性
  • 对于只熟悉 Hive、Spark 等大数据组件的用户有一定使用门槛

由上可知,原生的 UDF 实现起来门槛较高且存在一定的不稳定性因素。那么是否有一种实现相对简单、使用门槛较低且与 Doris 代码耦合度低的 UDF 呢?

答案是有的。在 2022 年 12 月正式发布的 Apache Doris 1.2.0 版本(https://github.com/apache/doris/releases)中,我们推出了全新的 Java UDF 和 Remote UDF 功能,其中 Java UDF 不仅能满足以上要求,并且在方便和安全角度为用户带来了全新体验:

  • 不熟悉 C++?Java 代码一样可以实现自己的 UDF
  • 使用条件苛刻?只要有 Jar 包就能使用
  • 担心稳定性?Java UDF 出错只影响自身,对 Doris 的稳定性几乎无影响
  • 迁移旧大数据平台的数据和 UDF 费时费力?Java UDF 完全兼容 Hive UDF,轻松实现快速迁移
  • .......

设计思路

大体步骤

Apache Doris 的 BE 是由 C++ 代码编写,如果想在 Doris 中实现 Java UDF,不可避免需要调用 JNI,而不正确的 JNI 调用将导致严重的性能问题。那么该如何设计 Java UDF 以解决这个问题呢?

Doris Java UDF 针对向量化引擎,其设计思路大体如下:

  • 首先,制定用户在创建 UDF 时必须遵循的一些规则。 例如,UDF 类必须具有 Evaluate 方法,并且必须是 Public 和 Non-Static 的。 这些规则确保我们可以正确调用 UDF。
  • 其次,Doris 查询引擎会执行一个新的 Java 函数调用,BE 会创建或重用一个 JVM 来调用真正的 Java UDF。 为了隔离不同的 UDF 实例,选择使用不同的类加载器来加载 UDF。
  • 最后,由于执行时是向量化的,因此可以实现一次执行多行数据只调用一次 JNI,原因是 JNI 开销被输入列中所有行分摊了[1], 这将给用户带来更好的性能体验。

详细步骤

熟悉 Java 的朋友应该都知道,JVM 在直接内存即非堆区的 IO 操作比堆区更高效,因此 Doris Java UDF 一般是在直接内存中对数据进行 IO 操作。通常 UDF 有以下几种情况:

  • 一般 UDF (定长UDF)

    • 此处的基本思想是传递直接指向输入缓冲区和输出缓冲区的地址,Doris 可以直接从所给地址中读取和写回数据,这可以帮助 Doris 避免不必要的数据拷贝。 Input Buffer 和 Output Buffer 都是 JVM 的堆外内存,可以直接通过J ava 的 API 来操作这部分内存。
    • 整体执行模式如下图:

Java UDF 的设计与使用介绍,兼容 Hive UDF 实现数据快速迁移

  • UDAF(变长输出)

    • 对于一般 UDF 来说,输出大小和类型是不变,因此所需 Buffer 大小也是确定的,而对于 UDAF(变长输出),一般 UDF(定长 UDF)的步骤将不再适用。
    • 因此* *需要做出如下改变:**在第 1 步分配一个初始缓冲区,当结果大于分配的初始缓冲区时跳到第 3 步,在第 3 步中进行一次扩容。当这种情况持续发生时,我们再次重复上述步骤分配新缓冲区并继续为剩余的行执行 UDF,直到所有数据都执行完成。
    • 执行过程如下图所示:

Java UDF 的设计与使用介绍,兼容 Hive UDF 实现数据快速迁移

通过上文介绍,我们基本了解了 Doris Java UDF 的执行情况,那么在实际生产中应该如何来使用Java UDF呢?

Java UDF 的使用

Java UDF 使用起来非常简单。Java UDF 在 Doris 内注册完成后,Doris 执行时通过调用 jar 包来实现 UDF 逻辑。顺序结构如下图:

Java UDF 的设计与使用介绍,兼容 Hive UDF 实现数据快速迁移

具体步骤:

  1. 参考``doris/samples/doris-demo/java-udf-demo/src/main/java/org/apache/doris/udf/AddOne.java 文件,编写 UDF 逻辑,你可以像 Hive UDF 一样在任何地方进行编写和打包,不必跟 Doris 环境相关联。
  • AddOne.java文件内容如下:
  • // Licensed to the Apache Software Foundation (ASF) under one
    // or more contributor license agreements.  See the NOTICE file
    // distributed with this work for additional information
    // regarding copyright ownership.  The ASF licenses this file
    // to you under the Apache License, Version 2.0 (the
    // "License"); you may not use this file except in compliance
    // with the License.  You may obtain a copy of the License at
    //
    //   http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing,
    // software distributed under the License is distributed on an
    // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    // KIND, either express or implied.  See the License for the
    // specific language governing permissions and limitations
    // under the License.
    ​
    package org.apache.doris.udf;
    ​
    import org.apache.hadoop.hive.ql.exec.UDF;
    ​
    public class AddOne extends UDF {
        public Integer evaluate(Integer value) {
            return value == null? null: value + 1;
        }
    }
    
  1. 执行 mvn 打包命令

       mvn clean package
    
  2. 创建 UDF

      CREATE FUNCTION java_udf_name(int) RETURNS int PROPERTIES (
          "file"="file:///path/to/your_jar_name.jar",
          "symbol"="org.apache.doris.udf.AddOne",
          "always_nullable"="true",
          "type"="JAVA_UDF"
      );
  1. 使用已创建的 UDF
           建表:
           CREATE TABLE IF NOT EXISTS test.t1 (`col_1` int NOT NULL)
           DISTRIBUTED BY HASH(col_1) PROPERTIES("replication_num" = "1");
           
           
           插入数据:
           insert into test.t1 values(1),(2);
           
           
           使用udf:
           MySQL [(none)]> select col_1, java_udf_name(col_1) as col_2 from test.t1;
           +------------+------------+
           | col_1      | col_2      |
           +------------+------------+
           | 1          | 2          |
           | 2          | 3          |
           +------------+------------+

至此,Doris Java UDF 的创建和使用就完成了,十分简单易用。

注意事项

  • 最开始需要确定 BE 节点是否配置了JAVA_HOME,如果环境变量没有配置,则可以在be/bin/start_be.sh文件第一行加上
export JAVA_HOME=/xxx/xxx
  • UDF 代码中必须要带有以下信息(UDAF 则替换成对应的)
import org.apache.hadoop.hive.ql.exec.UDF;
  • 创建 Doris java UDF 的语句,其格式如下
CREATE FUNCTION name ([,...])
[RETURNS] rettype
PROPERTIES (["key"="value"][,...])  
  • 例子中完整的 SQL 如下
 CREATE FUNCTION java_udf_name(int) RETURNS int PROPERTIES (
"file"="file:///path/to/your_jar_name.jar",
"always_nullable"="true",
"type"="JAVA_UDF"
        );

java_udf_name 是创建 UDF 的名称,可以进行更改,UDF 名称不能与 Doris 其他函数重命。

名称后的``(int)``表示函数输入参数是 int 类型,RETURNS``后的``int``表示函数输出也是 int 类型;输入输出类型跟 Java 代码中 Evaluate 函数的输入输出类型要保持一致。

  • PROPERTIES

file表示 jar 包在本机的路径,应该修改"/path/to/your_jar_name.jar"``作为 jar 包的绝对路径。如果是多机环境,也可以使用 http 形式表示的路径,例如"file"="http://${host}:${http_port}/${your_jar_file}"

可以使用python命令来简单启动一个http server:
nohup python -m SimpleHTTPServer 12345 > /dev/null 2>&1
(启动python的目录需要跟你的jar包保持一致,比如你的jar放在A机器的/usr/lib下,那么python命令最好也在该机器的该目录下启动)

Symbol 可以参考 Java 代码中的 Package always_nullable表示 UDF 返回结果中是否可能出现 NULL 值,如果想要在计算中对出现的NULL值有特殊处理,以确定结果中不会返回 NULL,可以设为 false,有利于提升整个查询计算过程的性能。

收益总结

通过本文的介绍,了解了Doris Java UDF 的设计与使用方法,那么在实际的应用中,Doris Java UDF 能为使用者带来什么收益呢?

  • 熟悉 Java 的同学也可以快速上手开发 Doris,使用简单便捷,较大提升开发效率。

  • 兼容 Hive UDF,有效降低从 Hadoop 迁移数据的成本。

  • UDF 代码出错并不会影响 Doris,某种程度上保证了 Doris 的更好稳定运行。

  • 与 Doris 代码解耦,真正做到了"Write Once Run Anywhere"

  • 执行效率方面,Java UDF 是完全向量化执行的,一次执行多行数据只调用一次 JNI,结合堆外内存、Zero Copy 等优化技术,用户在使用 Java UDF 时,也能得到与之前的 C++ UDF 一致甚至更佳的查询性能体验。

社区贡献

如果你的 UDF 已被许多场景应用,可以将 UDF 贡献到 Apache Doris 社区。贡献步骤可参考:https://doris.incubator.apache.org/zh-CN/docs/dev/ecosystem/udf/contribute-udf

需要注意的是,Doris BE 端是由 C++ 代码实现的,因此你所贡献的内置 UDF 也需要由C++代码实现。Apache Doris 社区期待你的加入!

本文引用

[1] Viktor Rosenfeld, René Müller, Pinar Tözün, etc. Processing Java UDFs in a C++ environment. SoCC 2017: 419-431.

[2] Marcel Kornacker, Alexander Behm, Victor Bittorf, etc. Impala: A Modern, Open-Source SQL Engine for Hadoop. CIDR 2015.

[3]DSIP-001: Java UDF: https://cwiki.apache.org/confluence/display/DORIS/DSIP-001%3A+Java+UDF

[4]Apache Doris GitHub: https://github.com/apache/doris

END