
etl-engine


Batch-stream integrated data exchange engine

Reads data from a source data source -> (target data type conversion | data distribution) -> writes to the target data source

Supports fusion computing queries while the data stream is in transit

Product overview

  • The product consists of the etl-engine engine, the etl-designer cloud designer, and etl-crontab scheduling.
  • etl-engine parses etl configuration files and executes etl tasks.
  • The etl-designer cloud designer generates, by drag and drop, etl task configuration files that the etl-engine engine can recognize.
  • The etl-crontab scheduler executes the specified etl tasks on a time schedule and also provides queries over etl task execution logs.
  • Together, the three parts form an etl solution that can be integrated into any usage scenario.

Product details

High availability introduction

Resource address

  • etl-engine download address

The last compile time of the current version is 20230310.

Download address

  • etl-designer designer video playback address

etl-designer supports OEM publishing (currently integrated into etl_crontab).

Video playback address

  • etl-crontab scheduling designer video playback address

Video playback address

etl_crontab instructions

  • Sample etl-engine configuration files

Examples of using etl-engine

Functional characteristics

  • Supports cross-platform execution (Windows, Linux); only one executable file and one configuration file are needed to run, with no other dependencies. It is a lightweight engine.
  • Input/output data sources include influxdb v1, clickhouse, prometheus, elasticsearch, hadoop(hive), postgresql, mysql, oracle, sqlite, rocketmq, kafka, redis, and excel.
  • Any input node can be combined with any output node, following the pipeline model.
  • Supports data fusion queries across multiple types of databases.
  • Supports data fusion computing queries against multiple types of databases during message-flow data transmission.
  • The data fusion query syntax follows the ANSI SQL standard.
  • Global variables can be used in configuration files to update configurations dynamically and meet business requirements.
  • Any output node can embed Go-language scripts, which are parsed to convert the format of the output data stream.
  • Supports secondary development at the node level: by configuring custom nodes and Go-language scripts on them, various functions can be extended and implemented.
  • Any input node can be combined with a data-stream copy node so that one input branches to multiple outputs simultaneously.
  • The execution logs of each node can be output to a database.
  • Combined with crontab scheduling, etl-engine tasks can be executed periodically.

Data flow characteristics

  • Any combination of input and output

  • Parsing an embedded scripting language facilitates format conversion

  • Data stream replication facilitates multiplexing

  • Custom nodes facilitate various operations

  • Transition nodes facilitate various transformations

  • Stream-batch integrated query
    Supports multi-source input, in-memory computing, and fused output

Scheduling integration scheme

  • Flexible combination of etl_crontab and etl_engine

  • etl-designer

  • Scheduling designer

  • Scheduling log

  • ETL log details

Mode of use

Windows platform

  etl_engine.exe -fileUrl .\graph.xml -logLevel info


Linux platform

  etl_engine -fileUrl ./graph.xml -logLevel info

Example configuration file

<?xml version="1.0" encoding="UTF-8"?>
<Graph>
  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="5">
      <Script name="sqlScript"><![CDATA[
		         select * from (select * from t3 limit 10)
]]></Script>
  </Node>
  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="node 2" dbConnection="CONNECT_02" outputFields="f1;f2" renameOutputFields="c1;c2" outputTags="tag1;tag4"  renameOutputTags="tag_1;tag_4"  measurement="t1" rp="autogen">
  </Node>
  <!--
     <Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="node 3" dbConnection="CONNECT_03" outputFields="f1;f2;f3"  renameOutputFields="c1;c2;c3"  batchSize="1000"  >
        <Script name="sqlScript"><![CDATA[
           insert into db1.t1 (c1,c2,c3) values (?,?,?)
    ]]></Script>
    </Node>
  -->
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
  <Metadata id="METADATA_01">
    <Field name="c1" type="string" default="-1" nullable="false"/>
    <Field name="c2" type="int" default="-1" nullable="false"/>
    <Field name="tag_1" type="string" default="-1" nullable="false"/>
    <Field name="tag_4" type="string" default="-1" nullable="false"/>
  </Metadata>
  <Connection id="CONNECT_01" dbURL="http://127.0.0.1:58080" database="db1" username="user1" password="******" token=" " org="hw"  type="INFLUXDB_V1"/>

  <Connection id="CONNECT_02" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token=" " org="hw"  type="INFLUXDB_V1"/>
 <!--    <Connection id="CONNECT_04" dbURL="127.0.0.1:19000" database="db1" username="user1" password="******" type="CLICKHOUSE"/>-->
  <!--    <Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******" type="MYSQL"/>-->
  <!--        <Connection id="CONNECT_03"  database="d:/sqlite_db1.db"  batchSize="10000" type="SQLITE"/>-->
</Graph>

Supported node types

Any read node can output to any write node

Input node - Read data table

Output node - Write data table

Input node - Read excel file

Output node - Write excel file

Input node - Execute database script

Output node - Trash can, no output

Input node - MQ consumer

Output node - MQ producer

Data stream copy node, located between an input node and an output node; it acts as both output and input

Input node - Read redis

Output node - Write redis

Custom node, which implements various operations by embedding a Go script

Input node - Execute system script node

Input node - Read CSV file node

Input node - Read PROMETHEUS node

Input node - PROMETHEUS EXPORTER node

Output node - Write PROMETHEUS node

Input node - HTTP node

Input node - Read elastic node

Output node - Write elastic node

Input node - Read hive node

Combination mode

  • Any input node can be connected to any output node
  • Any input node can connect to a copy node
  • A copy node can connect to multiple output nodes
  • Any input node can be connected to a transition node
  • The copy node cannot connect to the transition node

Configuration description

DB_INPUT_TABLE

Input node

attribute description
id Unique identifier
type DB_INPUT_TABLE
Script (sqlScript) SQL statement to execute
fetchSize Number of records read per fetch
dbConnection Data source ID
desc Description

Supported source types

MYSQL, InfluxDB 1.x, ClickHouse (CK), PostgreSQL, Oracle, SQLite

sample

  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="节点1" fetchSize="1000">
    <Script name="sqlScript"><![CDATA[
		         select * from (select * from t4 limit 100000)
]]></Script>
  </Node>

XLS_READER

Input node

Read the EXCEL file

attribute description
id Unique identifier
type XLS_READER
fileURL File path + file name
startRow The row from which reading starts; index 0 means row 1 (usually the column header)
sheetName Sheet name
maxRow Maximum number of rows to read; * means all rows, 10 means 10 rows
fieldMap Field mapping in the format field1=A;field2=B;field3=C
(field name = column letter; multiple fields are separated by semicolons)

sample

  <Node id="XLS_READER_01"   type="XLS_READER" desc="Input node 1"  fileURL="d:/demo/test1.xlsx" startRow="2" sheetName="Personnel information" fieldMap="field1=A;field2=B;field3=C">
  </Node>

DB_OUTPUT_TABLE

Output node

attribute description Suitable for
id Unique identifier
type DB_OUTPUT_TABLE
Script (sqlScript) insert, delete, update SQL statements ck,mysql,sqlite,postgre,oracle
batchSize Number of records committed per batch ck,mysql,sqlite,postgre,oracle
(Note: for influx, the input node's fetchSize is used as the batch commit size)
outputFields Field names passed in by the input node when reading data influx,ck,mysql,sqlite,postgre,oracle
renameOutputFields Field names written by the output node to the target data source influx,ck,mysql,sqlite,postgre,oracle
dbConnection Data source ID
desc Description
outputTags Tag names passed in by the input node when reading data influx
renameOutputTags Tag names written by the output node to the target data source influx
rp Retention policy name influx
measurement Measurement (table) name influx
timeOffset Time jitter offset used to generate non-repeating timestamps during bulk writes influx
(This feature is implemented with time.Sleep; it is suggested to add a nanosecond time column through an embedded script instead, or to adjust your time+tags.)

Supported target types

MYSQL, InfluxDB 1.x, ClickHouse (CK), PostgreSQL, Oracle, SQLite

sample

  <Node id="DB_OUTPUT_01" type="DB_OUTPUT_TABLE" desc="write influx node1" dbConnection="CONNECT_02" outputFields="f1;f2;f3;f4"  renameOutputFields="c1;c2;c3;c4"  outputTags="tag1;tag2;tag3;tag4"  renameOutputTags="tag_1;tag_2;tag_3;tag_4" measurement="t5" rp="autogen">
        
  </Node>
  
  <Node id="DB_OUTPUT_02" type="DB_OUTPUT_TABLE" desc="write mysql node2" dbConnection="CONNECT_03" outputFields="time;f1;f2;f3;f4;tag1;tag2;tag3;tag4"  renameOutputFields="time;c1;c2;c3;c4;tag_1;tag_2;tag_3;tag_4" batchSize="1000" >
        <Script name="sqlScript"><![CDATA[
          insert into db1.t1 (time,c1,c2,c3,c4,tag_1,tag_2,tag_3,tag_4) values (?,?,?,?,?,?,?,?,?)
    ]]></Script>
  </Node>

XLS_WRITER

Output node

Write the contents of the EXCEL file

attribute description
id Unique identifier
type XLS_WRITER
fileURL File path + file name
startRow The row from which data is written; for example, 2 means data is written starting at row 2
sheetName Sheet name
outputFields Field names passed in by the input node,
format: field1;field2;field3
renameOutputFields Field mapping, format: Indicator=B;Year=C;Region=D
(field name = column letter; multiple fields are separated by semicolons)
metadataRow The row of the EXCEL file in which the field names are written; for example, 1 means the field names start at row 1
appendRow true means append mode, false means overwrite mode

sample

  <Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="Output node 2" appendRow="true"  fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1"  renameOutputFields="Index=B;Year=C;Region=D"  >
    </Node>

DB_EXECUTE_TABLE

Input node

Executes insert, delete, update statements

attribute description Suitable for
id Unique identifier
type DB_EXECUTE_TABLE
rollback Whether to roll back; false means no rollback, true means rollback
sqlScript insert, delete, update statements separated by semicolons mysql,sqlite,postgre,oracle,ck (ck does not support delete or update)
fileURL External script file; fileURL has a higher priority than sqlScript, and only one of the two can be used

sample

 <Node id="DB_EXECUTE_01" dbConnection="CONNECT_01" type="DB_EXECUTE_TABLE" desc="node 1" rollback="false" >
    <Script name="sqlScript"><![CDATA[
		         insert into t_1 (uuid,name) values (13,'aaa');
		         insert into t_1 (uuid,name) values (14,'bbb');
		         insert into t_1 (uuid,name) values (15,'ccc');
		         insert into t_1 (uuid,name) values (1,'aaa')
]]></Script>
  </Node>

OUTPUT_TRASH

Output node

An empty pipe with no output; suitable as the target for nodes that themselves produce no output (for example, the DB_EXECUTE_TABLE node)

sample

  <Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="node 2"  >
      </Node>

MQ_CONSUMER

Input node, blocking mode

MQ_CONSUMER(rocketmq support)

attribute description Suitable for
id Unique identifier
type MQ_CONSUMER
flag Default value: ROCKETMQ (rocketmq is supported)
nameServer Address of the mq server, in the format 127.0.0.1:8080
group mq group name
topic Name of the subscribed topic
tag Tag name; * means all tags are consumed,
tag_1 means only the tag_1 tag is consumed

sample

    <Node id="MQ_CONSUMER_02" type="MQ_CONSUMER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_1" topic="out_event_user_info" tag="*"></Node>

MQ_CONSUMER(kafka support)

attribute description Suitable for
id Unique identifier
type MQ_CONSUMER
flag Default value: KAFKA (kafka is supported)
nameServer Address of the mq server, in the format 127.0.0.1:8080
group mq group name
topic Name of the subscribed topic
listenerFlag 1 means listen by partition; 2 means listen on a single channel, and topic can list multiple topics

sample

 <Node id="MQ_CONSUMER_03" type="MQ_CONSUMER" flag="KAFKA" nameServer="127.0.0.1:18081" group="group_10" topic="out_event_user_info" listenerFlag="2"></Node>

MQ_PRODUCER

Output node

MQ_PRODUCER(rocketmq support)

attribute description Suitable for
id Unique identifier
type MQ_PRODUCER
flag Default value: ROCKETMQ (rocketmq is supported)
nameServer Address of the mq server, in the format 127.0.0.1:8080
group mq group name
topic Name of the topic to publish to
tag Tag name, in the format tag_1
sendFlag Sending mode: 1 is synchronous, 2 is asynchronous, 3 is one-way
outputFields Field names passed in by the input node,
format: field1;field2;field3 (multiple fields are separated by semicolons)
renameOutputFields Field mapping, format: field1;field2;field3 (multiple fields are separated by semicolons)

sample

    <Node id="MQ_PRODUCER_01" type="MQ_PRODUCER" flag="ROCKETMQ" nameServer="127.0.0.1:8080" group="group_11" topic="out_event_system_user" tag="tag_1"
          sendFlag="3" outputFields="time;tag_1;c2"  renameOutputFields="Time;Equipment;index" >
    </Node>

MQ_PRODUCER (kafka support)

attribute description Suitable for
id Unique identifier
type MQ_PRODUCER
flag Default value: KAFKA (kafka is supported)
nameServer Address of the mq server, in the format 127.0.0.1:8080
topic Name of the topic to publish to
isPartition true sends messages to the specified partition; false sends messages to random partitions
sendFlag Sending mode: 1 is synchronous, 2 is asynchronous
outputFields Field names passed in by the input node,
format: field1;field2;field3 (multiple fields are separated by semicolons)
renameOutputFields Field mapping, format: field1;field2;field3 (multiple fields are separated by semicolons)

sample

     <Node id="MQ_PRODUCER_02" type="MQ_PRODUCER" flag="KAFKA" nameServer="127.0.0.1:18081"  topic="out_event_system_user"
          sendFlag="1" outputFields="Offset;Partition;Topic;Value"  renameOutputFields="Offset;Partition;Topic;Value" >
    </Node>

COPY_STREAM

Copies the data stream from one input node to multiple branch output nodes

attribute description Suitable for
id Unique identifier
type COPY_STREAM

sample

  <Node id="COPY_STREAM_01" type="COPY_STREAM" desc="Data flow copy node" ></Node>
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="COPY_STREAM_01" order="1" metadata="METADATA_01" ></Line>
  <Line id="LINE_02" type="COPY" from="COPY_STREAM_01:0" to="DB_OUTPUT_01" order="2" metadata="METADATA_01"></Line>
  <Line id="LINE_03" type="COPY" from="COPY_STREAM_01:1" to="DB_OUTPUT_02" order="2" metadata="METADATA_02"></Line>

REDIS_READER

Input node

attribute description Suitable for
id Unique identifier
type REDIS_READER
nameServer 127.0.0.1:6379
password ******
db Database ID, for example 0
isGetTTL true or false; whether to read the TTL information
keys Keys to read, separated by semicolons; currently only string, int, and float content can be read

sample

  <Node id="REDIS_READER_01"   type="REDIS_READER" desc="Input node 1" 
  nameServer="127.0.0.1:6379" password="******" db="0" isGetTTL="true" keys="a1;a_1" ></Node>

REDIS_WRITER

Output node. Because key names cannot repeat, only the last row passed by the read node is written

attribute description Suitable for
id Unique identifier
type REDIS_WRITER
nameServer 127.0.0.1:6379
password ******
db Database ID, for example 0
isGetTTL true or false; whether to read the TTL information
outputFields Currently only string, int, and float content is supported
renameOutputFields Currently only string, int, and float content is supported

sample

  <Node id="REDIS_WRITER_01"   type="REDIS_WRITER" desc="Output node 1"  nameServer="127.0.0.1:6379" password="******" db="1" 
  isGetTTL="true" outputFields="a1;a_1"  renameOutputFields="f1;f2"  ></Node>

CUSTOM_READER_WRITER

Custom node; various operations can be implemented by embedding Go scripts

attribute description Suitable for
id Unique identifier
type CUSTOM_READER_WRITER

EXECUTE_SHELL

Input Node - Execute system script node

attribute description Suitable for
id Unique identifier
type EXECUTE_SHELL
fileURL Location of the external script file; only one of fileURL and Script may be used. If both appear, fileURL takes precedence over Script
Script Script content
outLogFileURL Log file to which console output is written

sample

<Node id="EXECUTE_SHELL_01"  type="EXECUTE_SHELL" desc="node 1"  _fileURL="d:/test1.bat" outLogFileURL="d:/test1_log.txt">
    <Script><![CDATA[
    c:
    dir/w
]]></Script>
  </Node>

CSV_READER

Input node - Read CSV file node

attribute description Suitable for
id Unique identifier
type CSV_READER
fileURL CSV file location
fetchSize Number of rows read from memory per batch; for influxdb, for example, it can match the number of records committed per batch. In a test with 10,000 rows of 123 fields each and fetchSize set to 100, loading took about 15 seconds
startRow The row from which data is read; 0 (the default) means row 1, which usually contains the column names
fields Defines the output field names, separated by semicolons, e.g. field1;field2;field3
fieldsIndex Defines which columns are output; 0 means column 1. Multiple columns are separated by semicolons, e.g. "2;3;4". If set to -1, all fields are read in order

sample

  <Node id="CSV_READER_01"   type="CSV_READER" desc="Input node 1" fetchSize="5"  fileURL="d:/demo2.csv" startRow="1" fields="field1;field2;field3"  fieldsIndex="0;3;4">
  </Node>

PROMETHEUS_API_READER

Input Node - Read PROMETHEUS node

attribute description Suitable for
id Unique identifier
type PROMETHEUS_API_READER
url prometheus server address, for example: http://127.0.0.1:9090
Script The query API content; only /api/v1/query
and /api/v1/query_range are supported
For example: /api/v1/query?query=my_device_info{deviceCode="DeviceNumber000"}[1d]

** Note: in the result set returned by the query, __name__ is the metric name, __TIME__ is the timestamp stored by prometheus, and __VALUE__ is the prometheus value **

sample

    <Node id="PROMETHEUS_API_READER_1" type="PROMETHEUS_API_READER"  url="http://127.0.0.1:9090" >
        <Script name="sqlScript">
            <![CDATA[
            /api/v1/query?query=my_device_info{deviceCode="DeviceNumber000"}[1d]
            ]]>
		</Script>
    </Node>
     <Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" batchSize="10" dbConnection="CONNECT_1" desc="" outputFields="__name__;address;deviceCode;__TIME__;__VALUE__" renameOutputFields="f1;f2;f3;f4;f5"  >
        <Script name="sqlScript">
            <![CDATA[insert into 
t_prome_info_bk
(f1,f2,f3,f4,f5)
values (?,?,?,?,?)]]>
        </Script>
    </Node>

PROMETHEUS_EXPORTER

Input node - PROMETHEUS EXPORTER

attribute description Suitable for
id Unique identifier
type PROMETHEUS_EXPORTER
exporterAddr The exporter address, IP:PORT, for example: :10000
exporterMetricsPath The exporter's path, for example: /EtlEngineExport
metricName Metric name, for example: Etl_Engine_Exporter
metricHelp Metric description
labels Label names, for example: deviceCode;address;desc

sample

    <Node id="PROMETHEUS_EXPORTER_1" type="PROMETHEUS_EXPORTER"  
	    exporterAddr=":10000" exporterMetricsPath="/EtlEngineExport" 
	    metricName="Etl_Engine_Exporter" metricHelp="Etl_Engine_Exporter samples" 
	    labels="deviceCode;address;desc">
	</Node>

Add the following contents to the prometheus configuration file:

  - job_name: "etlengine_exporter"
    metrics_path: "/EtlEngineExport" 
    static_configs:
      - targets: ["127.0.0.1:10000"]

A service endpoint /pushDataService is also exposed for pushing data. Postman debugging details are as follows:

	 
	 POST Method
	 URL  http://127.0.0.1:10000/pushDataService , 
	 Body x-www-form-urlencoded
	 Parameters:
		"jsondata":{
			"labels":{"deviceCode":"DeviceCode001","address":"District_1","desc":"Maximum value"},
			"value":100
		}

Two fields are automatically added to the output data stream: name is the metric name and VALUE is the prometheus value.

PROMETHEUS_API_WRITER

Output Node - Write PROMETHEUS node

attribute description Suitable for
id Unique identifier
type PROMETHEUS_API_WRITER
url prometheus server address, for example: http://127.0.0.1:9090
metric Metric name
outputFields Field names passed in by the input node
renameOutputFields Label names used when importing into prometheus; the entries correspond one-to-one with outputFields
valueField The prometheus value written when importing; corresponds to an existing field name in the input node

sample

  <Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1"  >
        <Script name="sqlScript">
            <![CDATA[select f2,f3,f4 from t_prome_info ]]>
        </Script>
    </Node>
    <Node id="PROMETHEUS_API_WRITER_1" type="PROMETHEUS_API_WRITER" url="http://127.0.0.1:9090" metric="my_device_info" outputFields="f2;f3" renameOutputFields="deviceCode;address" valueField="f4" >
	</Node>

HTTP_INPUT_SERVICE

Input node - HTTP node, blocking mode

attribute description Suitable for
id Unique identifier
type HTTP_INPUT_SERVICE
serviceIp IP address to which the HTTP/HTTPS service is bound
servicePort Port to which the HTTP/HTTPS service is bound
serviceName Name of the exposed service; default: etlEngineService
serviceCertFile Location of the HTTPS service certificate file
serviceKeyFile Location of the HTTPS service key file

sample

	<Node id=""
		type="HTTP_INPUT_SERVICE"
		serviceIp=""
		servicePort="8081"
		serviceName="etlEngineService"
		serviceCertFile=""
		serviceKeyFile="" >
	</Node>
	
	
	postman debugging :
         http://127.0.0.1:8081/etlEngineService
	 POST Method,URL: /etlEngineService , Body:x-www-form-urlencoded
	 Parameters:
		"jsondata":{
			"rows":[
				{"deviceCode":"DeviceCode001","address":"Chaoyang District","desc":"Maximum value","value":20},
				{"deviceCode":"DeviceCode002","address":"Chaoyang District","desc":"Maximum value","value":18}
			]
		}
     Note: You must pass an array structure with rows as the KEY

ELASTIC_READER

Input node - Read elastic node

attribute description Suitable for
id Unique identifier
type ELASTIC_READER
index Index name
sourceFields Names of the fields output in the result set
fetchSize Number of records read per fetch
Script tag DSL query syntax

sample

   <Node id="ELASTIC_READER_01" dbConnection="CONNECT_02"   
	type="ELASTIC_READER" desc="node 2"  sourceFields="custom_type;username;desc;address" fetchSize="2" >
        <Script name="sqlScript"><![CDATA[
            {
                  "query" : {
                       "bool":{
                          "must":[
                              //{
                                // "term": { "username.keyword": "Mr.Wang"  }
                             //     "match": { "username": ""  }
                             //  },
                               {
                                 "term":   {   "custom_type":"t_user_info"  }
                               }
                          ]
            
                       }
                    }
                }
        ]]></Script>
   </Node>
   
    <Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />
  

ELASTIC_WRITER

Output node - Write elastic node

attribute description Suitable for
id Unique identifier
type ELASTIC_WRITER
index Index name
idType Primary key generation mode: 1 means no id is specified and es generates a 20-character GUID by itself;
2 means the id is taken from the field named in idExpress, whose value is matched against the renameOutputFields of the previous node;
3 means the id is generated from the expression specified in idExpress; the _HW_UUID32 expression automatically generates a 32-character UUID primary key
idExpress For example, when idType is 3, this can be set to _HW_UUID32
outputFields Field names passed in by the input node when reading data; field contents are output in order, not by field name
renameOutputFields Field names written by the output node to the target data source; field contents are output in order, not by field name

sample

  <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="2">
    <Script name="sqlScript"><![CDATA[
		        SELECT "t_user_info" AS custom_type,uname, udesc,uaddress,uid FROM t_u_info
]]></Script>
  </Node>

    <Node id="ELASTIC_WRITER_01" dbConnection="CONNECT_02"  type="ELASTIC_WRITER" desc="node 2"  
        outputFields="custom_type;uname;udesc;uaddress;uid" 
        renameOutputFields="custom_type;username;desc;address;uid"
        idType="3" 
        idExpress="_HW_UUID32">
      </Node>
      
       <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="ELASTIC_WRITER_01" order="0" metadata="METADATA_01"></Line>

  <Metadata id="METADATA_01">
    <Field name="custom_type" type="string" default="-1" nullable="false"/>
    <Field name="username" type="string" default="-1" nullable="false"/>
    <Field name="desc" type="string" default="-1" nullable="false"/>
    <Field name="address" type="string" default="-1" nullable="false"/>
    <Field name="uid" type="string" default="-1" nullable="false"/>
  </Metadata>
  
  <Connection id="CONNECT_02" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="******" />
  
      

HIVE_READER

Input node - Read hive node

Authentication

hive.server2.authentication = NONE
hive.server2.authentication = KERBEROS

attribute description
id Unique identifier
type HIVE_READER
Script (sqlScript) SQL statements
fetchSize Number of records read per fetch
dbConnection Data source ID
authFlag Authentication type: NONE or KERBEROS; the default is NONE
krb5Principal Kerberos principal, such as the hive.server2.authentication.kerberos.principal=hive/_HOST@EXAMPLE.COM setting in hive
desc Description

sample

    <Node id="HIVE_READER_01" dbConnection="CONNECT_01"   
	type="HIVE_READER" desc="node 1"  fetchSize="100" >
    <Script name="sqlScript"><![CDATA[
		 select * from db1.t_u_info  
]]></Script>
      </Node>
      
          <Connection id="CONNECT_01" 
          dbURL="127.0.0.1:10000" database="db1" 
          username="root" password="******" 
          batchSize="1000" type="HIVE"/>
      

Metadata

The metadata defines the target data format (such as the names and types of the fields corresponding to the renameOutputFields or renameOutputTags defined in the output node). outputFields are the field names of the data result set in the input node; the fields defined by outputFields are converted into the fields defined by renameOutputFields, and the conversion format of renameOutputFields is defined in the metadata.

attribute description Suitable for
id Unique identifier
Field
name The field name in the output data source renameOutputFields,
renameOutputTags
type The field type in the output data source string,int,int32,float,
str_timestamp,decimal,
datetime,timestamp
default Default value; when nullable is false and the output value is an empty string, default specifies the value to output instead
nullable Whether null is allowed; false means the value cannot be empty and must be used together with default, true means null is allowed
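
For reference, a minimal Metadata sketch taken from the sample configuration earlier on this page; with nullable="false" and default="-1", an empty value for c1 or c2 is written as -1:

  <Metadata id="METADATA_01">
    <Field name="c1" type="string" default="-1" nullable="false"/>
    <Field name="c2" type="int" default="-1" nullable="false"/>
  </Metadata>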

Connection

attribute description Suitable for
id Unique identifier
type Data source type: INFLUXDB_V1, MYSQL, CLICKHOUSE, SQLITE, POSTGRES, ORACLE, ELASTIC
dbURL Connection address ck,mysql,influx,postgre,oracle,elastic
database Database name ck,mysql,influx,sqlite,postgre,oracle,elastic
username User name ck,mysql,influx,postgre,oracle,elastic
password Password ck,mysql,influx,postgre,oracle,elastic
token Token name influx 2x
org Organization name influx 2x
rp Data retention policy name influx 1x
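
For reference, two Connection sketches copied from the samples earlier on this page (an INFLUXDB_V1 connection and a MYSQL connection):

  <Connection id="CONNECT_02" dbURL="http://127.0.0.1:58086" database="db1" username="user1" password="******" token=" " org="hw"  type="INFLUXDB_V1"/>
  <Connection id="CONNECT_03" dbURL="127.0.0.1:3306" database="db1" username="user1" password="******" type="MYSQL"/>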

Graph

attribute description Suitable for
runMode 1 is serial mode; 2 is parallel mode. Parallel mode is the default and is recommended;
if you want the nodes to execute in order, use serial mode

Line

attribute description Suitable for
id Unique identifier
from Unique identifier of the input node
to Unique identifier of the output node
type STANDARD means one in, one out; COPY copies the data stream, duplicating data in the intermediate link
order Serial sort number, ascending positive integers. When the Graph attribute runMode is 1,
configuring order values 0, 1, 2 implements serial execution
metadata ID of the target metadata
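
As a hedged sketch combining the Graph and Line attributes described above (assuming runMode is written as an attribute of the Graph element; the node ids DB_INPUT_02 and DB_OUTPUT_02 and the omitted node, metadata, and connection definitions are hypothetical), serial execution can be forced by setting runMode to 1 and numbering the lines with order:

<Graph runMode="1">
  <!-- node, metadata, and connection definitions omitted; order 0 runs before order 1 -->
  <Line id="LINE_01" type="STANDARD" from="DB_INPUT_01" to="DB_OUTPUT_01" order="0" metadata="METADATA_01"></Line>
  <Line id="LINE_02" type="STANDARD" from="DB_INPUT_02" to="DB_OUTPUT_02" order="1" metadata="METADATA_02"></Line>
</Graph>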

You can configure global variables

Global variables are passed on the command line

etl_engine -fileUrl ./global6.xml -logLevel debug arg1="d:/test3.xlsx" arg2=上海

arg1 and arg2 are global variables passed in from the command line

  • Global variables are referenced in configuration files

    <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="500">
     <Script name="sqlScript"><![CDATA[
		         select * from (select * from t5 where tag_1='${arg2}' limit 1000)
    ]]></Script>

  <Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="output node 2" appendRow="true"  fileURL="${arg1}"  startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1"  renameOutputFields="Index=B;Year=C;Region=D"  >
 

${arg1} in the configuration file is replaced with the value of arg1 (d:/test3.xlsx) at runtime.
${arg2} in the configuration file is replaced with the value of arg2 (Shanghai) at runtime.

  • Built-in variable specification

To facilitate the generation of fixed formatted content, the system has built-in common variables, which can be used to dynamically replace variable values when configuring global variables.
Built-in variable prefix _HW_

  1. Time variable
    Format: _HW_YYYY-MM-DD hh:mm:ss.SSS
    Outputs the current system time, for example 2022-01-02 19:33:06.108
    Note that spaces must be escaped with 0x32, so the correct way to pass the format is
    _HW_YYYY-MM-DD0x32hh:mm:ss.SSS
    YYYY outputs the four-digit year, e.g. 2022
    MM outputs the two-digit month, e.g. 01
    DD outputs the two-digit day, e.g. 02
    hh outputs the two-digit hour, e.g. 19
    mm outputs the two-digit minute, e.g. 33
    ss outputs the two-digit second, e.g. 06
    .SSS outputs a dot prefix and three-digit milliseconds, e.g. .108

The above parts can be combined at will; for example, _HW_YYYYMM outputs 202201

  2. Time displacement variable
    On the basis of the time variable, the uppercase character Z adds or subtracts hours, minutes, and seconds.


    For example, the format _HW_YYYY-MM-DD hh:mm:ss.SSSZ2h45m
    outputs the time 2 hours and 45 minutes after the current time.
    For example, the format _HW_YYYY-MM-DD hh:mm:ssZ-24h10m
    uses a negative number after the uppercase character Z to subtract the displacement,
    and outputs the current time minus 24 hours and 10 minutes.
    The supported time units are:
    "ns", "us" (or "µs"), "ms", "s", "m", "h"


    On the basis of the time variable, the lowercase character z adds or subtracts years, months, and days.


    For example, the format _HW_YYYY-MM-DD hh:mm:ssz1,2,3
    outputs the current time plus 1 year, 2 months, and 3 days.
    For example, the format _HW_YYYY-MM-DD hhz-1,-2,-3
    outputs the current time minus 1 year, 2 months, and 3 days.

  3. Timestamp variable
    Format: _HW_timestamp10
    Outputs the current 10-digit system timestamp, for example 1669450716
    Format: _HW_timestamp13
    Outputs the current 13-digit system timestamp, for example 1669450716142
    Format: _HW_timestamp16
    Outputs the current 16-digit system timestamp, for example 1669450716142516
    Format: _HW_timestamp19
    Outputs the current 19-digit system timestamp, for example 1669450716142516700

  4. UUID variables
    Format: _HW_UUID32
    Outputs a 32-character UUID, for example D54C3C7163844E4DB4F073E8EEC83328
    Format: _HW_uuid32
    Outputs a 32-character lowercase UUID, for example d54c3c7163844e4db4f073e8eec83328

  • Pass built-in variables on the command line

etl_engine -fileUrl ./global6.xml -logLevel debug arg1=_HW_YYYY-MM-DD0x32hh:mm:ss.SSS arg2=_HW_YYYY-MM-DD
  • Built-in variables are referenced in configuration files

    <Node id="DB_INPUT_01" dbConnection="CONNECT_01" type="DB_INPUT_TABLE" desc="node 1" fetchSize="500">
     <Script name="sqlScript"><![CDATA[
		         select * from (select * from t5 where tag_1='${arg1}' limit 1000)
    ]]></Script>

  <Node id="XLS_WRITER_01"   type="XLS_WRITER" desc="output node 2" appendRow="true"  fileURL="${arg2}.xlsx" _fileURL="d:/demo/test2.xlsx" startRow="3" metadataRow="2" sheetName="Personnel information" outputFields="c1;c3;tag_1"  renameOutputFields="Index=B;Year=C;Region=D"  >
 

Support for parsing embedded go language

You can embed your own business logic in the <BeforeOut></BeforeOut> tag of any output node. More introduction

  • Add field

Multiple fields can be added and given default values

package ext
import (
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
)
func RunScript(dataValue string) (result string, topErr error) {
	newRows := ""
	rows := gjson.Get(dataValue, "rows")
	for _, row := range rows.Array() {
		// add two new columns with default values to each row
		//tmpStr, _ := sjson.Set(row.String(), "addCol1", time.Now().Format("2006-01-02 15:04:05.000"))
		tmpStr, _ := sjson.Set(row.String(), "addCol1", "1")
		tmpStr, _ = sjson.Set(tmpStr, "addCol2", "${arg2}")
		newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
	}
	return newRows, nil
}
  • Merge field

Multiple fields can be combined into a single field

package ext
import (
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
)
func RunScript(dataValue string) (result string, topErr error) {
	newRows := ""
	rows := gjson.Get(dataValue, "rows")
	for _, row := range rows.Array() {
		// concatenate the region (tag_1) and year (c3) fields into a single tag_1 value
		area := gjson.Get(row.String(), "tag_1").String()
		year := gjson.Get(row.String(), "c3").String()
		tmpStr, _ := sjson.Set(row.String(), "tag_1", area + "_" + year)
		newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
	}
	return newRows, nil
}
  • Complete sample

<?xml version="1.0" encoding="UTF-8"?>
<Graph>
 
  <Node id="CSV_READER_01"   type="CSV_READER" desc="node 1" fetchSize="500"  fileURL="d:/demo.csv" startRow="1" fields="field1;field2;field3;field4"  fieldsIndex="0;1;2;3"  >
  </Node>
 
    <Node id="OUTPUT_TRASH_01"   type="OUTPUT_TRASH" desc="node 2"  >
        <BeforeOut>
            <![CDATA[
package ext
import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
	"io/ioutil"
	"os"
)
func RunScript(dataValue string) (result string, topErr error) {
	defer func() {
		if topLevelErr := recover(); topLevelErr != nil {
			topErr = errors.New("RunScript Capture fatal error" + topLevelErr.(error).Error())
		} else {
			topErr = nil
		}
	}()
	newRows := ""
	GenLine(dataValue,"db1","autogen","t13","field2","field3;field4")
	return newRows, nil
}

//Received is JSON
func GenLine(dataValue string, db string, rp string, measurement string, fields string, tags string) error {
	head := "# DML\n# CONTEXT-DATABASE: " + db + "\n# CONTEXT-RETENTION-POLICY: " + rp + "\n\n"
	line := ""
    fieldLine := ""
    tagLine := ""
	_t_ := strings.Split(tags, ";")
	_f_ := strings.Split(fields, ";")
	rows := gjson.Get(dataValue, "rows")
	for _, row := range rows.Array() {
        fieldLine = ""
        tagLine = ""
		for i1 := 0; i1 < len(_t_); i1++ {
			tagValue := gjson.Get(row.String(), _t_[i1])
			tagLine = tagLine + _t_[i1] + "=\"" + tagValue.String() + "\","
		}
		tagLine = tagLine[0 : len(tagLine)-1]
		for i1 := 0; i1 < len(_f_); i1++ {
			fieldValue := gjson.Get(row.String(), _f_[i1])
			fieldLine = fieldLine + _f_[i1] + "=" + fieldValue.String() + ","
		}
		fieldLine = fieldLine[0 : len(fieldLine)-1]

		if len(tagLine) > 0 && len(fieldLine) > 0 {
		    line = line + measurement + "," + tagLine + " " + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"
        } else {
            
            if len(fieldLine) > 0 {
                line = line + measurement + "," + fieldLine + " " + strconv.FormatInt(time.Now().Add(500*time.Millisecond).UnixNano(), 10) + "\n"
            }
        }

	}

	if len(line) > 0 {
		txt := head + line
		fileName := "d:/"+strconv.FormatInt(time.Now().UnixNano(), 10)
		WriteFileToDB(fileName, txt)
		err1 := os.Remove(fileName)
			if err1 != nil {
				fmt.Println("delete temp file fail:", fileName)
				return err1
			}
	}
	return nil
}
func WriteFileToDB(fileName string, txt string) {

	buf := []byte(txt)
	err := ioutil.WriteFile(fileName, buf, 0666)
	if err != nil {
		fmt.Println("write file fail:", err)
		return
	} else {
		cmdLine := "D:/software/influxdb-1.8.10-1/influx.exe  -import -path=" + fileName + " -host 127.0.0.1 -port 58086 -username u1 -password 123456 -precision=ns"
		//fmt.Println("cmdLine:",cmdLine)
		common.Command3("GB18030", "cmd", "/c", cmdLine)

	}
}

              ]]>
        </BeforeOut>

    </Node>
    
  <Line id="LINE_01" type="STANDARD" from="CSV_READER_01" to="OUTPUT_TRASH_01" order="0" metadata="METADATA_03">线标注</Line>
    <Metadata id="METADATA_03">
        <Field name="field1" type="string" default="-1" nullable="false"/>
        <Field name="field2" type="string" default="-1" nullable="false"/>
        <Field name="field3" type="string" default="-1" nullable="false"/>
        <Field name="field4" type="string" default="-1" nullable="false"/>
    </Metadata>
   
</Graph>


Cooperation mode

Integration and cooperation are welcome

etl-engine is open to industry-wide integration...

```
 @auth Mr Huang
 mail:hw2499@sohu.com
 vx:weigeonlyyou
```