-
Notifications
You must be signed in to change notification settings - Fork 13
etl engine使用样例
命令行格式:
etl-engine -fileUrl <配置文件位置> -logLevel <日志级别>
./etl-engine -fileUrl ./db_to_db.grf -logLevel info
- 常用数据源连接
- 读数据表 -> 写数据表
- 读数据表 -> 写excel文件
- HTTP_INPUT_SERVICE -> 写数据表
- Kafka -> 写数据表
- 增量对比 -> 写数据表
- Hive -> 写数据表
- 融合查询 -> 写数据表
- CDC模式 mysql -> mysql
- CDC模式 postgre -> postgre
- 读写Hbase数据表
- 读redis -> 写redis
- redis与mysql相互同步
//--- mysql 表 ---//
-- Source table for the db-to-db example (MySQL).
CREATE TABLE `t_s_1` (
`f1` INT(11) NOT NULL,
`f2` VARCHAR(50) DEFAULT NULL,
`f3` FLOAT DEFAULT NULL,
`f4` VARCHAR(50) DEFAULT NULL
);
-- Target table; columns map 1:1 from t_s_1 (f1->c1, f2->c2, ...).
CREATE TABLE `t_t_1` (
`c1` INT(50) NOT NULL,
`c2` VARCHAR(50) DEFAULT NULL,
`c3` FLOAT DEFAULT NULL,
`c4` VARCHAR(50) DEFAULT NULL
);
-- Target table for the HTTP-service and Kafka-consumer examples.
-- `value` and `time` are MySQL keywords, hence the backticks.
CREATE TABLE `t_device_info1` (
`device` INT(11) DEFAULT NULL,
`value` FLOAT DEFAULT NULL,
`time` VARCHAR(100) DEFAULT NULL,
`writetime` VARCHAR(100) DEFAULT NULL
);
-- Target table for the hive-to-mysql example.
CREATE TABLE `t_hive_u_info` (
`uuid` INT(10) NOT NULL,
`name` VARCHAR(30) DEFAULT NULL,
`height` DECIMAL(10,2) DEFAULT NULL,
`writetime` VARCHAR(20) DEFAULT NULL,
PRIMARY KEY (`uuid`)
);
-- Source table for the hbase read/write example.
-- Fix: the original had a stray quote after `u_height float DEFAULT NULL`,
-- which made the statement a syntax error; statement terminator added.
CREATE TABLE `t_user_info` (
`u_id` int(11) NOT NULL,
`u_name` varchar(50) NOT NULL,
`u_phone` varchar(20) DEFAULT NULL,
`u_height` float DEFAULT NULL,
`updated_time` varchar(50),
`created_time` varchar(50),
`deleted_time` varchar(50)
);
-- Backup/target table for the hbase read/write example; same layout as t_user_info.
-- Fix: removed the stray quote after `u_height float DEFAULT NULL` (syntax error).
CREATE TABLE `t_user_info_bk` (
`u_id` int(11) NOT NULL,
`u_name` varchar(50) NOT NULL,
`u_phone` varchar(20) DEFAULT NULL,
`u_height` float DEFAULT NULL,
`updated_time` varchar(50),
`created_time` varchar(50),
`deleted_time` varchar(50)
);
-- Table used by the redis<->mysql sync example; writetime holds
-- a "YYYY-MM-DD hh:mm:ss" string (19 chars).
CREATE TABLE t_redis_info (
id VARCHAR(32) NOT NULL,
caption VARCHAR(50),
tag VARCHAR(50),
memo VARCHAR(100),
writetime VARCHAR(19),
PRIMARY KEY (id)
);
//--- 融合查询样表 --- //
-- Order fact table for the federation-query example: one row per order.
CREATE TABLE t_order_info (
o_id VARCHAR(32) NOT NULL,
u_id VARCHAR(32),
p_id VARCHAR(32),
o_price FLOAT,
o_money FLOAT,
o_number INT,
o_writetime VARCHAR(19),
PRIMARY KEY (o_id)
);
-- User dimension table.
CREATE TABLE t_user_info (
u_id VARCHAR(32) NOT NULL,
u_name VARCHAR(32),
u_phone VARCHAR(20),
PRIMARY KEY (u_id)
);
-- Product dimension table.
CREATE TABLE t_product_info (
p_id VARCHAR(32) NOT NULL,
p_name VARCHAR(32),
p_contacts VARCHAR(50),
p_desc VARCHAR(100),
PRIMARY KEY (p_id)
);
-- Target table receiving the joined user + order + product result.
CREATE TABLE t_u_o_p_info (
u_id VARCHAR(32) ,
u_name VARCHAR(32),
u_phone VARCHAR(20),
o_id VARCHAR(32) ,
o_price FLOAT,
o_number INT,
o_money FLOAT,
o_writetime VARCHAR(19),
p_id VARCHAR(32) ,
p_name VARCHAR(32) ,
p_contacts VARCHAR(50) ,
p_desc VARCHAR(100)
);
//--- oracle 表 ---//
-- Oracle target table for the db-to-db and increment examples;
-- quoted identifiers keep the upper-case names literal.
CREATE TABLE "HW_U1"."T_T_1"
(
"C1" NUMBER(*,0),
"C2" VARCHAR2(100),
"C3" FLOAT(126),
"C4" VARCHAR2(100),
CONSTRAINT "T_T_1_PK" PRIMARY KEY ("C1")
);
//--- hive 表 ---//
-- Hive source table for the hive-to-mysql example; tab-delimited text storage.
create table t_u_info(
uuid string,
name string,
height float,
writetime string
)
row format delimited fields terminated by '\t';
//--- hbase 表 ---//
create 't_user_info','u_id','u_name','u_phone','u_height','updated_time','created_time','deleted_time'
<!-- Connection catalogue: one sample per supported source type (passwords masked).
     token/org are only meaningful for INFLUXDB; SQLITE takes a file path as database. -->
