Skip to content

etl engine使用样例

hw2499 edited this page Jun 28, 2024 · 40 revisions

如何使用

命令行格式:
etl-engine -fileUrl <配置文件位置> -logLevel <日志级别>

./etl-engine -fileUrl ./db_to_db.grf -logLevel info 

样本列表

测试样本表结构

//--- mysql 表 ---//

CREATE TABLE `t_s_1` (
  `f1` INT(11) NOT NULL,
  `f2` VARCHAR(50) DEFAULT NULL,
  `f3` FLOAT DEFAULT NULL,
  `f4` VARCHAR(50) DEFAULT NULL
) 


CREATE TABLE `t_t_1` (
  `c1` INT(50) NOT NULL,
  `c2` VARCHAR(50) DEFAULT NULL,
  `c3` FLOAT DEFAULT NULL,
  `c4` VARCHAR(50) DEFAULT NULL
) 

CREATE TABLE `t_device_info1` (
  `device` INT(11) DEFAULT NULL,
  `value` FLOAT DEFAULT NULL,
  `time` VARCHAR(100) DEFAULT NULL,
  `writetime` VARCHAR(100) DEFAULT NULL
) 

CREATE TABLE `t_hive_u_info` (
  `uuid` INT(10) NOT NULL,
  `name` VARCHAR(30) DEFAULT NULL,
  `height` DECIMAL(10,2) DEFAULT NULL,
  `writetime` VARCHAR(20) DEFAULT NULL,
  PRIMARY KEY (`uuid`)
)  

-- User sample table (fixed: the original had a stray quote after
-- `u_height` ... DEFAULT NULL, which made the DDL unparsable).
CREATE TABLE `t_user_info` (
  `u_id` int(11) NOT NULL,
  `u_name` varchar(50) NOT NULL ,
  `u_phone` varchar(20) DEFAULT NULL,
  `u_height` float DEFAULT NULL ,
  `updated_time` varchar(50) ,
  `created_time` varchar(50) ,
  `deleted_time` varchar(50) 
)

-- Backup copy of the user sample table (fixed: the original had a stray
-- quote after `u_height` ... DEFAULT NULL, which made the DDL unparsable).
CREATE TABLE `t_user_info_bk` (
  `u_id` int(11) NOT NULL,
  `u_name` varchar(50) NOT NULL ,
  `u_phone` varchar(20) DEFAULT NULL,
  `u_height` float DEFAULT NULL ,
  `updated_time` varchar(50) ,
  `created_time` varchar(50) ,
  `deleted_time` varchar(50) 
)

CREATE TABLE t_redis_info (
  id VARCHAR(32) NOT NULL,
  caption VARCHAR(50),
  tag VARCHAR(50),
  memo VARCHAR(100),
  writetime VARCHAR(19),
  PRIMARY KEY (id)
)

//--- 融合查询样表 --- // 
CREATE TABLE t_order_info (
  o_id VARCHAR(32) NOT NULL,
  u_id VARCHAR(32),
  p_id VARCHAR(32),
  o_price FLOAT,
  o_money FLOAT,
  o_number INT,
  o_writetime VARCHAR(19),
  PRIMARY KEY (o_id)
);

CREATE TABLE t_user_info (
  u_id VARCHAR(32) NOT NULL,
  u_name VARCHAR(32),
  u_phone VARCHAR(20),
  PRIMARY KEY (u_id)
);

CREATE TABLE t_product_info (
  p_id VARCHAR(32) NOT NULL,
  p_name VARCHAR(32),
  p_contacts VARCHAR(50),
  p_desc VARCHAR(100),
  PRIMARY KEY (p_id)
);


CREATE TABLE t_u_o_p_info (
  u_id VARCHAR(32) ,
  u_name VARCHAR(32),
  u_phone VARCHAR(20),
  o_id VARCHAR(32) ,
  o_price FLOAT,
  o_number INT,
  o_money FLOAT,
  o_writetime VARCHAR(19),
  p_id  VARCHAR(32) ,
  p_name  VARCHAR(32) ,
  p_contacts  VARCHAR(50) ,
  p_desc  VARCHAR(100)
);

//---  oracle 表 ---// 

