Skip to content

B 嵌入脚本开发

hw2499 edited this page Jun 27, 2024 · 6 revisions

总体结构

总体结构

第三方系统调用etl-engine

  1. etl-crontab暴露Http/Https API接口
    第三方系统可通过Http/Https方式调用上述接口运行etl-engine
  2. etl-engine运行HttpInput节点实现阻塞模式
    第三方系统可通过Http/Https方式向etl-engine提交数据
  3. etl-engine运行MQ消费者节点实现阻塞模式
    第三方系统可通过MQ生产者向etl-engine发送消息数据

etl-engine实现嵌入脚本

节点类型 节点描述 标签名称 接口格式 说明
Node 输出节点 BeforeOut func RunScript(dataValue string) (result string, topErr error) {} 数据写操作之前,可进行数据转换或其它操作
Node 输出节点 AfterOut func RunScript(dataValue string) (result string, topErr error) {} 数据写操作之后,可进行其它操作
Node 转换节点 BeforeOut func RunScript(dataValue string) (result string, topErr error) {} 数据输出之前,可进行数据转换或其它操作。
Node CUSTOM_READER_WRITER节点 AfterOut func RunScript() (result string, topErr error) {} 属于独立的一个自定义节点,只能输出到垃圾桶节点,此标签内可进行其它操作。
Line 线节点 BeforeRun func RunScript() (result bool, topErr error) {} 进行流程执行之前的判断工作,返回true就可以向下执行,返回false就退出向下执行。
Graph 根节点 BeforeRun func RunScript() (result bool, topErr error) {} 进行任务执行之前的判断工作,返回true就可以执行当前任务,返回false就结束当前任务执行。

脚本编写规则

  1. 根据对应标签名称去实现相应的接口,如上所述标签名称:
<BeforeOut/> 
<AfterOut/> 
<BeforeRun/>
  1. 脚本内包名称格式固定
 package ext 
  1. 根据实际需要引入常用工具包
  import (
	"errors"
	"fmt"
	"strconv"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
 )
  1. Line节点实现对应的接口样本
   <Line from="DB_INPUT_TABLE_2" to="DB_OUTPUT_TABLE_1" type="STANDARD"  metadata="METADATA_2"   id="LINE_STANDARD_2">

 <BeforeRun>
    <![CDATA[
package ext
import (
	"errors"
	"fmt"
	"strconv"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
		"etl-engine/etl/tool/extlibs/common"
)
func RunScript() (result bool, topErr error) {
		
  		 fmt.Println("common.GlobalVar.Vint:",common.GlobalVar.Vint)
                
		 if ( common.GlobalVar.Vint < 8 ){
                        fmt.Println("返回false")
			return false,nil
		 }else{
		        fmt.Println("返回true")
		        return true,nil
		 }
	 
}
]]>
    </BeforeRun>

  </Line>  

嵌入go脚本样本

下面展示在执行整个任务前,先判断当前系统时间是否大于等于2023-01-17 13:50:10,如果大于约定的时间则向下执行,否则结束整个任务的执行。