<Connection id="CONNECT_01" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token=" " org="hw" type="INFLUXDB_V1"/>
<Connection id="CONNECT_02" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" batchSize="1000" type="CLICKHOUSE"/>
<Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******" batchSize="1000" type="MYSQL"/>
<Connection id="CONNECT_04" database="d:/sqlite_db1.db" batchSize="10000" type="SQLITE"/>
<Connection id="CONNECT_05" dbURL="127.0.0.1:10000" database="db1" username="root" password="b" batchSize="1000" type="HIVE"/>
<Connection id="CONNECT_06" dbURL="127.0.0.1:5432" database="db_1" username="u1" password="******" batchSize="1000" type="POSTGRES" />
<Connection id="CONNECT_07" dbURL="http://127.0.0.1:9200" database="db1" username="elastic" password="******" batchSize="1000" type="ELASTIC"/>
<Connection id="CONNECT_08" dbURL="127.0.0.1:1521" database="orcl" username="c##u1" password="******" batchSize="1000" type="ORACLE" />
<Connection id="CONNECT_09" dbURL="127.0.0.1:9090" database="" username="" password="" batchSize="1000" type="HBASE"/>
读源表(mysql)写目标表(oracle)
命令行执行:
.\etl_engine.exe -fileUrl e:\example\db_to_db.grf
配置文件db_to_db.grf内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="db to db">
<!-- Reads up to 1000 rows from mysql db1.t_s_1 (CONNECT_01) and bulk-inserts them
     into oracle HW_U1.T_T_1 (CONNECT_02). Oracle bind placeholders are :1..:4;
     the commented-out variant below shows the mysql form using ? placeholders. -->
<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="读mysql节点1" fetchSize="500">
<Script name="sqlScript"><![CDATA[
select * from (select f1,f2,f3,f4 from db1.t_s_1 limit 1000) t1
]]></Script>
</Node>
<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写oracle节点1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4" renameOutputFields="C1;C2;C3;C4" batchSize="1000" >
<Script name="sqlScript"><![CDATA[
insert into HW_U1.T_T_1 (C1,C2,C3,C4) values (:1,:2,:3,:4)
]]></Script>
</Node>
<!--
<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4" renameOutputFields="C1;C2;C3;C4" batchSize="1000" >
<Script name="sqlScript"><![CDATA[
insert into db1.T_T_1 (C1,C2,C3,C4) values (?,?,?,?)
]]></Script>
</Node>
-->
<Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
<!-- Field names here use the renamed (target) column names. -->
<Metadata id="METADATA_01">
<Field name="C1" type="int" default="0" nullable="false"/>
<Field name="C2" type="string" default="" nullable="false"/>
<Field name="C3" type="float" default="0" nullable="false"/>
<Field name="C4" type="string" default="" nullable="false"/>
</Metadata>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="1000" type="MYSQL"/>
<Connection id="CONNECT_02" dbURL="127.0.0.1:1521" database="orcl" username="HW_U1" password="******" batchSize="1000" type="ORACLE"/>
</Graph>
通过命令行向配置文件传递外部变量,实现动态替换配置文件内容的能力
命令行执行:
.\etl_engine.exe -fileUrl e:\example\db_to_excel.grf arg2=1000 arg1=e:\tmp\t_u_info.xlsx
配置文件db_to_excel.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2">
<!-- db -> excel: ${arg1} (output file path) and ${arg2} (row limit) are substituted
     from the command line; renameOutputFields maps each field to an excel column letter. -->
<Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="500">
<Script name="sqlScript"><![CDATA[
SELECT * FROM ( SELECT f1,f2,f3,f4 FROM t_s_1 LIMIT ${arg2}) t
]]></Script>
</Node>
<Node id="XLS_WRITER_01" type="XLS_WRITER" desc="输出节点2"
appendRow="true"
fileURL="${arg1}"
metadataRow="2"
startRow="3"
sheetName="人员信息"
outputFields="f1;f2;f3;f4"
renameOutputFields="序号=B;姓名=C;身高=D;时间=E" >
</Node>
<Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="XLS_WRITER_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
<Field name="序号" type="int" default="0" nullable="false"/>
<Field name="姓名" type="string" default="" nullable="false"/>
<Field name="身高" type="float" default="0" nullable="false"/>
<Field name="时间" type="string" default="" nullable="false"/>
</Metadata>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>
</Graph>
etl-engine运行后(阻塞模式),根据HTTP_INPUT_SERVICE的配置生成一个监听8081端口的阻塞式HTTP服务,
服务将接收的数据写到数据表
命令行执行:
./etl-engine -fileUrl ./http_to_db.grf
配置文件http_to_db.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph fileName="" desc="" runMode="2">
<!-- Blocking HTTP service on :8081; each POST to /etlEngineService is reshaped by the
     embedded Go script (BeforeOut) and batch-inserted into mysql t_device_info1. -->
<Node id="HTTP_INPUT_SERVICE_01" type="HTTP_INPUT_SERVICE"
servicePort="8081" serviceName="etlEngineService" />
<Node id="DB_OUTPUT_TABLE_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1"
dbConnection="CONNECT_01"
outputFields="device;time;value;writetime"
renameOutputFields="device;time;value;writetime"
batchSize="1000" >
<Script name="sqlScript"><![CDATA[
insert into t_device_info1 (device,time,value,writetime) values (?,?,?,?)
]]></Script>
<BeforeOut>
<![CDATA[
package ext
// Fix: the original also imported "errors", "strconv" and "time", none of which
// were used — unused imports are compile errors in Go.
import (
"fmt"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"etl-engine/etl/tool/extlibs/common"
)
// RunScript rebuilds every incoming row with only the four output fields
// plus a server-side writetime stamp, returning the new "rows" document.
func RunScript(dataValue string) (result string, topErr error) {
newRows := ""
rows := gjson.Get(dataValue, "rows")
for _, row := range rows.Array() {
tmpRow2 :=""
device := gjson.Get(row.String(),"device").String()
value := gjson.Get(row.String(),"value").String()
time := gjson.Get(row.String(),"time").String()
writetime:= common.GetSystemTimeFormatExt("YYYY-MM-DD hh:mm:ss")//system helper: current timestamp
tmpRow2, _ = sjson.Set(tmpRow2, "device", device) //copy device into the fresh row object
tmpRow2, _ = sjson.Set(tmpRow2, "value", value) //copy value
tmpRow2, _ = sjson.Set(tmpRow2, "time", time) //copy time
tmpRow2, _ = sjson.Set(tmpRow2, "writetime", writetime) //add the server-generated write time
newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpRow2)
}
fmt.Println(newRows)
return newRows, nil
}
]]>
</BeforeOut>
</Node>
<Line from="HTTP_INPUT_SERVICE_01" to="DB_OUTPUT_TABLE_01" type="STANDARD" order="0" metadata="METADATA_01" id="LINE_STANDARD_01"/>
<Metadata id="METADATA_01">
<Field name="device" type="int" default="0" nullable="false"/>
<Field name="value" type="float" default="0" nullable="false"/>
<Field name="time" type="string" default="0" nullable="false"/>
<Field name="writetime" type="string" default="0" nullable="false"/>
</Metadata>
<Connection sortId="1" id="CONNECT_01" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
</Graph>
PostMan工具测试
选择 POST 方式
URL: http://127.0.0.1:8081/etlEngineService
Headers选项:
KEY录入 Content-Type
VALUE录入 application/x-www-form-urlencoded
Body选项:
KEY录入 jsondata
VALUE录入
{
"rows":[
{"device":"1","value":"20.9","time":"19:00:01","writetime":"2022-02-01 19:00:01"},
{"device":"2","value":"19.1","time":"19:00:02","writetime":"2022-02-01 19:00:02"}
]
}
注意:必须传递KEY为rows的数组结构
Authorization选项:
TYPE选择Basic Auth
Username和Password根据实际情况录入
etl-engine运行后(阻塞模式),根据MQ_CONSUMER的配置生成一个消费者服务,消费者将消费的数据写到数据表
命令行执行:
./etl-engine -fileUrl ./kafkamq_to_db.grf
配置文件kafkamq_to_db.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph>
<!-- Blocking Kafka consumer: each consumed message's Value payload is parsed by the
     embedded Go script (BeforeOut) and batch-inserted into mysql t_device_info1. -->