CREATE TABLE "HW_U1"."T_T_1" 
   (	
    "C1" NUMBER(*,0), 
	"C2" VARCHAR2(100), 
	"C3" FLOAT(126), 
	"C4" VARCHAR2(100), 
	 CONSTRAINT "T_T_1_PK" PRIMARY KEY ("C1")
   )

//--- hive 表 ---//

create  table  t_u_info(
    uuid string,
    name string,
    height float,
    writetime string
)  
row  format  delimited  fields   terminated  by  '\t';

//--- hbase 表 ---//
create 't_user_info','u_id','u_name','u_phone','u_height','updated_time','created_time','deleted_time'



样本示例

常用数据源连接

    <Connection id="CONNECT_01" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token=" " org="hw"  type="INFLUXDB_V1"/>
    <Connection id="CONNECT_02" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" batchSize="1000" type="CLICKHOUSE"/> 
    <Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******"  batchSize="1000" type="MYSQL"/>
    <Connection id="CONNECT_04"  database="d:/sqlite_db1.db"  batchSize="10000" type="SQLITE"/>  
    <Connection id="CONNECT_05" dbURL="127.0.0.1:10000" database="db1" username="root" password="b" batchSize="1000" type="HIVE"/>
    <Connection id="CONNECT_06" dbURL="127.0.0.1:5432" database="db_1" username="u1" password="******" batchSize="1000" type="POSTGRES" />
    <Connection id="CONNECT_07" dbURL="http://127.0.0.1:9200" database="db1" username="elastic" password="******" batchSize="1000" type="ELASTIC"/>
    <Connection id="CONNECT_08" dbURL="127.0.0.1:1521" database="orcl" username="c##u1" password="******"  batchSize="1000" type="ORACLE" />
    <Connection id="CONNECT_09"  dbURL="127.0.0.1:9090" database="" username="" password=""  batchSize="1000" type="HBASE"/>

读数据表 -> 写数据表

读源表(mysql)写目标表(oracle)
命令行执行:

.\etl_engine.exe -fileUrl e:\example\db_to_db.grf  


配置文件db_to_db.grf内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="db to db">
  
  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="读mysql节点1" fetchSize="500">
    <Script name="sqlScript"><![CDATA[
		         select * from (select f1,f2,f3,f4 from db1.t_s_1 limit 1000) t1
]]></Script>
  </Node>
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写oracle节点1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4"  renameOutputFields="C1;C2;C3;C4"  batchSize="1000" >
        <Script name="sqlScript"><![CDATA[
          insert into HW_U1.T_T_1 (C1,C2,C3,C4) values (:1,:2,:3,:4)
    ]]></Script>
  </Node>

<!-- 
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4"  renameOutputFields="C1;C2;C3;C4"  batchSize="1000" >
        <Script name="sqlScript"><![CDATA[
          insert into db1.T_T_1 (C1,C2,C3,C4) values (?,?,?,?)
    ]]></Script>
  </Node>
-->

  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
  <Metadata id="METADATA_01">

    <Field name="C1" type="int" default="0" nullable="false"/>
    <Field name="C2" type="string" default="" nullable="false"/>
      <Field name="C3" type="float" default="0" nullable="false"/>
      <Field name="C4" type="string" default="" nullable="false"/>

  </Metadata>


    <Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="1000" type="MYSQL"/>

    <Connection id="CONNECT_02"  dbURL="127.0.0.1:1521" database="orcl" username="HW_U1" password="******"  batchSize="1000" type="ORACLE"/>
</Graph>

读数据表 -> 写excel文件

通过命令行向配置文件传递外部变量,实现动态替换配置文件内容的能力
命令行执行:

.\etl_engine.exe -fileUrl e:\example\db_to_excel.grf  arg2=1000 arg1=e:\tmp\t_u_info.xlsx


配置文件db_to_excel.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2">
  
    <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="500">
     <Script name="sqlScript"><![CDATA[
				 SELECT * FROM ( SELECT f1,f2,f3,f4 FROM t_s_1 LIMIT ${arg2}) t
]]></Script>
    </Node>

    <Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="输出节点2" 
	appendRow="true"   
	fileURL="${arg1}" 
	metadataRow="2" 
	startRow="3" 
	sheetName="人员信息" 
	outputFields="f1;f2;f3;f4"  
	renameOutputFields="序号=B;姓名=C;身高=D;时间=E"  >
       
    </Node>

  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="XLS_WRITER_01" order="0" metadata="METADATA_01"></Line>

  <Metadata id="METADATA_01">

    <Field name="序号" type="int" default="0" nullable="false"/>
    <Field name="姓名" type="string" default="" nullable="false"/>
    <Field name="身高" type="float" default="0" nullable="false"/>
    <Field name="时间" type="string" default="" nullable="false"/>
   
  </Metadata>
    <Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>
