跳至主要內容

数据同步

...大约 13 分钟

数据同步

数据同步是工业物联网的典型需求,通过数据同步机制,可实现 IoTDB 之间的数据共享,搭建完整的数据链路来满足内网外网数据互通、端边云同步、数据迁移、数据备份等需求。

功能介绍

同步任务概述

一个数据同步任务包含2个阶段:

  • 抽取(Source)阶段:该部分用于从源 IoTDB 抽取数据,在 SQL 语句中的 source 部分定义
  • 发送(Sink)阶段:该部分用于向目标 IoTDB 发送数据,在 SQL 语句中的 sink 部分定义

通过 SQL 语句声明式地配置2个部分的具体内容,可实现灵活的数据同步能力。

同步任务 - 创建

使用 CREATE PIPE 语句来创建一条数据同步任务,下列属性中PipeIdsink为必填项,sourceprocessor为选填项,输入SQL时注意 SOURCE SINK 插件顺序不能替换。

SQL 示例如下:

CREATE PIPE <PipeId> -- PipeId 是能够唯一标定任务任务的名字
-- 数据抽取插件,必填插件
WITH SOURCE (
  [<parameter> = <value>,],
-- 数据连接插件,必填插件
WITH SINK (
  [<parameter> = <value>,],
)

📌 注:使用数据同步功能,请保证接收端开启自动创建元数据

同步任务 - 管理

数据同步任务有三种状态:RUNNING、STOPPED和DROPPED。任务状态转换如下图所示:

状态迁移图
状态迁移图

一个数据同步任务在生命周期中会经过多种状态:

  • RUNNING: 运行状态。
    • 说明1:任务的初始状态为运行状态(V1.3.1 及以上)
  • STOPPED: 停止状态。
    • 说明1:任务的初始状态为停止状态(V1.3.0),需要使用SQL语句启动任务
    • 说明2:用户也可以使用SQL语句手动将一个处于运行状态的任务停止,此时状态会从 RUNNING 变为 STOPPED
    • 说明3:当一个任务出现无法恢复的错误时,其状态会自动从 RUNNING 变为 STOPPED
  • DROPPED:删除状态。

我们提供以下SQL语句对同步任务进行状态管理。

启动任务

创建之后,任务不会立即被处理,需要启动任务。使用START PIPE语句来启动任务,从而开始处理数据:

START PIPE<PipeId>

停止任务

停止处理数据:

STOP PIPE <PipeId>

删除任务

删除指定任务:

DROP PIPE <PipeId>

删除任务不需要先停止同步任务。

查看任务

查看全部任务:

SHOW PIPES

查看指定任务:

SHOW PIPE <PipeId>

插件

为了使得整体架构更加灵活以匹配不同的同步场景需求,在上述同步任务框架中 IoTDB 支持进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 Sink 插件,并加载至 IoTDB 系统进行使用。

模块插件预置插件自定义插件
抽取(Source)Source 插件iotdb-source不支持
发送(Sink)Sink 插件iotdb-thrift-sink、iotdb-air-gap-sink支持

预置插件

预置插件如下:

插件名称类型介绍适用版本
iotdb-sourcesource 插件默认的 source 插件,用于抽取 IoTDB 历史或实时数据1.2.x
iotdb-thrift-sinksink 插件用于 IoTDB(v1.2.0及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,多线程 async non-blocking IO 模型,传输性能高,尤其适用于目标端为分布式时的场景1.2.x
iotdb-air-gap-sinksink 插件用于 IoTDB(v1.2.2+)向 IoTDB(v1.2.2+)跨单向数据网闸的数据同步。支持的网闸型号包括南瑞 Syskeeper 2000 等1.2.2 及以上
iotdb-thrift-ssl-sinksink plugin用于 IoTDB(v1.3.1及以上)与 IoTDB(v1.2.0及以上)之间的数据传输。使用 Thrift RPC 框架传输数据,单线程 sync blocking IO 模型,适用于安全需求较高的场景1.3.1 及以上

每个插件的详细参数可参考本文参数说明章节。

查看插件

查看系统中的插件(含自定义与内置插件)可以用以下语句:

SHOW PIPEPLUGINS

返回结果如下:

IoTDB> show pipeplugins
+------------------------------+----------+---------------------------------------------------------------------------------+---------+
|                    PluginName|PluginType|                                                                        ClassName|PluginJar|
+------------------------------+--------------------------------------------------------------------------------------------+---------+
|          DO-NOTHING-PROCESSOR|   Builtin|        org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor|         |
|               DO-NOTHING-SINK|   Builtin|                  org.apache.iotdb.commons.pipe.plugin.builtin.sink.DoNothingSink|         |
|            IOTDB-AIR-GAP-SINK|   Builtin|                org.apache.iotdb.commons.pipe.plugin.builtin.sink.IoTDBAirGapSink|         |
|                  IOTDB-SOURCE|   Builtin|                  org.apache.iotdb.commons.pipe.plugin.builtin.source.IoTDBSOURCE|         |
|             IOTDB-THRIFT-SINK|   Builtin|                org.apache.iotdb.commons.pipe.plugin.builtin.sink.IoTDBThriftSink|         |
|IOTDB-THRIFT-SSL-SINK(V1.3.1+)|   Builtin|org.apache.iotdb.commons.pipe.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSslSink|         |
+------------------------------+----------+---------------------------------------------------------------------------------+---------+

使用示例

全量数据同步

本例子用来演示将一个 IoTDB 的所有数据同步至另一个 IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务,用来同步 A IoTDB 到 B IoTDB 间的全量数据,这里需要用到用到 sink 的 iotdb-thrift-sink 插件(内置插件),需指定接收端地址,这个例子中指定了'sink.ip'和'sink.port',也可指定'sink.node-urls',如下面的示例语句:

create pipe A2B
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

历史数据同步

本例子用来演示同步某个历史时间范围( 2023 年 8 月 23 日 8 点到 2023 年 10 月 23 日 8 点)的数据至另一个 IoTDB,数据链路如下图所示:

在这个例子中,我们可以创建一个名为 A2B 的同步任务。首先我们需要在 source 中定义传输数据的范围,由于传输的是历史数据(历史数据是指同步任务创建之前存在的数据),所以需要将 source.realtime.enable 参数配置为 false;同时需要配置数据的起止时间 start-time 和 end-time 以及传输的模式 mode,此处推荐 mode 设置为 hybrid 模式(hybrid 模式为混合传输,在无数据积压时采用实时传输方式,有数据积压时采用批量传输方式,并根据系统内部情况自动切换)。

详细语句如下:

create pipe A2B
WITH SOURCE (
  'source'= 'iotdb-source',
  'source.start-time' = '2023.08.23T08:00:00+00:00',
  'source.end-time' = '2023.10.23T08:00:00+00:00'
) 
with SINK (
  'sink'='iotdb-thrift-async-sink',
  'sink.node-urls'='xxxx:6668',
  'sink.batch.enable'='false'
)

双向数据传输

本例子用来演示两个 IoTDB 之间互为双活的场景,数据链路如下图所示:

在这个例子中,为了避免数据无限循环,需要将 A 和 B 上的参数source.forwarding-pipe-requests 均设置为 false,表示不转发从另一pipe传输而来的数据。

详细语句如下:

在 A IoTDB 上执行下列语句:

create pipe AB
with source (
  'source.forwarding-pipe-requests' = 'false'
)
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

在 B IoTDB 上执行下列语句:

create pipe BA
with source (
  'source.forwarding-pipe-requests' = 'false'
)
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6667'
)

级联数据传输

本例子用来演示多个 IoTDB 之间级联传输数据的场景,数据由 A 集群同步至 B 集群,再同步至 C 集群,数据链路如下图所示:

在这个例子中,为了将 A 集群的数据同步至 C,在 BC 之间的 pipe 需要将 source.forwarding-pipe-requests 配置为true,详细语句如下:

在 A IoTDB 上执行下列语句,将 A 中数据同步至 B:

create pipe AB
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6668'
)