<Node id="MQ_CONSUMER_01" type="MQ_CONSUMER" flag="KAFKA"
nameServer="127.0.0.1:18081"
group="group_10"
topic="out_event_user_info"
listenerFlag="2">
</Node>
<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1"
dbConnection="CONNECT_01"
outputFields="device;time;value;writetime"
renameOutputFields="device;time;value;writetime"
batchSize="1000" >
<Script name="sqlScript"><![CDATA[
insert into t_device_info1 (device,time,value,writetime) values (?,?,?,?)
]]></Script>
<BeforeOut>
<![CDATA[
package ext
// Fix: the original also imported "errors", "strconv" and "time", none of which
// were used — unused imports are compile errors in Go.
import (
"fmt"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"etl-engine/etl/tool/extlibs/common"
)
// RunScript extracts device/value/time from each kafka message's Value payload,
// stamps a server-side writetime, and returns the rebuilt "rows" document.
func RunScript(dataValue string) (result string, topErr error) {
newRows := ""
rows := gjson.Get(dataValue, "rows")
for _, row := range rows.Array() {
tmpStr := gjson.Get(row.String(),"Value").String()//parse the kafka message payload
device := gjson.Get(tmpStr,"device").String()
value := gjson.Get(tmpStr,"value").String()
time := gjson.Get(tmpStr,"time").String()
writetime:= common.GetSystemTimeFormatExt("YYYY-MM-DD hh:mm:ss")//system helper: current timestamp
// NOTE(review): fields are set onto the raw message wrapper (row.String()), not onto a
// fresh object as in the HTTP example; outputFields filters the extra keys downstream.
tmpStr, _ = sjson.Set(row.String(), "device", device) //device parsed from the message
tmpStr, _ = sjson.Set(tmpStr, "value", value) //value parsed from the message
tmpStr, _ = sjson.Set(tmpStr, "time", time) //time parsed from the message
tmpStr, _ = sjson.Set(tmpStr, "writetime", writetime) //server-generated write time
newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
}
fmt.Println(newRows)
return newRows, nil
}
]]>
</BeforeOut>
</Node>
<Metadata id="METADATA_02">
<Field name="device" type="int" default="0" nullable="false"/>
<Field name="value" type="float" default="0" nullable="false"/>
<Field name="time" type="string" default="0" nullable="false"/>
<Field name="writetime" type="string" default="0" nullable="false"/>
</Metadata>
<Line id="LINE_04" type="STANDARD" from="MQ_CONSUMER_01" to="DB_OUTPUT_01" order="1" metadata="METADATA_02"></Line>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>
</Graph>
Kafka生产者消息体样本
"{\"device\":\"1\",\"time\":\"20220602\",\"value\":\"001\"}"
将源表(mysql)与目标表(oracle)进行增量对比,并将差异化数据变更到目标表(oracle)
由于是在内存中对比数据,具体实施时注意对比数据量级,避免长时间等待。
场景介绍
命令行执行:
./etl-engine -fileUrl ./increment_mysql_to_oracle.grf
配置文件increment_mysql_to_oracle.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph>
<!-- INCREMENT node loads both result sets into memory, diffs them on the
     primary/compare keys, then applies the insert/update/delete SQL on the
     target (oracle, so bind placeholders are :1..:n). The copy stream is
     disabled, so the downstream trash node discards the pass-through output. -->