</Graph>

HTTP_INPUT_SERVICE -> 写数据表

etl-engine运行后(阻塞模式),根据HTTP_INPUT_SERVICE的配置生成一个监听8081端口的阻塞式HTTP服务,
服务将接收的数据写到数据表

命令行执行:

./etl-engine -fileUrl ./http_to_db.grf


配置文件http_to_db.grf 内容

<?xml version="1.0" encoding="UTF-8"?>

<Graph fileName="" desc="" runMode="2">

   <Node id="HTTP_INPUT_SERVICE_01" type="HTTP_INPUT_SERVICE"  
      servicePort="8081" serviceName="etlEngineService"  />
    

     <Node id="DB_OUTPUT_TABLE_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1" 
        dbConnection="CONNECT_01" 
        outputFields="device;time;value;writetime"  
        renameOutputFields="device;time;value;writetime"  
        batchSize="1000" >

        <Script name="sqlScript"><![CDATA[
          insert into t_device_info1 (device,time,value,writetime) values (?,?,?,?)
    ]]></Script>

 <BeforeOut>
            <![CDATA[
package ext
import (
	"errors"
	"fmt"
	"strconv"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
)
// RunScript rebuilds the incoming "rows" array before it reaches the DB
// writer: each row's device/value/time fields are copied into a fresh JSON
// object and a writetime stamp (from the engine's system-time helper) is
// appended. The rebuilt document is returned as the node's output.
func RunScript(dataValue string) (result string, topErr error) {
	out := ""
	for _, item := range gjson.Get(dataValue, "rows").Array() {
		src := item.String()
		// Timestamp produced by the engine's built-in helper function.
		stamp := common.GetSystemTimeFormatExt("YYYY-MM-DD hh:mm:ss")

		rebuilt := ""
		rebuilt, _ = sjson.Set(rebuilt, "device", gjson.Get(src, "device").String()) // copy device field
		rebuilt, _ = sjson.Set(rebuilt, "value", gjson.Get(src, "value").String())   // copy value field
		rebuilt, _ = sjson.Set(rebuilt, "time", gjson.Get(src, "time").String())     // copy time field
		rebuilt, _ = sjson.Set(rebuilt, "writetime", stamp)                          // add generated timestamp
		out, _ = sjson.SetRaw(out, "rows.-1", rebuilt)
	}

	fmt.Println(out)
	return out, nil
}

                   ]]>
        </BeforeOut>

    </Node>

 
	<Line from="HTTP_INPUT_SERVICE_01" to="DB_OUTPUT_TABLE_01" type="STANDARD" order="0" metadata="METADATA_01"  id="LINE_STANDARD_01"/>

     <Metadata id="METADATA_01">
        <Field name="device" type="int" default="0" nullable="false"/> 
        <Field name="value" type="float" default="0" nullable="false"/> 
        <Field name="time" type="string" default="0" nullable="false"/> 
	<Field name="writetime" type="string" default="0" nullable="false"/> 
    </Metadata>

    <Connection sortId="1" id="CONNECT_01" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
</Graph>

PostMan工具测试

     选择 POST 方式
     URL: http://127.0.0.1:8081/etlEngineService
     Headers选项:
          KEY录入 Content-Type
          VALUE录入 application/x-www-form-urlencoded
     Body选项:
          KEY录入 jsondata
          VALUE录入
			{"rows":[
				{"device":"1","value":"20.9","time":"19:00:01","writetime":"2022-02-01 19:00:01"},
				{"device":"2","value":"19.1","time":"19:00:02","writetime":"2022-02-01 19:00:02"}
			]}

     注意:必须传递KEY为rows的数组结构

     Authorization选项:
	TYPE选择Basic Auth
        Username和Password根据实际情况录入

Kafka -> 写数据表

etl-engine运行后(阻塞模式),根据MQ_CONSUMER的配置生成一个消费者服务,消费者将消费的数据写到数据表
命令行执行:

./etl-engine -fileUrl ./kafkamq_to_db.grf