<?xml version="1.0" encoding="UTF-8"?>
<Graph fileName="" desc="" runMode="2">
    <BeforeRun>
        <![CDATA[package ext
import (
	"errors"
	"fmt"
	"strconv"
	"time"
	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
	"etl-engine/etl/tool/extlibs/common"
)
func RunScript() (result bool, topErr error) {
  		 fmt.Println("common.GlobalVar.Vint:",common.GlobalVar.Vint)
		 strCurTime10 := "2023-01-17 13:50:10"//GetSystemTimeFormatExt("YYYY-MM-DD hh:mm:ss")
		 tmpTimeNumber10, _ := common.GetUnixTimeFromStrTimeExt("YYYY-MM-DD hh:mm:ss", strCurTime10, 1)
		 curTime := time.Now().Unix()
		 fmt.Println("约定系统时间:",strCurTime10)
		 fmt.Println("约定系统时间转换成时间戳:",tmpTimeNumber10)
		 fmt.Println("获取当前系统时间戳:",curTime)
		 if  curTime >= tmpTimeNumber10{
			return true,nil
		 }else{
			fmt.Println("Grpah运行前判断当前系统时间小于约定系统时间,不允许向下执行")
			return false,nil
		 }
		 //if( common.GlobalVar.Vint >8 ){
		 //   fmt.Println("Grpah运行前判断,不允许向下执行")
		 //	return false,nil
		 //}else{
		 //   return true,nil
		 //}
	 
}]]>
    </BeforeRun>
    <Node id="ELASTIC_READER_1" type="ELASTIC_READER" sourceFields="custom_type;username;desc;address" fetchSize="20" index="es_db3" dbConnection="CONNECT_1" desc="读elastic" x="72" y="56" isLink="true" sortId="1">
        <Script name="sqlScript">
            <![CDATA[{
	  "query" : {
		   "bool":{
			  "must":[
				  //{
					// "term": { "username.keyword": "王先生"  }
				 //     "match": { "username": ""  }
				 //  },
				   {
					 "term":   {   "custom_type":"t_user_info"  }
				   }
			  ]

		   }
		}
	}]]>
        </Script>
    </Node>
    <Node id="ELASTIC_WRITER_1" type="ELASTIC_WRITER" dbConnection="CONNECT_1" index="es_db4" idType="2" idExpress="id" outputFields="custom_type;username;desc;address;_id" renameOutputFields="custom_type;username;desc;address;id" desc="写elastic" x="306" y="92" isLink="true" sortId="1"/>
    <Line from="ELASTIC_READER_1" to="ELASTIC_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1" sortId="1" id="LINE_STANDARD_1"/>
    <Metadata id="METADATA_1" sortId="1">
        <Field name="custom_type" type="string" default="" nullable="true"/>
        <Field name="username" type="string" default="" nullable="true"/>
        <Field name="desc" type="string" default="" nullable="true"/>
        <Field name="address" type="string" default="" nullable="true"/>
        <Field name="id" type="string" default="" nullable="true"/>
    </Metadata>
    <Connection sortId="1" id="CONNECT_1" type="ELASTIC" dbURL="http://127.0.0.1:9200" database="es_db3" username="elastic" password="123456" token="" org=""/>
</Graph>