<Node id="INCREMENT_01" type="INCREMENT"
inputSourceConnection="CONNECT_1"
inputTargetConnection="CONNECT_2"
inputSourceSQL="select * from t_s_1 where (f1>=1 and f1<=1000) order by f1 asc "
inputTargetSQL="select * from t_t_1 where (C1>=1 and C1<=1000) order by C1 asc "
outputToCopyStream="false"
outputInsertConnection="CONNECT_2"
outputUpdateConnection="CONNECT_2"
outputDeleteConnection="CONNECT_2"
outputInsertSQL="insert into t_t_1 (C1,C2,C3,C4) values (:1,:2,:3,:4)"
outputUpdateSQL="update t_t_1 set C3=:1 ,C4=:2,C2=:3 where C1=:4"
outputDeleteSQL="delete from t_t_1 where C1=:1"
outputInsertFields="C1;C2;C3;C4"
outputUpdateFields="C3;C4;C2;C1"
outputDeleteFields="C1"
outputInsertMetadata="METADATA_2"
outputUpdateMetadata="METADATA_2"
outputDeleteMetadata="METADATA_2"
inputSourcePrimaryKey="f1"
inputSourceCompareKey="f2;f3"
inputSourceMappingKey="f4"
inputSourceFetchSize="1000"
inputTargetPrimaryKey="C1"
inputTargetCompareKey="C2;C3"
inputTargetMappingKey="C4"
mustConvertMetadata="false"
></Node>
<Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="垃圾桶节点1" > </Node>
<Line id="LINE_01" type="STANDARD" from="INCREMENT_01" to="OUTPUT_TRASH_01" order="1" metadata="METADATA_2"></Line>
<Metadata id="METADATA_1" sortId="1">
<Field name="f1" type="int" default="" nullable="true"/>
<Field name="f2" type="string" default="" nullable="true"/>
<Field name="f3" type="float" default="" nullable="true"/>
<Field name="f4" type="string" default="" nullable="true"/>
</Metadata>
<Metadata id="METADATA_2" sortId="1">
<Field name="C1" type="int" default="" nullable="true"/>
<Field name="C2" type="string" default="" nullable="true"/>
<Field name="C3" type="float" default="" nullable="true"/>
<Field name="C4" type="string" default="" nullable="true"/>
</Metadata>
<Connection id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
<Connection id="CONNECT_2" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
</Graph>
读Hadoop生态Hive源表写目标表
命令行执行:
./etl-engine -fileUrl ./hive_to_mysql.grf
配置文件hive_to_mysql.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph>
<!-- hive -> mysql: HIVE_READER streams db_hive_edu.t_u_info via CONNECT_02 and
     DB_OUTPUT_01 inserts the rows into mysql db1.t_hive_u_info via CONNECT_01. -->
<Node id="HIVE_READER_01" dbConnection="CONNECT_02"
type="HIVE_READER" desc="读Hive节点1" fetchSize="1000" >
<Script name="sqlScript"><![CDATA[
select uuid,name,height,writetime from db_hive_edu.t_u_info
]]></Script>
</Node>
<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写数据表节点1" dbConnection="CONNECT_01" outputFields="uuid;name;height;writetime" renameOutputFields="uuid;name;height;writetime" >
<Script name="sqlScript"><![CDATA[
insert into db1.t_hive_u_info (uuid,name,height,writetime) values (?,?,?,?)
]]></Script>
</Node>
<Line id="LINE_01" type="STANDARD" from="HIVE_READER_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
<Field name="uuid" type="int" default="-1" nullable="false"/>
<Field name="name" type="string" default="-1" nullable="false"/>
<Field name="height" type="float" default="-1" nullable="false"/>
<Field name="writetime" type="string" default="-1" nullable="false"/>
</Metadata>
<Connection id="CONNECT_02" dbURL="hadoop01:10000" database="db_hive_edu" username="Administrator" password="******" batchSize="1000" type="HIVE"/>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="1000" type="MYSQL"/>
<!--
<Connection sortId="1" id="CONNECT_1" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="c##u1" password="******" />
-->
</Graph>
读多种类型数据库表(维表和事实表),根据各业务表id进行关联查询,最终将关联结果写入目标表(或文件)
从不同数据源读取用户表t_user_info、 产品表t_product_info 、订单表t_order_info,并在内存中融合计算查询出所有用户的订单信息
命令行执行:
./etl-engine -fileUrl ./federation_to_mysql.grf
配置文件federation_to_mysql.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph desc="融合查询1">
<!-- Federation query: the fact table (orders, CONNECT_01) and the two dimension
     tables (users/products, CONNECT_02/03) are loaded and joined in memory under
     the aliases t_o/t_u/t_p, then the joined rows are written to CONNECT_04. -->