配置文件kafkamq_to_db.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph>

    <Node id="MQ_CONSUMER_01" type="MQ_CONSUMER" flag="KAFKA" 
    nameServer="127.0.0.1:18081" 
    group="group_10" 
    topic="out_event_user_info" 
    listenerFlag="2">
    </Node>


  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写mysql节点1" 
  dbConnection="CONNECT_01" 
  outputFields="device;time;value;writetime"  
  renameOutputFields="device;time;value;writetime"  
  batchSize="1000" >

        <Script name="sqlScript"><![CDATA[
          insert into t_device_info1 (device,time,value,writetime) values (?,?,?,?)
    ]]></Script>

        <BeforeOut>
            <![CDATA[
package ext
import (
	"errors"
	"fmt"
	"strconv"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
)
// RunScript unwraps each consumed Kafka message and reshapes it for the DB
// writer: the JSON payload is read from the row's "Value" field, its
// device/value/time fields are re-applied onto the row, and a writetime
// stamp from the engine's system-time helper is added.
// NOTE: tmpStr first holds the message payload (for parsing), then is
// reassigned from row.String() to become the row being built — the
// statement order here is significant.
func RunScript(dataValue string) (result string, topErr error) {
	
	newRows := ""
	rows := gjson.Get(dataValue, "rows")
	for _, row := range rows.Array() {
	 	tmpStr := gjson.Get(row.String(),"Value").String()//parse the kafka message payload
 		device := gjson.Get(tmpStr,"device").String()
 		value := gjson.Get(tmpStr,"value").String()
 		time := gjson.Get(tmpStr,"time").String()

		writetime:= common.GetSystemTimeFormatExt("YYYY-MM-DD hh:mm:ss")//call the engine's system-time helper

		tmpStr, _ = sjson.Set(row.String(), "device", device) //add device field parsed from the message
		tmpStr, _ = sjson.Set(tmpStr, "value", value)         //add value field parsed from the message
		tmpStr, _ = sjson.Set(tmpStr, "time", time)           //add time field parsed from the message
		tmpStr, _ = sjson.Set(tmpStr, "writetime", writetime) //add field produced by the system helper
		newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
	}
	fmt.Println(newRows)
	return newRows, nil
}

                   ]]>
        </BeforeOut>

  </Node>


    <Metadata id="METADATA_02">
        <Field name="device" type="int" default="0" nullable="false"/> 
        <Field name="value" type="float" default="0" nullable="false"/> 
        <Field name="time" type="string" default="0" nullable="false"/> 
		<Field name="writetime" type="string" default="0" nullable="false"/> 

    </Metadata>
  <Line id="LINE_04" type="STANDARD" from="MQ_CONSUMER_01" to="DB_OUTPUT_01" order="1" metadata="METADATA_02"></Line>

  <Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>

</Graph>

Kafka生产者消息体样本

   "{\"device\":\"1\",\"time\":\"20220602\",\"value\":\"001\"}"

增量对比 -> 数据表

将源表(mysql)与目标表(oracle)进行增量对比,并将差异化数据变更到目标表(oracle)
由于是在内存中对比数据,具体实施时注意对比数据量级,避免长时间等待。
场景介绍
命令行执行:

./etl-engine -fileUrl ./increment_mysql_to_oracle.grf


