用户自定义函数
2025年4月3日大约 8 分钟
用户自定义函数
1. UDF介绍
UDF(User Defined Function)即用户自定义函数,IoTDB 提供多种内置的时序处理函数,也支持扩展自定义函数来满足更多的计算需求。
IoTDB 表模型中支持两种类型的 UDF ,如下表所示。
UDF 类型 | 函数类型 | 描述 |
---|---|---|
UDSF(User-defined Scalar Function) | 标量函数 | 输入 k 列 1 行数据,输出1 列 1 行数据(一对一)。 |
UDAF(User-defined Aggregate Function) | 聚合函数 | 输入k 列 m 行数据,输出1 列 1 行数据(多对一)。 |
UDSF
可用于标量函数出现的任何子句和表达式中,如select子句、where子句等。select udsf1(s1) from table1 where udsf2(s1)>0
UDAF
可用于聚合函数出现的任何子句和表达式中,如select子句、having子句等;select udaf1(s1), device_id from table1 group by device_id having udaf2(s1)>0
2. UDF 管理
2.1 UDF 注册
准备 UDF 实现的 JAR 包,其中包含 UDF 实现类,如
org.apache.iotdb.udf.ScalarFunctionExample
。Jar 包的放置有两种方式:
- 本地:需要将 JAR 包放置到集群所有节点的
ext/udf
目录下。 - 远端:需要将 JAR 包上传到 URI 服务器上并确保 IoTDB 实例能够访问该 URI 服务器(注册成功后IoTDB 会下载 JAR 包并同步到整个集群)。
- 使用以下 SQL 语句注册 UDF
CREATE FUNCTION <UDF-NAME> AS <UDF-CLASS-FULL-PATHNAME> (USING URI URI-STRING)
- 示例
-- 本地
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample';
-- 远端
CREATE FUNCTION contain_null AS 'org.apache.iotdb.udf.ScalarFunctionExample' USING URI 'http://jar/example.jar'
注意:
- UDF 在装载过程中无需启停服务器。
- UDF 名称大小写不敏感,不能与 IoTDB 内置函数重名。
- 表模型和树模型的 UDF 空间相互独立。
- 避免在不同的 JAR 包中创建全类名相同但功能逻辑不同的 UDF 类。如果存在,系统在执行 UDF 时会随机加载其中一个,造成执行行为不一致。
2.2 UDF 卸载
SQL 语法如下:
DROP FUNCTION <UDF-NAME>
示例:卸载上述例子的 UDF:
DROP FUNCTION contain_null
2.3 UDF 查看
- 如果 State 为 UNAVAILABLE,可能是在注册或卸载过程中系统发生了错误,请查看系统日志进行排查,重新注册或卸载 UDF 直至成功即可。
SHOW FUNCTIONS
2.4 UDF 配置
- 可以在
iotdb-system.properties
中配置 UDF Jar 文件的存储目录:
# UDF lib dir
udf_lib_dir=ext/udf
3. UDF 开发
3.1 UDF 依赖
可以从 Maven 库 中搜索下面示例中的依赖,请注意选择和目标 IoTDB 服务器版本相同的依赖版本。
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>udf-api</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>
3.2 标量函数(UDSF)
编写一个 UDSF 需要实现org.apache.iotdb.udf.api.relational.ScalarFunction
接口。
public interface ScalarFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain ScalarFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
ScalarFunctionAnalysis analyze(FunctionArguments arguments) throws UDFArgumentNotValidException;
/**
* This method is called after the ScalarFunction is instantiated and before the beginning of the
* transformation process. This method is mainly used to initialize the resources used in
* ScalarFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/**
* This method will be called to process the transformation. In a single UDF query, this method
* may be called multiple times.
*
* @param input original input data row
* @throws UDFException the user can throw errors if necessary
*/
Object evaluate(Record input) throws UDFException;
/** This method is mainly used to release the resources used in the ScalarFunction. */
default void beforeDestroy() {
// do nothing
}
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
ScalarFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments 中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments 构造ScalarFunctionAnalysis ,包括输出类型等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDSF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
Object evaluate(Record input) throws UDFException; | UDSF 处理逻辑,根据一行输入数据,返回一行输出数据。 | 是 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 ScalarFunctionAnalysis 中的字段:
字段类型 | 字段名称 | 默认值 |
---|---|---|
Type | outputDataType | 无 |
示例:UDSF 的实现示例,输入任意数据类型的任意多列,返回一个布尔值,代表该行输入是否包含 NULL 值。
3.3 聚合函数(UDAF)
一个完整的 UDAF 定义涉及到 State
和 AggregateFunction
两个类。
3.3.1 State 类
编写一个 State 类需要实现org.apache.iotdb.udf.api.State
接口。
public interface State {
/** Reset your state object to its initial state. */
void reset();
/**
* Serialize your state into byte array. The order of serialization must be consistent with
* deserialization.
*/
byte[] serialize();
/**
* Deserialize byte array into your state. The order of deserialization must be consistent with
* serialization.
*/
void deserialize(byte[] bytes);
/** Destroy state. You may release previously binding resource in this method. */
default void destroyState() {}
;
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
void reset() | 将State 对象重置为初始的状态,您需要像编写构造函数一样,在该方法内填入State 类中各个字段的初始值。 | 是 |
byte[] serialize() | 将State 序列化为二进制数据。该方法用于 IoTDB 内部的State 对象传递,注意序列化的顺序必须和下面的反序列化方法一致。 | 是 |
void deserialize(byte[] bytes) | 将二进制数据反序列化为State 。该方法用于 IoTDB 内部的State 对象传递,注意反序列化的顺序必须和上面的序列化方法一致。 | 是 |
void destroyState() | 进行资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次。 | 否 |
3.3.2 AggregateFunction 类
编写一个 UDAF 需要实现 org.apache.iotdb.udf.api.relational.AggregateFunction
接口。
public interface AggregateFunction extends SQLFunction {
/**
* In this method, the user need to do the following things:
*
* <ul>
* <li>Validate {@linkplain FunctionArguments}. Throw {@link UDFArgumentNotValidException} if
* any parameter is not valid.
* <li>Use {@linkplain FunctionArguments} to get input data types and infer output data type.
* <li>Construct and return a {@linkplain AggregateFunctionAnalysis} object.
* </ul>
*
* @param arguments arguments used to validate
* @throws UDFArgumentNotValidException if any parameter is not valid
* @return the analysis result of the scalar function
*/
AggregateFunctionAnalysis analyze(FunctionArguments arguments)
throws UDFArgumentNotValidException;
/**
* This method is called after the AggregateFunction is instantiated and before the beginning of
* the transformation process. This method is mainly used to initialize the resources used in
* AggregateFunction.
*
* @param arguments used to parse the input arguments entered by the user
* @throws UDFException the user can throw errors if necessary
*/
default void beforeStart(FunctionArguments arguments) throws UDFException {
// do nothing
}
/** Create and initialize state. You may bind some resource in this method. */
State createState();
/**
* Update state with data columns.
*
* @param state state to be updated
* @param input original input data row
*/
void addInput(State state, Record input);
/**
* Merge two state in execution engine.
*
* @param state current state
* @param rhs right-hand-side state to be merged
*/
void combineState(State state, State rhs);
/**
* Calculate output value from final state
*
* @param state final state
* @param resultValue used to collect output data points
*/
void outputFinal(State state, ResultValue resultValue);
/**
* Remove input data from state. This method is used to remove the data points that have been
* added to the state. Once it is implemented, {@linkplain
* AggregateFunctionAnalysis.Builder#removable(boolean)} should be set to true.
*
* @param state state to be updated
* @param input row to be removed
*/
default void remove(State state, Record input) {
throw new UnsupportedOperationException();
}
/** This method is mainly used to release the resources used in the SQLFunction. */
default void beforeDestroy() {
// do nothing
}
}
接口说明:
接口定义 | 描述 | 是否必须 |
---|---|---|
AggregateFunctionAnalysis analyze(FunctionArguments arguments); | 1. 校验FunctionArguments 中的输入列数、数据类型、系统参数等是否合法,不合法则抛出异常。2. 根据 FunctionArguments 构造AggregateFunctionAnalysis ,包括输出类型、removable 等信息。 | 是 |
void beforeStart(FunctionArguments arguments); | 在 UDAF 处理输入数据前,调用用户自定义的初始化行为 | 否 |
State createState(); | 创建State 对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。 | 是 |
void addInput(State state, Record input); | 更新State 对象,将输入的一行 Record 数据添加到聚合状态中。 | 是 |
void combineState(State state, State rhs); | 将rhs 状态合并至state 状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 会为每个节点上的部分数据生成一个State 对象,然后调用该方法合并成完整的State 。 | 是 |
void outputFinal(State state, ResultValue resultValue); | 根据State 中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。 | 是 |
void remove(State state, Record input); | 更新State 对象,将输入的一行 Record 数据从聚合状态中剔除。实现该方法需要设置 AggregateFunctionAnalysis 中的 removable 字段为 true。 | 否 |
void beforeDestroy(); | UDSF 的结束方法,您可以在此方法中进行一些资源释放等的操作。此方法由框架调用。对于一个实例而言,生命周期中会且只会被调用一次,即在处理完最后一条记录之后被调用。 | 否 |
目前 AggregateFunctionAnalysis 中的字段:
字段类型 | 字段名称 | 默认值 |
---|---|---|
Type | outputDataType | 无 |
boolean | removable | false |
示例:UDAF 的实现示例,计算不为 NULL 的行数。
3.4 完整Maven项目示例
如果使用 Maven,可以参考示例项目udf-example。