<Node id="FEDERATION_READER_01" type="FEDERATION_READER"
factTableConnectionId="CONNECT_01"
factTableQueryFetchSize="100"
factTableQuery="select o_id,u_id,p_id,o_price,o_number,o_money,o_writetime from t_order_info ORDER BY CAST(SUBSTRING(o_id,3) AS UNSIGNED) ASC"
dimensionTableConnectionIds="CONNECT_02;CONNECT_03"
dimensionTableQuery="select u_id,u_name,u_phone from t_user_info ORDER BY CAST(SUBSTRING(u_id,3) AS UNSIGNED) ASC ;select p_id,p_name,p_contacts,p_desc from t_product_info ORDER BY CAST(SUBSTRING(p_id,3) AS UNSIGNED) ASC "
federationTableAliasName="t_o;t_u;t_p"
federationQuery="SELECT t_u.u_id,t_u.u_name,t_u.u_phone,t_o.o_id,t_o.o_price,t_o.o_number,t_o.o_money,t_o.o_writetime,t_p.p_id,t_p.p_name,t_p.p_contacts,t_p.p_desc FROM t_u inner JOIN t_o ON t_o.u_id=t_u.u_id inner JOIN t_p ON t_o.p_id=t_p.p_id ORDER BY INTEGER(SUBSTRING(t_u.u_id,3)) ASC "
></Node>
<Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写节点1" dbConnection="CONNECT_04" outputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc" renameOutputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc" batchSize="1000" >
<Script name="sqlScript"><![CDATA[
insert into db1.t_u_o_p_info (u_id,u_name,u_phone,o_id,o_price,o_number,o_money,o_writetime,p_id,p_name,p_contacts,p_desc) values (?,?,?,?,?,?,?,?,?,?,?,?)
]]></Script>
</Node>
<Line id="LINE_01" type="STANDARD" from="FEDERATION_READER_01" to="DB_OUTPUT_01" order="1" metadata="METADATA_1"></Line>
<Metadata id="METADATA_1" sortId="1">
<Field name="u_id" type="string" default="" nullable="true"/>
<Field name="u_name" type="string" default="" nullable="true"/>
<Field name="u_phone" type="string" default="" nullable="true"/>
<Field name="o_id" type="string" default="" nullable="true"/>
<Field name="o_price" type="float" default="0" nullable="false"/>
<Field name="o_number" type="int" default="0" nullable="false"/>
<Field name="o_money" type="float" default="0" nullable="false"/>
<Field name="o_writetime" type="string" default="" nullable="true"/>
<Field name="p_id" type="string" default="" nullable="true"/>
<Field name="p_name" type="string" default="" nullable="true"/>
<Field name="p_contacts" type="string" default="" nullable="true"/>
<Field name="p_desc" type="string" default="" nullable="true"/>
</Metadata>
<Connection id="CONNECT_01" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
<Connection id="CONNECT_02" type="MYSQL" dbURL="127.0.0.1:3307" database="db1" username="root" password="******" token="" org=""/>
<Connection id="CONNECT_03" type="MYSQL" dbURL="127.0.0.1:3308" database="db1" username="root" password="******" token="" org=""/>
<Connection id="CONNECT_04" type="MYSQL" dbURL="127.0.0.1:3309" database="db1" username="root" password="******" token="" org=""/>
</Graph>
将mysql增量数据实时同步到 mysql,oracle,postgre,elastic等数据库(注意目标库mysql,oracle,postgre数据表要事先存在)
更多配置细节请联系作者
命令行执行:
./etl-engine -fileUrl ./mysqlbinlog_to_db.grf
MySQL配置文件,配置事项
server-id = 123
log-bin = /opt/mysql/data/mysql-bin
binlog_format = ROW
配置文件mysqlbinlog_to_db.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="读mysqlbin写db">
<!-- MYSQL_BINLOG (CDC) input: streams row events for tables matching
     masterCanalIncludeTableRegex and replays them through CONNECT_02 using
     METADATA_01 for type/format mapping; the pass-through stream is discarded.
     NOTE(review): METADATA_01 names fields of hw_u1.t_1 / hw_u1.t_222 while the
     include regex captures only db1.t_order_info — confirm the table list matches
     the intended deployment. -->