配置文件increment_mysql_to_oracle.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph>
	
    <Node id="INCREMENT_01" type="INCREMENT"   
		inputSourceConnection="CONNECT_1"
		inputTargetConnection="CONNECT_2"
		inputSourceSQL="select * from t_s_1 where (f1&gt;=1 and f1&lt;=1000) order by f1 asc "
		inputTargetSQL="select * from t_t_1 where (C1&gt;=1 and C1&lt;=1000) order by C1 asc "
		
		outputToCopyStream="false"

		outputInsertConnection="CONNECT_2"
		outputUpdateConnection="CONNECT_2"
		outputDeleteConnection="CONNECT_2"
		
		outputInsertSQL="insert into t_t_1 (C1,C2,C3,C4) values (:1,:2,:3,:4)"
		outputUpdateSQL="update t_t_1 set C3=:1 ,C4=:2,C2=:3 where C1=:4"
		outputDeleteSQL="delete from t_t_1 where C1=:1"

		outputInsertFields="C1;C2;C3;C4"
		outputUpdateFields="C3;C4;C2;C1"
		outputDeleteFields="C1"

		outputInsertMetadata="METADATA_2"
		outputUpdateMetadata="METADATA_2"
		outputDeleteMetadata="METADATA_2"

		inputSourcePrimaryKey="f1"
		inputSourceCompareKey="f2;f3"
		inputSourceMappingKey="f4"
		inputSourceFetchSize="1000"

		inputTargetPrimaryKey="C1"
		inputTargetCompareKey="C2;C3"
		inputTargetMappingKey="C4"

		mustConvertMetadata="false"
	></Node>

    <Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="垃圾桶节点1"  > </Node>
    <Line id="LINE_01" type="STANDARD" from="INCREMENT_01" to="OUTPUT_TRASH_01" order="1" metadata="METADATA_2"></Line>
	<Metadata id="METADATA_1" sortId="1">
        <Field name="f1" type="int" default="" nullable="true"/>
        <Field name="f2" type="string" default="" nullable="true"/>
        <Field name="f3" type="float" default="" nullable="true"/>
        <Field name="f4" type="string" default="" nullable="true"/>
    </Metadata>
	<Metadata id="METADATA_2" sortId="1">
        <Field name="C1" type="int" default="" nullable="true"/>
        <Field name="C2" type="string" default="" nullable="true"/>
        <Field name="C3" type="float" default="" nullable="true"/>
        <Field name="C4" type="string" default="" nullable="true"/>
    </Metadata>

    <Connection   id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
    <Connection   id="CONNECT_2" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
</Graph>


Hive -> 写数据表

读Hadoop生态Hive源表写目标表
命令行执行:

./etl-engine -fileUrl ./hive_to_mysql.grf


配置文件hive_to_mysql.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph>

    <Node id="HIVE_READER_01" dbConnection="CONNECT_02"   
	type="HIVE_READER" desc="读Hive节点1"  fetchSize="1000" >
    <Script name="sqlScript"><![CDATA[
		 select uuid,name,height,writetime from db_hive_edu.t_u_info  
]]></Script>
      </Node>
 
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写数据表节点1" dbConnection="CONNECT_01" outputFields="uuid;name;height;writetime"  renameOutputFields="uuid;name;height;writetime"  >

        <Script name="sqlScript"><![CDATA[
          insert into db1.t_hive_u_info (uuid,name,height,writetime) values (?,?,?,?)
    ]]></Script>
  </Node>


  <Line id="LINE_01" type="STANDARD" from="HIVE_READER_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>

  <Metadata id="METADATA_01">
    <Field name="uuid" type="int" default="-1" nullable="false"/>
    <Field name="name" type="string" default="-1" nullable="false"/>
    <Field name="height" type="float" default="-1" nullable="false"/>
     <Field name="writetime" type="string" default="-1" nullable="false"/>
  </Metadata>

    <Connection id="CONNECT_02" dbURL="hadoop01:10000" database="db_hive_edu" username="Administrator" password="******" batchSize="1000" type="HIVE"/>

	<Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="1000" type="MYSQL"/>

   <!--  
     <Connection sortId="1" id="CONNECT_1" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="c##u1" password="******" />
   -->
</Graph>

融合查询 -> 写数据表

读多种类型数据库表(维表和事实表),根据各业务表id进行关联查询,最终将关联结果写入目标表(或文件)
从不同数据源读取用户表t_user_info、 产品表t_product_info 、订单表t_order_info,并在内存中融合计算查询出所有用户的订单信息


命令行执行:

./etl-engine -fileUrl ./federation_to_mysql.grf


