Skip to content

客户端使用指南

ShanceWang edited this page Apr 13, 2022 · 11 revisions

1.Prerequisite

在启动客户端之前,需要保证:

  1. 服务端已经启动,参考 QuickStart
  2. zookeeper已启动,并获得地址 zk-ip:zk-port

2.启动方式一

执行git命令,获取ptubes代码

git clone git@github.com:meituan/ptubes.git

在IDE中找到类

com.meituan.ptubes.sdk.example.ExampleByMainFunction

替换ExampleByMainFunction 中zookeeper地址为真实地址zk-ip:zk-port,并按需修改reader地址,订阅列表等。示例如下:

public <T> T getConfig(String confName, Class<T> confClass) {
    // ignore confName, when there is only one implement for subscript or consumer config in task.
    try {
        // ... 
        if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
            return (T) new PtubesSdkSubscriptionConfig(
                    "demoR1",  // reader taskName, eg. demoR1
                    taskName, // SDK task name
                    "127.0.0.1:2181", // change the zookeeper address to a real address.
                    "test_test.wm_risk_list_account,test_test.test_table" // tables for subscription
            );
        }
        // ... 

    } catch (Exception e) {
        System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
        e.printStackTrace();
    }
    return null;
}

3.启动方式二

运行编译好的客户端

  1. 下载编译好的SDK客户端 ptubes-sdk-client.tar.gz
  2. 解压客户端压缩包
  3. 检查目录和配置
    压缩包内的结构如下:
drwxr-xr-x   4 admin  root  12:00 bin
drwxr-xr-x   5 admin  root  12:00 conf
drwxr-xr-x   4 admin  root  12:10 lib

bin目录存储启动和停止脚本, conf目录存储依赖的配置文件, lib目录存储所需要的程序包。
./conf/ptubes_demo_task.properties 是示例任务的配置文件,其中 ptubes.sdk.zookeeper.address=127.0.0.1:2181应该替换为已启动的真实zookeeper地址zk-ip:zk-port sdk.conf

ptubes.sdk.task.set=ptubes_demo_task // 每个任务名都需要创建一个对应的配置文件, 以逗号分隔

ptubes_demo_task.properties

ptubes.sdk.reader.name=demoR1 // reader中对应的Task名字, 比如 demoR1
ptubes.sdk.task.name=ptubes_demo_task // SDK自身的任务名
ptubes.sdk.zookeeper.address=127.0.0.1:2181 // 对应的zk地址
ptubes.sdk.subs=test.test_table// 订阅的库表名, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332 // 对应的Reader地址, 以逗号分隔
  1. 执行
# 此命令会运行 ./conf/sdk.conf 文件中所列出的所有sdk任务
# 同时这些任务的配置也需要在./conf/${sdkTaskName}.properties 中存在
cd bin && sh start.sh
  1. 停止
# 此命令会停止本机上所有在运行中的SDK任务
sh stop.sh

4.启动方式三

新建maven工程并依赖ptubes项目

  1. 新建maven工程并添加依赖
<dependency>
    <groupId>com.meituan</groupId>
    <artifactId>ptubes-sdk</artifactId>
    <version>1.0.0</version>
</dependency>
  1. 在工程 resource 目录下新增两个配置文件并调整参数 sdk.conf
ptubes.sdk.task.set=ptubes_demo_task

ptubes_demo_task.properties

# reader中对应的Task名字, 比如 demoR1
ptubes.sdk.reader.name=demoR1

#SDK自身的任务名
ptubes.sdk.task.name=ptubes_demo_task

# 对应的zk地址
ptubes.sdk.zookeeper.address=127.0.0.1:2181

# 订阅的库表名, 以逗号分隔
ptubes.sdk.subs=test.test_table

# 对应的Reader地址, 以逗号分隔
ptubes.sdk.reader.ip=127.0.0.1:28332
  1. 新增一个类TestMain
package com.example.buffalo;

import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;

import java.util.List;

public class TestMain {
    public static void main(String[] args) {
        String taskName = "ptubes_demo_task";
        // set log file directory
        System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath()  + "/logs");
        // set log type
        System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
        System.out.println(System.getProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY));
        IPtubesConnector rdsCdcConnector = null;
        IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);

        try {
            rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {

                @Override
                public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
                    for (RdsPacket.RdsEvent event : events) {
                        System.out.println(event.getRowData()
                                .getBeforeColumnsMap()
                                .get("id"));
                        System.out.println(event.getRowData()
                                .getAfterColumnsMap()
                                .get("id"));
                        System.out.println(PbJsonUtil.printToStringDefaultNull(event));
                    }

                    return RdsCdcEventStatus.SUCCESS;
                }
            });

            rdsCdcConnector.startup();
            long eachSleepTime = 30000L;
            long sleepTimeCount = 300000L;

            do {
                Thread.sleep(eachSleepTime);
                sleepTimeCount -= eachSleepTime;
                try {
                    if (null != rdsCdcConnector) {

                        /**
                         * hot to get a buffalo sdk task runtime info
                         */
                        System.out.println(rdsCdcConnector.getConnectorMonitorInfo());
                    }
                } catch (Exception e) {
                    System.out.println(e);
                }

            } while (sleepTimeCount > 0);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // rdsCdcConnector.shutdown();
        }
    }
}
  1. 执行TestMain.main方法。并观察控制台输出