<Node id="MYSQL_BINLOG_01" type="MYSQL_BINLOG" desc="MYSQL_BINLOG输入节点1"
masterAddress="127.0.0.1:3306"
masterUserName="root"
masterPassword="******"
masterCanalIncludeTableRegex="db1.t_order_info"
masterCanalExcludeTableRegex=""
slaveOutputConnectionId="CONNECT_02"
slaveOutputMetaDataId="METADATA_01"
outputToCopyStream="false"
>
</Node>
<Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1" >
</Node>
<Line id="LINE_01" type="STANDARD" from="MYSQL_BINLOG_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
<Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/>
<Field name="hw_u1.t_1.c2" type="string" default="" nullable="false"/>
<Field name="hw_u1.t_222.c3" type="decimal" default="0" nullable="false" dataFormat="2" />
<Field name="hw_u1.t_222.writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/>
<Field name="hw_u1.t_order_info.o_writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/>
</Metadata>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db2" username="root" password="******" batchSize="10000" type="MYSQL"/>
<Connection id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
<Connection id="CONNECT_03" dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******" batchSize="1000" type="POSTGRES"/>
</Graph>
将postgre增量数据实时同步到 mysql,oracle,postgre,elastic等数据库(注意目标库mysql,oracle,postgre数据表要事先存在)
更多配置细节请联系作者
命令行执行:
./etl-engine -fileUrl ./pgwal_to_db.grf
postgresql.conf配置文件,配置事项
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20
pg_hba.conf配置文件,配置事项
host all all 0.0.0.0/0 md5
监听的数据表均需要执行以下操作,如表hw_u1.t_4
ALTER TABLE hw_u1.t_4 REPLICA IDENTITY FULL ;
配置文件pgwal_to_db.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="读pgWal写db">
<!-- PG_WAL (CDC) input: subscribes to postgres logical replication for hw_u1.t_1
     and replays the changes through CONNECT_01; the pass-through stream goes to
     the trash node. -->
<Node id="PG_WAL_01" type="PG_WAL" desc="PG_WAL输入节点1"
masterAddress="127.0.0.1:5432"
masterDataBase="postgres"
masterUserName="hw_u1"
masterPassword="******"
masterPublicationTables="hw_u1.t_1"
slaveOutputConnectionId="CONNECT_01"
slaveOutputMetaDataId="METADATA_01"
outputToCopyStream="false"
>
</Node>
<Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1" >
</Node>
<Line id="LINE_01" type="STANDARD" from="PG_WAL_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
<Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/>
</Metadata>
<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>
<Connection id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
<Connection id="CONNECT_03" dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******" batchSize="1000" type="POSTGRES"/>
<!-- Fix: the sample was missing this closing tag, leaving the XML document unterminated. -->
</Graph>
通过thrift2服务接口访问,样本实现串行两个任务线,读mysql写hbase,读hbase写mysql
命令行执行:
./etl-engine -fileUrl ./rw_hbase.grf
配置文件rw_hbase.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph fileName="" desc="" aesKey="" runMode="1">
<!-- runMode=1 (serial): task order 0 copies mysql t_user_info into hbase
     (rowKey=u_id, via the thrift2 service on :9090), then task order 1 reads
     the hbase table back and inserts the rows into mysql t_user_info_bk. -->
<Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1" desc="">
<Script name="sqlScript">
<![CDATA[select u_id,u_name,u_phone,u_height,updated_time,created_time,deleted_time from t_user_info]]>
</Script>
</Node>
<Node id="HBASE_WRITER_1" type="HBASE_WRITER" dbConnection="CONNECT_2" batchSize="1000"
tableName="t_user_info"
rowKey="u_id"
outputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time"
renameOutputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" desc="" />
<Node id="HBASE_READER_1" type="HBASE_READER" dbConnection="CONNECT_2"
fetchSize="1000"
tableName="t_user_info"
cols=""
limitRow="0"
maxVersions="100" desc="" />
<Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" batchSize="" dbConnection="CONNECT_1" outputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" renameOutputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" desc="" >
<Script name="sqlScript">
<![CDATA[insert into t_user_info_bk (u_id,u_name,u_phone,u_height,updated_time,created_time,deleted_time) values (?,?,?,?,?,?,?)]]>
</Script>
</Node>
<Line from="DB_INPUT_TABLE_1" to="HBASE_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1" id="LINE_STANDARD_1"/>
<Line from="HBASE_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_1" id="LINE_STANDARD_2"/>
<Metadata id="METADATA_1" fileUrl="" >
<Field name="u_id" type="int" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="u_name" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="u_phone" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="u_height" type="float" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="updated_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="created_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="deleted_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
</Metadata>
<Connection id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db2" username="root" password="mysql" token="" org=""/>
<Connection id="CONNECT_2" type="HBASE" dbURL="127.0.0.1:9090" database="" username="" password="" token="" org=""/>
</Graph>
读redis写redis
命令行执行:
./etl-engine -fileUrl ./rw_redis.grf
配置文件rw_redis.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph>
<!-- Copies every redis key matching pattern "H*" from db 0 to db 1 on the same server.
     NOTE(review): METADATA_01 declares key_ttl but the writer only outputs key;value —
     confirm whether TTLs are meant to be carried over. -->