配置文件federation_to_mysql.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph desc="融合查询1">

    <Node id="FEDERATION_READER_01" type="FEDERATION_READER"   
		factTableConnectionId="CONNECT_01"

		factTableQueryFetchSize="100"
		factTableQuery="select o_id,u_id,p_id,o_price,o_number,o_money,o_writetime from t_order_info ORDER BY  CAST(SUBSTRING(o_id,3) AS UNSIGNED)  ASC"

		dimensionTableConnectionIds="CONNECT_02;CONNECT_03"

		dimensionTableQuery="select u_id,u_name,u_phone from t_user_info ORDER BY  CAST(SUBSTRING(u_id,3) AS UNSIGNED)  ASC ;select p_id,p_name,p_contacts,p_desc from t_product_info ORDER BY  CAST(SUBSTRING(p_id,3) AS UNSIGNED)  ASC "

		federationTableAliasName="t_o;t_u;t_p"
		federationQuery="SELECT t_u.u_id,t_u.u_name,t_u.u_phone,t_o.o_id,t_o.o_price,t_o.o_number,t_o.o_money,t_o.o_writetime,t_p.p_id,t_p.p_name,t_p.p_contacts,t_p.p_desc  FROM  t_u inner JOIN  t_o  ON t_o.u_id=t_u.u_id inner JOIN  t_p ON t_o.p_id=t_p.p_id ORDER BY  INTEGER(SUBSTRING(t_u.u_id,3))  ASC "

	></Node>
	  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="写节点1" dbConnection="CONNECT_04" outputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc"  renameOutputFields="u_id;u_name;u_phone;o_id;o_price;o_number;o_money;o_writetime;p_id;p_name;p_contacts;p_desc"  batchSize="1000"  >


        <Script name="sqlScript"><![CDATA[

           insert into db1.t_u_o_p_info (u_id,u_name,u_phone,o_id,o_price,o_number,o_money,o_writetime,p_id,p_name,p_contacts,p_desc) values (?,?,?,?,?,?,?,?,?,?,?,?)
    ]]></Script>
	  </Node>

   

 
    <Line id="LINE_01" type="STANDARD" from="FEDERATION_READER_01" to="DB_OUTPUT_01" order="1" metadata="METADATA_1"></Line>

	
	<Metadata id="METADATA_1" sortId="1">
        <Field name="u_id" type="string" default="" nullable="true"/>
        <Field name="u_name" type="string" default="" nullable="true"/>
        <Field name="u_phone" type="string" default="" nullable="true"/>
        <Field name="o_id" type="string" default="" nullable="true"/>
		<Field name="o_price" type="float" default="0" nullable="false"/>
		<Field name="o_number" type="int" default="0" nullable="false"/>
		<Field name="o_money" type="float" default="0" nullable="false"/>
		<Field name="o_writetime" type="string" default="" nullable="true"/>
		<Field name="p_id" type="string" default="" nullable="true"/>
		<Field name="p_name" type="string" default="" nullable="true"/>
		<Field name="p_contacts" type="string" default="" nullable="true"/>
		<Field name="p_desc" type="string" default="" nullable="true"/>

    </Metadata>

    <Connection  id="CONNECT_01" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" token="" org=""/>
    <Connection  id="CONNECT_02" type="MYSQL" dbURL="127.0.0.1:3307" database="db1" username="root" password="******" token="" org=""/>
    <Connection  id="CONNECT_03" type="MYSQL" dbURL="127.0.0.1:3308" database="db1" username="root" password="******" token="" org=""/>
    <Connection  id="CONNECT_04" type="MYSQL" dbURL="127.0.0.1:3309" database="db1" username="root" password="******" token="" org=""/>
</Graph>


CDC模式 mysql -> mysql

将mysql增量数据实时同步到 mysql,oracle,postgre,elastic等数据库(注意目标库mysql,oracle,postgre数据表要事先存在)
更多配置细节请联系作者

命令行执行:

./etl-engine -fileUrl ./mysqlbinlog_to_db.grf


MySQL配置文件,配置事项

server-id = 123 
log-bin    = /opt/mysql/data/mysql-bin
binlog_format = ROW 


配置文件mysqlbinlog_to_db.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="读mysqlbin写db">
  
  <Node id="MYSQL_BINLOG_01"   type="MYSQL_BINLOG" desc="MYSQL_BINLOG输入节点1"
      masterAddress="127.0.0.1:3306"
      masterUserName="root"
      masterPassword="******"
      masterCanalIncludeTableRegex="db1.t_order_info"
      masterCanalExcludeTableRegex=""
      slaveOutputConnectionId="CONNECT_02"
      slaveOutputMetaDataId="METADATA_01"
      outputToCopyStream="false"
  >
  </Node>
    <Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1" >
   </Node>

  <Line id="LINE_01" type="STANDARD" from="MYSQL_BINLOG_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line>

  <Metadata id="METADATA_01">

        <Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/>
        <Field name="hw_u1.t_1.c2" type="string" default="" nullable="false"/>
       <Field name="hw_u1.t_222.c3" type="decimal" default="0" nullable="false" dataFormat="2" />
       <Field name="hw_u1.t_222.writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/>
       <Field name="hw_u1.t_order_info.o_writetime" type="datetime" default="" nullable="false" dataFormat="YYYY-MM-DD hh:mm:ss"/>
  </Metadata>

    <Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db2" username="root" password="******" batchSize="10000" type="MYSQL"/>
    <Connection id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
    <Connection id="CONNECT_03"  dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******"  batchSize="1000" type="POSTGRES"/>

