
Client User Guide

myksl edited this page Apr 27, 2022 · 3 revisions

1. Prerequisites

Before starting the client, you need to ensure:

  1. The server has been started (see QuickStart).
  2. ZooKeeper has been started and you have its address, zk-ip:zk-port.
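
Before starting the client, it can help to confirm that the ZooKeeper address is well formed and accepts TCP connections. The following is a minimal sketch using only the JDK; the class name ZkAddressCheck and its methods are hypothetical helpers, not part of Ptubes, and a successful connection only shows that the port is open, not that ZooKeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the Ptubes SDK.
final class ZkAddressCheck {

    /** Splits "host:port" into its parts; throws IllegalArgumentException on malformed input. */
    static InetSocketAddress parse(String address) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + address);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection to the address can be opened within timeoutMillis. */
    static boolean isReachable(String address, int timeoutMillis) {
        InetSocketAddress target = parse(address);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass zk-ip:zk-port as the first argument; the default is the address used in the examples.
        String zk = args.length > 0 ? args[0] : "127.0.0.1:2181";
        System.out.println(zk + " reachable: " + isReachable(zk, 2000));
    }
}
```

Run it with the real zk-ip:zk-port as the only argument; if it reports false, check the address and firewall rules before starting the client.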

2. Start mode one

Execute the git command to get the ptubes code

git clone git@github.com:meituan/ptubes.git

Open the following class in your IDE:

com.meituan.ptubes.sdk.example.ExampleByMainFunction

Replace the zookeeper address in ExampleByMainFunction with the real address zk-ip:zk-port, and modify the reader address, subscription list, etc. as needed. An example is as follows:

public <T> T getConfig(String confName, Class<T> confClass) {
    // confName is ignored when the task has only one subscription or consumer config implementation.
    try {
        // ...
        if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
            return (T) new PtubesSdkSubscriptionConfig(
                    "demoR1", // reader taskName, eg. demoR1
                    taskName, // SDK task name
                    "127.0.0.1:2181", // change the zookeeper address to a real address.
                    "test_test.wm_risk_list_account,test_test.test_table" // tables for subscription
            );
        }
        // ...

    } catch (Exception e) {
        System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
        e.printStackTrace();
    }
    return null;
}
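
The subscription argument above is a comma-separated string, so a typo in it is easy to miss. The following is a minimal sketch of a pre-check, assuming each entry has the form database.table as in the examples on this page; the class SubscriptionListCheck is a hypothetical helper and not part of the Ptubes SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Ptubes SDK.
final class SubscriptionListCheck {

    /** Splits a comma-separated list and checks that each entry looks like database.table. */
    static List<String> parse(String subs) {
        List<String> tables = new ArrayList<>();
        for (String raw : subs.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas
            }
            String[] parts = entry.split("\\.");
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("expected database.table, got: " + entry);
            }
            tables.add(entry);
        }
        return tables;
    }

    public static void main(String[] args) {
        String subs = "test_test.wm_risk_list_account,test_test.test_table";
        for (String table : parse(subs)) {
            System.out.println("subscribing to " + table);
        }
    }
}
```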

3. Start mode two

Run the compiled client

  1. Download the compiled SDK client ptubes-sdk-client.tar.gz
  2. Unzip the client archive
  3. Check the directory and configuration
    The structure of the compressed package is as follows:
drwxr-xr-x 4 admin root 12:00 bin
drwxr-xr-x 5 admin root 12:00 conf
drwxr-xr-x 4 admin root 12:10 lib

The bin directory stores start and stop scripts, the conf directory stores dependent configuration files, and the lib directory stores required packages.
./conf/ptubes_demo_task.properties is the configuration file for the example task. In it, replace ptubes.sdk.zookeeper.address=127.0.0.1:2181 with the address of the running ZooKeeper instance, zk-ip:zk-port.

sdk.conf

# Comma-separated list of task names; each task needs its own configuration file
ptubes.sdk.task.set=ptubes_demo_task

ptubes_demo_task.properties

# The corresponding task name in the reader, such as demoR1
ptubes.sdk.reader.name=demoR1
# The SDK's own task name
ptubes.sdk.task.name=ptubes_demo_task
# The corresponding ZooKeeper address
ptubes.sdk.zookeeper.address=127.0.0.1:2181
# Subscribed database.table names, separated by commas
ptubes.sdk.subs=test.test_table
# Corresponding reader addresses, separated by commas
ptubes.sdk.reader.ip=127.0.0.1:28332
  4. Execute
# This command will run all sdk tasks listed in the ./conf/sdk.conf file
# At the same time, the configuration of these tasks also needs to exist in ./conf/${sdkTaskName}.properties
cd bin && sh start.sh
  5. Stop
# This command will stop all running SDK tasks on the machine
sh stop.sh
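
Because start.sh expects a properties file for every task named in sdk.conf, a quick consistency check of the conf directory can catch a missing file or key before startup. The following is a minimal sketch under two assumptions: that the files are parsed with java.util.Properties semantics (where only lines beginning with # or ! are comments, so trailing text after a value becomes part of the value), and that all five keys shown above are required. The class SdkConfCheck is a hypothetical helper, not part of the Ptubes client.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Ptubes client.
final class SdkConfCheck {

    // Assumption: every key from the example task file is treated as required.
    static final String[] REQUIRED_KEYS = {
        "ptubes.sdk.reader.name",
        "ptubes.sdk.task.name",
        "ptubes.sdk.zookeeper.address",
        "ptubes.sdk.subs",
        "ptubes.sdk.reader.ip"
    };

    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        return props;
    }

    /** Returns a list of problems found; an empty list means the layout looks consistent. */
    static List<String> check(Path confDir) throws IOException {
        List<String> problems = new ArrayList<>();
        Properties sdkConf = load(confDir.resolve("sdk.conf"));
        String taskSet = sdkConf.getProperty("ptubes.sdk.task.set", "");
        for (String raw : taskSet.split(",")) {
            String task = raw.trim();
            if (task.isEmpty()) {
                continue;
            }
            Path taskFile = confDir.resolve(task + ".properties");
            if (!Files.isRegularFile(taskFile)) {
                problems.add("missing file: " + taskFile.getFileName());
                continue;
            }
            Properties taskProps = load(taskFile);
            for (String key : REQUIRED_KEYS) {
                if (taskProps.getProperty(key, "").trim().isEmpty()) {
                    problems.add(task + ": missing " + key);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Path confDir = Paths.get(args.length > 0 ? args[0] : "./conf");
        List<String> problems = check(confDir);
        if (problems.isEmpty()) {
            System.out.println("configuration looks consistent");
        } else {
            problems.forEach(System.out::println);
        }
    }
}
```

Pass the path to the conf directory as the first argument, or run it from the unpacked client directory to use ./conf.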

4. Start mode three

Create a new Maven project that depends on the Ptubes project

  1. Create a new Maven project and add the dependency
<dependency>
    <groupId>com.meituan</groupId>
    <artifactId>ptubes-sdk</artifactId>
    <version>1.0.0</version>
</dependency>
  2. Add two configuration files to the project resource directory and adjust the parameters

sdk.conf

ptubes.sdk.task.set=ptubes_demo_task

ptubes_demo_task.properties

# The corresponding Task name in the reader, such as demoR1
ptubes.sdk.reader.name=demoR1

# The SDK's own task name
ptubes.sdk.task.name=ptubes_demo_task

# Corresponding zk address
ptubes.sdk.zookeeper.address=127.0.0.1:2181

# Subscribed database.table names, separated by commas
ptubes.sdk.subs=test.test_table

# Corresponding Reader addresses, separated by commas
ptubes.sdk.reader.ip=127.0.0.1:28332
  3. Add a new class TestMain
package com.example.buffalo;

import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;

import java.util.List;

public class TestMain {
    public static void main(String[] args) {
        String taskName = "ptubes_demo_task";
        // set log file directory
        System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath() + "/logs");
        // set log type
        System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
        System.out.println(System.getProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY));
        IPtubesConnector rdsCdcConnector = null;
        IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);

        try {
            rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {

                @Override
                public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
                    for (RdsPacket.RdsEvent event : events) {
                        System.out.println(event.getRowData()
                                .getBeforeColumnsMap()
                                .get("id"));
                        System.out.println(event.getRowData()
                                .getAfterColumnsMap()
                                .get("id"));
                        System.out.println(PbJsonUtil.printToStringDefaultNull(event));
                    }

                    return RdsCdcEventStatus.SUCCESS;
                }
            });

            rdsCdcConnector.startup();
            long eachSleepTime = 30000L;
            long sleepTimeCount = 300000L;

            do {
                Thread.sleep(eachSleepTime);
                sleepTimeCount -= eachSleepTime;
                try {
                    if (null != rdsCdcConnector) {

                        /**
                         * How to get the runtime info of an SDK task
                         */
                        System.out.println(rdsCdcConnector.getConnectorMonitorInfo());
                    }
                } catch (Exception e) {
                    System.out.println(e);
                }

            } while (sleepTimeCount > 0);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // rdsCdcConnector.shutdown();
        }
    }
}
  4. Run the TestMain.main method and observe the console output.
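
TestMain prints the monitor info from a hand-written sleep loop on the main thread. One alternative is to poll from a scheduled background task, which keeps the main thread free. The following is a minimal sketch of that pattern using only the JDK; MonitorPoller is a hypothetical class, and the Supplier stands in for a call such as rdsCdcConnector.getConnectorMonitorInfo(), whose return type is not shown on this page.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Ptubes SDK.
final class MonitorPoller implements AutoCloseable {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Calls infoSource every periodMillis and passes the result to sink. */
    void start(Supplier<String> infoSource, Consumer<String> sink, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sink.accept(infoSource.get());
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all further runs, so log it and keep polling.
                System.err.println("monitor poll failed: " + e);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops polling and interrupts a poll that is in progress. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        try (MonitorPoller poller = new MonitorPoller()) {
            // Stand-in supplier; with a real connector this could be
            // () -> String.valueOf(rdsCdcConnector.getConnectorMonitorInfo())
            poller.start(() -> "monitor info placeholder", System.out::println, 200);
            Thread.sleep(1000);
        }
    }
}
```

The try-with-resources block ensures the scheduler thread is stopped when the program finishes, so the JVM can exit cleanly.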