在 B IoTDB 上执行下列语句,将 B 中数据同步至 C:

create pipe BC
with source (
  'source.forwarding-pipe-requests' = 'true'
)
with sink (
  'sink'='iotdb-thrift-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6669'
)

跨网闸数据传输

本例子用来演示将一个 IoTDB 的数据,经过单向网闸,同步至另一个 IoTDB 的场景,数据链路如下图所示:

在这个例子中,需要使用 sink 任务中的 iotdb-air-gap-sink 插件(目前支持部分型号网闸,具体型号请联系天谋科技工作人员确认),配置网闸后,在 A IoTDB 上执行下列语句,其中 ip 和 port 填写网闸配置的虚拟 ip 和相关 port,详细语句如下:

create pipe A2B
with sink (
  'sink'='iotdb-air-gap-sink',
  'sink.ip'='10.53.53.53',
  'sink.port'='9780'
)

SSL协议数据传输

本例子演示了使用 SSL 协议配置 IoTDB 单向数据同步的场景,数据链路如下图所示:

在该场景下,需要使用 IoTDB 的 iotdb-thrift-ssl-sink 插件。我们可以创建一个名为 A2B 的同步任务,并配置自身证书的密码和地址,详细语句如下:

create pipe A2B
with sink (
  'sink'='iotdb-thrift-ssl-sink',
  'sink.ip'='127.0.0.1',
  'sink.port'='6669',
  'ssl.trust-store-path'='pki/trusted'
  'ssl.trust-store-pwd'='root'
)

参考:注意事项

可通过修改 IoTDB 配置文件(iotdb-common.properties)以调整数据同步的参数,如同步数据存储目录等。完整配置如下:

V1.3.0+:

####################
### Pipe Configuration
####################

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000

# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1

# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8

# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false

# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780

V1.3.1+:

# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5

# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000

# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4

# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false

# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780

参考:参数说明