</Graph>

CDC模式 postgre -> postgre

将postgre增量数据实时同步到 mysql,oracle,postgre,elastic等数据库(注意目标库mysql,oracle,postgre数据表要事先存在)
更多配置细节请联系作者

命令行执行:

./etl-engine -fileUrl ./pgwal_to_db.grf


postgresql.conf配置文件,配置事项

   wal_level = logical
   max_replication_slots = 20
   max_wal_senders = 20


pg_hba.conf配置文件,配置事项

host    all         all              0.0.0.0/0               md5 


监听的数据表均需要执行以下操作,如表hw_u1.t_4

ALTER TABLE hw_u1.t_4  REPLICA IDENTITY  FULL ;


配置文件pgwal_to_db.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="2" desc="读pgWal写db">
  <Node id="PG_WAL_01"   type="PG_WAL" desc="PG_WAL输入节点1"
      masterAddress="127.0.0.1:5432"
    
      masterDataBase="postgres"
      masterUserName="hw_u1"
      masterPassword="******"
      masterPublicationTables="hw_u1.t_1"
    
      slaveOutputConnectionId="CONNECT_01"
      slaveOutputMetaDataId="METADATA_01"
      outputToCopyStream="false"
  >
  </Node>
 <Node id="OUTPUT_TRASH_01" type="OUTPUT_TRASH" desc="输出1" >
   </Node>

  <Line id="LINE_01" type="STANDARD" from="PG_WAL_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_01"></Line>

  <Metadata id="METADATA_01">

        <Field name="hw_u1.t_1.c1" type="int" default="0" nullable="false"/>
  </Metadata>

    <Connection id="CONNECT_01" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" batchSize="10000" type="MYSQL"/>
   <Connection  id="CONNECT_02" type="ORACLE" dbURL="127.0.0.1:1521" database="orcl" username="hw_u1" password="******" token="" org=""/>
   <Connection id="CONNECT_03"  dbURL="127.0.0.1:5432" database="postgres" username="hw_u1" password="******"  batchSize="1000" type="POSTGRES"/>
</Graph>

读写Hbase数据表

通过thrift2服务接口访问,样本实现串行两个任务线,读mysql写hbase,读hbase写mysql

命令行执行:

./etl-engine -fileUrl ./rw_hbase.grf


配置文件rw_hbase.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph fileName="" desc="" aesKey="" runMode="1">
    <Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1" desc="">
        <Script name="sqlScript">
            <![CDATA[select u_id,u_name,u_phone,u_height,updated_time,created_time,deleted_time from t_user_info]]>
        </Script>
    </Node>
    <Node id="HBASE_WRITER_1" type="HBASE_WRITER" dbConnection="CONNECT_2" batchSize="1000" 
        tableName="t_user_info" 
        rowKey="u_id" 
        outputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" 
        renameOutputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" desc="" />
    <Node id="HBASE_READER_1" type="HBASE_READER" dbConnection="CONNECT_2" 
        fetchSize="1000" 
        tableName="t_user_info" 
        cols="" 
        limitRow="0" 
        maxVersions="100" desc="" />
    <Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" batchSize="" dbConnection="CONNECT_1" outputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" renameOutputFields="u_id;u_name;u_phone;u_height;updated_time;created_time;deleted_time" desc="" >
        <Script name="sqlScript">
            <![CDATA[insert into t_user_info_bk (u_id,u_name,u_phone,u_height,updated_time,created_time,deleted_time) values (?,?,?,?,?,?,?)]]>
        </Script>
    </Node>
    <Line from="DB_INPUT_TABLE_1" to="HBASE_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1" id="LINE_STANDARD_1"/>
    <Line from="HBASE_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_1" id="LINE_STANDARD_2"/>
    <Metadata id="METADATA_1" fileUrl="" >
        <Field name="u_id" type="int" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="u_name" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="u_phone" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="u_height" type="float" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="updated_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="created_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="deleted_time" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
    </Metadata>
    <Connection id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db2" username="root" password="mysql" token="" org=""/>
    <Connection id="CONNECT_2" type="HBASE" dbURL="127.0.0.1:9090" database="" username="" password="" token="" org=""/>