公共库

  1. 引入包 "etl-engine/etl/tool/extlibs/common"

  2. 该包所提供的公共方法

    • Command3
      执行命令行

      cmdLine := "ls -l"
      common.Command3("GB18030", "cmd", "/c", cmdLine) 
    • GetUnixTimeFromStrTimeExt
      将指定的字符串格式的时间转换成时间戳
      第1个参数是时间格式,YYYY四位年,MM两位月,DD两位日,hh两位小时,mm两位分钟,ss两位秒,SSS三位毫秒
      第2个参数是符合第1个参数格式的字符串日期。
      第3个参数1代表返回10位数字时间戳,2代表返回13位数字时间戳,3代表返回16位数字时间戳

      strTmpTime10 := "2022-01-17 08:00:00.000"
      tmpTimeNumber10, _ := GetUnixTimeFromStrTimeExt("YYYY-MM-DD hh:mm:ss.SSS", strTmpTime10, 1)
      fmt.Println(strTmpTime10, "->转换10位时间戳", tmpTimeNumber10)
    
    • GetSystemTimeFormatExt

      获取系统时间格式
      内容为 YYYY-MM-DD hh:mm:ss.SSS

         t := common.GetSystemTimeFormatExt("YYYY-MM-DD hh")
         fmt.Println("当前系统时间:",t)
    • ParseJsonToArray
      将json转换成二维数组(注意json格式必须由rows键值组成)

       newRows := `
           {"rows":[
           {"name":"李先生","address":"长春","memo","Java或Go"},
           {"name":"王女士","address":"北京","memo","大数据"}
           ]}`
       fields := "address;name"
       array:=common.ParseJsonToArray(newRows,fields)
       fmt.Println("array:",array)
      
    • CommonQuery
      查询数据表

        newRows, err:=common.CommonQuery("CONNECT_1","select * from t_1")
    • CommonWriteDB
      写数据表(clickhouse,mysql,oracle,postgre,sqlite)
    newRows := `
         {"rows":[
         {"name":"李先生","address":"长春","memo","Java或Go"},
         {"name":"王女士","address":"北京","memo","大数据"}
         ]}`
    fields := "address;name"
    array:=common.ParseJsonToArray(newRows,fields)
    fmt.Println("array:",array)
    err3:=common.CommonWriteDB("CONNECT_1","insert into t_4 (f1,f2) values (?,?)",array)
    if err3!=nil {
        fmt.Println("err3:",err3)
    }
    
    • CommonExecSqlDB
      执行SQL语句(clickhouse,mysql,oracle,postgre,sqlite)
    _,err3:=common.CommonExecSqlDB("CONNECT_1","insert into t_4 (f1,f2) values (1,'客户1');delete from t_4")
    if err3!=nil {
    fmt.Println("err3:",err3)
    }
    • GetLogger
      获取日志操作能力,通过Infoln或Debugln或Traceln输出日志信息
    if err1!=nil{
        common.GetLogger().Infoln("查询失败:",err1) 
    }
    • GetUUID
      返回uuid
     uuid := common.GetUUID() 
    
    • CommonFusionQuery
      融合查询
    //融合查询语法
    federationQuery := "SELECT t_u.u_id,t_u.u_name,t_u.u_phone,t_o.o_id,t_o.o_price,t_o.o_number,t_o.o_money,t_o.o_writetime,t_p.p_id,t_p.p_name,t_p.p_contacts,t_p.p_desc  FROM  t_u inner JOIN  t_o  ON t_o.u_id=t_u.u_id inner JOIN  t_p ON t_o.p_id=t_p.p_id ORDER BY  INTEGER(SUBSTRING(t_u.u_id,3))  ASC "
    //表别名
    tableAliasName := []string{"t_o","t_u","t_p"}
    
    jsonRows := []string{
    	gjson.Parse(newRows).Get("rows").String(),
    	gjson.Parse(userInfoRows).Get("rows").String(),
    	gjson.Parse(productInfoRows).Get("rows").String(),
    	}
    
    
    newRows,err1 := common.CommonFusionQuery(federationQuery,tableAliasName,jsonRows)
    if err1!=nil{
    	fmt.Println("融合查询失败:",err1)
    	return "",err1
    }else{
    	return newRows,nil
    }
    
    • SendMail
      发送邮件
    from := "11@qq.com"
    to := []string{"22@sohu.com"}
    cc := []string{"11@qq.com"}
    bcc := []string{}
    subject := "测试发送邮件" + time.Now().Format("2006-01-02 15:04:05")
    text := []byte("您好:这是一封测试邮件,无需回复")
    //html := []byte("<h1>您好,XXX先生:</h1><p>验证码:<font color='red'>00182</font><p>")
    
    common.sendMail(from, to, cc, bcc, subject, text, []byte(""), nil)
    
    • Zip/GZip
      压缩 文件或目录
     err := common.GZip([]string{"d:/tmp/a.xml"}, "d:/tmp/g.gz")
    
    • UnZip/UnGZip
      解压缩文件
     err := common.UnGZip("d:/tmp/t_u_o_p_info.txt.gz", "d:/tmp/a")
    
    • MD5
      md5算法加密字符串
         code := common.MD5( "你好全世界")
         fmt.Println("md5加密:",code)

公共变量

  1. 公共变量的生命周期作用于每个etl-engine任务运行开始到运行结束之间。
  2. 引入包 "etl-engine/etl/tool/extlibs/common"
  3. 该包所提供的公共变量
    • GlobalVar
      Vint      int64
      Vbool     bool
      Vstring   string
      Vfloat    float64
      Vmap      map[string]string
      Vinteface interface{}
      ViArray   []int64
      VsArray   []string
      VbArray   []bool
      VfArray   []float64
  1. 脚本中调用公共变量
  common.GlobalVar.Vint = common.GlobalVar.Vint + 1
  fmt.Println("common.GlobalVar.Vint:",common.GlobalVar.Vint)