<Node id="REDIS_READER_01" type="REDIS_READER" desc="输入节点1" nameServer="127.0.0.1:6379" password="******" db="0" patternMatchKey="true" keys="H*">
</Node>
<Node id="REDIS_WRITER_01" type="REDIS_WRITER" desc="输出节点1" nameServer="127.0.0.1:6379" password="******" db="1"
patternMatchKey="true"
outputFields="key;value"
renameOutputFields="key;value" >
</Node>
<Line id="LINE_01" type="STANDARD" from="REDIS_READER_01" to="REDIS_WRITER_01" order="0" metadata="METADATA_01"></Line>
<Metadata id="METADATA_01">
<Field name="key" type="string" default="-1" nullable="false"/>
<Field name="value" type="string" default="-1" nullable="false"/>
<Field name="key_ttl" type="int" default="-1" nullable="false"/>
</Metadata>
</Graph>
读mysql写redis,读redis写mysql(嵌入go脚本增加字段),串行执行任务
命令行执行:
./etl-engine -fileUrl ./rw_redis_mysql.grf
配置文件rw_redis_mysql.grf 内容
<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="1">
<!-- Serial run: task order 0 copies mysql rows into redis (key=caption,
     value="id;caption;memo;tag"), task order 1 reads them back and the BeforeOut
     Go script splits each value into columns before re-inserting into mysql. -->
<Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1" desc="读数据表" >
<Script name="sqlScript">
<![CDATA[ SELECT caption AS k ,CONCAT(id,';',caption,';',memo,';', tag) AS v FROM t_redis_info]]>
</Script>
</Node>
<Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" dbConnection="CONNECT_1" outputFields="id;caption;memo;tag;writetime" renameOutputFields="id;caption;memo;tag;writetime" desc="写数据表" >
<Script name="sqlScript">
<![CDATA[INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);]]>
</Script>
<BeforeOut>
<![CDATA[package ext
// Fix: dropped the unused imports "errors", "fmt" and "strconv" (compile errors in Go).
import (
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"etl-engine/etl/tool/extlibs/common"
)
// RunScript splits each redis value ("id;caption;memo;tag") back into separate
// columns, assigns a fresh UUID id and a write timestamp, and returns the rebuilt rows.
func RunScript(dataValue string) (result string, topErr error) {
newRows := ""
rows := gjson.Get(dataValue, "rows")
// Fix: the loop variable was `index`, which was never used — a Go compile error.
for _, row := range rows.Array() {
// add an "id" column holding a fresh UUID
tmpStr, _ := sjson.Set(row.String(), "id", common.GetUUID() )
// split the default "value" field into its component columns
values := gjson.Get(row.String(),"value").String()
vArr := strings.Split(values, ";")
// guard: skip malformed values instead of panicking on out-of-range indexing
if len(vArr) < 4 {
continue
}
caption := vArr[1]
memo := vArr[2]
tag := vArr[3]
tmpStr, _ = sjson.Set(tmpStr, "caption", caption )
tmpStr, _ = sjson.Set(tmpStr, "memo", memo )
tmpStr, _ = sjson.Set(tmpStr, "tag", tag )
tmpStr, _ = sjson.Set(tmpStr, "writetime", time.Now().Format("2006-01-02 15:04:05"))
common.GetLogger().Infoln("tmpStr:",tmpStr)
newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
}
return newRows, nil
}]]>
</BeforeOut>
</Node>
<Node id="REDIS_WRITER_1" type="REDIS_WRITER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" outputFields="k;v" renameOutputFields="key;value" desc="写redis" />
<Node id="REDIS_READER_1" type="REDIS_READER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" keys="*" desc="读redis" />
<Line from="DB_INPUT_TABLE_1" to="REDIS_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1" id="LINE_STANDARD_1"/>
<Line from="REDIS_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_2" id="LINE_STANDARD_2"/>
<Metadata id="METADATA_2" >
<Field name="id" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="caption" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="memo" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="tag" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="writetime" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
</Metadata>
<Metadata id="METADATA_1" >
<Field name="key" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
<Field name="value" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
</Metadata>
<Connection id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" />
</Graph>