</Graph>

读redis -> 写redis

读redis写redis

命令行执行:

./etl-engine -fileUrl ./rw_redis.grf


配置文件rw_redis.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph>

  <Node id="REDIS_READER_01"   type="REDIS_READER" desc="输入节点1" nameServer="127.0.0.1:6379" password="******" db="0"   patternMatchKey="true" keys="H*">
  </Node>

<Node id="REDIS_WRITER_01"   type="REDIS_WRITER" desc="输出节点1"  nameServer="127.0.0.1:6379"   password="******" db="1"  
patternMatchKey="true"  
outputFields="key;value"  
renameOutputFields="key;value" >
</Node>

<Line id="LINE_01" type="STANDARD" from="REDIS_READER_01" to="REDIS_WRITER_01" order="0" metadata="METADATA_01"></Line>


  <Metadata id="METADATA_01">
	<Field name="key" type="string" default="-1" nullable="false"/>
	<Field name="value" type="string" default="-1" nullable="false"/>
	<Field name="key_ttl" type="int" default="-1" nullable="false"/>
  </Metadata>


</Graph>

redis与mysql相互同步

读mysql写redis,读redis写mysql(嵌入go脚本增加字段),串行执行任务

命令行执行:

./etl-engine -fileUrl ./rw_redis_mysql.grf


配置文件rw_redis_mysql.grf 内容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="1">

    <Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1"  desc="读数据表" >
        <Script name="sqlScript">
            <![CDATA[ SELECT caption AS k  ,CONCAT(id,';',caption,';',memo,';', tag) AS v  FROM t_redis_info]]>
        </Script>
    </Node>
    <Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE"  dbConnection="CONNECT_1"  outputFields="id;caption;memo;tag;writetime" renameOutputFields="id;caption;memo;tag;writetime"     desc="写数据表" >
        <Script name="sqlScript">
            <![CDATA[INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);]]>
        </Script>
        <BeforeOut>
            <![CDATA[package ext
import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
)
func RunScript(dataValue string) (result string, topErr error) {
	newRows := ""
	rows := gjson.Get(dataValue, "rows")
	for index, row := range rows.Array() {
	  
	  	//增加一个字段名称为id的列
		tmpStr, _ := sjson.Set(row.String(), "id",  common.GetUUID() )
		
		//将系统默认输出的value字段拆分,并创建多个字段
		values := gjson.Get(row.String(),"value").String()
		vArr := strings.Split(values, ";")
		caption := vArr[1]
		memo := vArr[2]
		tag := vArr[3]
		tmpStr, _ = sjson.Set(tmpStr, "caption",  caption )
		//tmpStr, _ = sjson.Set(tmpStr, "value",  memo )
		//tmpStr, _ = sjson.Delete(tmpStr, "value")
		tmpStr, _ = sjson.Set(tmpStr, "memo",  memo )
		tmpStr, _ = sjson.Set(tmpStr, "tag",  tag )
	    tmpStr, _ = sjson.Set(tmpStr, "writetime", time.Now().Format("2006-01-02 15:04:05"))
		 common.GetLogger().Infoln("tmpStr:",tmpStr)
		newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
	}
	return newRows, nil
}]]>
        </BeforeOut>
    </Node>
    <Node id="REDIS_WRITER_1" type="REDIS_WRITER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" outputFields="k;v" renameOutputFields="key;value" desc="写redis"  />
    <Node id="REDIS_READER_1" type="REDIS_READER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" keys="*" desc="读redis"  />
    <Line from="DB_INPUT_TABLE_1" to="REDIS_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1"   id="LINE_STANDARD_1"/>
    <Line from="REDIS_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_2"   id="LINE_STANDARD_2"/>
    <Metadata id="METADATA_2" >
        <Field name="id" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="caption" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="memo" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="tag" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="writetime" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
    </Metadata>
    <Metadata id="METADATA_1" >
        <Field name="key" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="value" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
    </Metadata>
    <Connection   id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******"  />
</Graph>