📌 说明:在 1.3.1 及以上的版本中,除 source、processor、sink 本身外,各项参数不再需要额外增加 source、processor、sink 前缀。例如:

create pipe A2B
with sink (
  'sink'='iotdb-air-gap-sink',
  'sink.ip'='10.53.53.53',
  'sink.port'='9780'
)

可以写作

create pipe A2B
with sink (
  'sink'='iotdb-air-gap-sink',
  'ip'='10.53.53.53',
  'port'='9780'
)

source 参数

keyvaluevalue 取值范围是否必填默认取值
sourceiotdb-sourceString: iotdb-source必填-
source.pattern用于筛选时间序列的路径前缀String: 任意的时间序列前缀选填root
source.history.start-time同步历史数据的开始 event time,包含 start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MIN_VALUE
source.history.end-time同步历史数据的结束 event time,包含 end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MAX_VALUE
start-time(V1.3.1+)同步所有数据的开始 event time,包含 start-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MIN_VALUE
end-time(V1.3.1+)同步所有数据的结束 event time,包含 end-timeLong: [Long.MIN_VALUE, Long.MAX_VALUE]选填Long.MAX_VALUE
source.realtime.mode实时数据的抽取模式String: hybrid, stream, batch选填hybrid
source.forwarding-pipe-requests是否转发由其他 Pipe (通常是数据同步)写入的数据Boolean: true, false选填true

💎 说明:历史数据与实时数据的差异

  • 历史数据:所有 arrival time < 创建 pipe 时当前系统时间的数据称为历史数据
  • 实时数据:所有 arrival time >= 创建 pipe 时当前系统时间的数据称为实时数据
  • 全量数据: 全量数据 = 历史数据 + 实时数据

💎 ​说明:数据抽取模式hybrid, stream和batch的差异

  • hybrid(推荐):该模式下,任务将优先对数据进行实时处理、发送,当数据产生积压时自动切换至批量发送模式,其特点是平衡了数据同步的时效性和吞吐量
  • stream:该模式下,任务将对数据进行实时处理、发送,其特点是高时效、低吞吐
  • batch:该模式下,任务将对数据进行批量(按底层数据文件)处理、发送,其特点是低时效、高吞吐

sink 参数

iotdb-thrift-sink

keyvaluevalue 取值范围是否必填默认取值
sinkiotdb-thrift-sink 或 iotdb-thrift-async-sinkString: iotdb-thrift-sink 或 iotdb-thrift-async-sink必填
sink.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip(请注意同步任务不支持向自身服务进行转发)String选填与 sink.node-urls 任选其一填写
sink.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 port(请注意同步任务不支持向自身服务进行转发)Integer选填与 sink.node-urls 任选其一填写
sink.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 sink.ip:sink.port 任选其一填写
sink.batch.enable是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPSBoolean: true, false选填true
sink.batch.max-delay-seconds在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)Integer选填1
sink.batch.size-bytes在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)Long选填

iotdb-air-gap-sink

keyvaluevalue 取值范围是否必填默认取值
sinkiotdb-air-gap-sinkString: iotdb-air-gap-sink必填
sink.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ipString选填与 sink.node-urls 任选其一填写
sink.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 portInteger选填与 sink.node-urls 任选其一填写
sink.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 urlString。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 sink.ip:sink.port 任选其一填写
sink.air-gap.handshake-timeout-ms发送端与接收端在首次尝试建立连接时握手请求的超时时长,单位:毫秒Integer选填5000

iotdb-thrift-ssl-sink(V1.3.1+)

keyvaluevalue rangerequired or notdefault value
sinkiotdb-thrift-ssl-sinkString: iotdb-thrift-ssl-sink必填
sink.ip目标端 IoTDB 其中一个 DataNode 节点的数据服务 ip(请注意同步任务不支持向自身服务进行转发)String选填与 sink.node-urls 任选其一填写
sink.port目标端 IoTDB 其中一个 DataNode 节点的数据服务 port(请注意同步任务不支持向自身服务进行转发)Integer选填与 sink.node-urls 任选其一填写
sink.node-urls目标端 IoTDB 任意多个 DataNode 节点的数据服务端口的 url(请注意同步任务不支持向自身服务进行转发)String。例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'选填与 sink.ip:sink.port 任选其一填写
sink.batch.enable是否开启日志攒批发送模式,用于提高传输吞吐,降低 IOPSBoolean: true, false选填true
sink.batch.max-delay-seconds在开启日志攒批发送模式时生效,表示一批数据在发送前的最长等待时间(单位:s)Integer选填1
sink.batch.size-bytes在开启日志攒批发送模式时生效,表示一批数据最大的攒批大小(单位:byte)Long选填
ssl.trust-store-path连接目标端 DataNode 所需的 trust store 证书路径String.Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667'OptionalFill in either sink.ip:sink.port
ssl.trust-store-pwd连接目标端 DataNode 所需的 trust store 证书密码IntegerOptional5000
上次编辑于:

Copyright © 2024 Timecho Limited All rights reserved.