Client User Guide

myksl edited this page Apr 27, 2022 · 3 revisions


Before starting the client, you need to ensure:

  1. The server has been started (see QuickStart).
  2. zookeeper has been started and you have its address, zk-ip:zk-port.
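
A quick way to confirm the second prerequisite is to try a plain TCP connection to the zookeeper address before starting the client. The sketch below is a hypothetical helper (the class `ZkAddressCheck` and its methods are not part of Ptubes). It only shows that something is listening on zk-ip:zk-port, not that the listener is actually zookeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Ptubes: checks that a "host:port" address accepts TCP connections.
class ZkAddressCheck {

    // Splits "zk-ip:zk-port" into host and port without doing a DNS lookup.
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = hostPort.substring(0, idx).trim();
        int port = Integer.parseInt(hostPort.substring(idx + 1).trim());
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within timeoutMillis, false otherwise.
    static boolean isReachable(String hostPort, int timeoutMillis) {
        InetSocketAddress parsed = parse(hostPort);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(parsed.getHostString(), parsed.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The default address is an assumption for local testing; pass the real zk-ip:zk-port as an argument.
        String address = args.length > 0 ? args[0] : "127.0.0.1:2181";
        System.out.println(address + " reachable: " + isReachable(address, 2000));
    }
}
```

If the check returns false, verify the address and any firewall rules before moving on to the start modes.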

2. Start mode one

Execute the git command to get the ptubes code

git clone

Find the class in the IDE


Replace the zookeeper address in ExampleByMainFunction with the real address zk-ip:zk-port, and modify the reader address, subscription list, etc. as needed. An example is as follows:

public <T> T getConfig(String confName, Class<T> confClass) {
    // confName is ignored when the task has only one implementation of the subscription or consumer config.
    try {
        // ...
        if (confClass.isAssignableFrom(PtubesSdkSubscriptionConfig.class)) {
            return (T) new PtubesSdkSubscriptionConfig(
                    "demoR1", // reader task name, e.g. demoR1
                    taskName, // SDK task name
                    "", // replace with the real zookeeper address, zk-ip:zk-port
                    "test_test.wm_risk_list_account,test_test.test_table" // tables to subscribe to
            );
        }
        // ...

    } catch (Exception e) {
        System.out.println("getConfig error for confName:{" + confName + "}, confClass:{" + confClass + "}");
    }
    return null;
}
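
The subscription list in the example is a single comma-separated string. A small check before passing it to the config can catch typos early. The sketch below assumes every entry has the form database.table, as in the example values. `SubscriptionList` is a hypothetical helper and is not part of the Ptubes SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Ptubes SDK.
class SubscriptionList {

    // Splits "db1.t1,db2.t2" into {database, table} pairs and rejects malformed entries.
    // Assumption: the first dot in each entry separates the database name from the table name.
    static List<String[]> parse(String subs) {
        List<String[]> result = new ArrayList<>();
        for (String entry : subs.split(",")) {
            String trimmed = entry.trim();
            int dot = trimmed.indexOf('.');
            if (dot <= 0 || dot == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected database.table, got: '" + trimmed + "'");
            }
            result.add(new String[] {trimmed.substring(0, dot), trimmed.substring(dot + 1)});
        }
        return result;
    }

    public static void main(String[] args) {
        for (String[] pair : parse("test_test.wm_risk_list_account,test_test.test_table")) {
            System.out.println("database=" + pair[0] + " table=" + pair[1]);
        }
    }
}
```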

3. Start mode two

Run the compiled client

  1. Download the compiled SDK client ptubes-sdk-client.tar.gz
  2. Unzip the client archive
  3. Check the directory and configuration
    The structure of the compressed package is as follows:
drwxr-xr-x 4 admin root 12:00 bin
drwxr-xr-x 5 admin root 12:00 conf
drwxr-xr-x 4 admin root 12:10 lib

The bin directory stores start and stop scripts, the conf directory stores dependent configuration files, and the lib directory stores required packages.
./conf/ holds the configuration file for the example task. In that file, set ptubes.sdk.zookeeper.address= to the real address zk-ip:zk-port of the zookeeper instance that was started.
sdk.conf

ptubes.sdk.task.set=ptubes_demo_task // Task names, separated by commas; each task needs a corresponding configuration file
// The corresponding Task name in the reader, such as demoR1
// SDK's own task name
ptubes.sdk.zookeeper.address= // The corresponding zk address
ptubes.sdk.subs=test.test_table // Subscribed database.table names, separated by commas
ptubes.sdk.reader.ip= // Corresponding Reader addresses, separated by commas
  4. Execute
# This command will run all sdk tasks listed in the ./conf/sdk.conf file
# At the same time, the configuration of these tasks also needs to exist in ./conf/${sdkTaskName}.properties
cd bin && sh
  5. Stop
# This command will stop all running SDK tasks on the machine
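
Because every task listed in ./conf/sdk.conf also needs a ./conf/${sdkTaskName}.properties file, it can help to verify this before running the start script. The following is a minimal sketch, not part of the Ptubes distribution. `SdkConfCheck` is a hypothetical class, and it assumes the `//` notes shown in the listing above are documentation annotations that do not appear in the real files.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check, not part of the Ptubes distribution.
class SdkConfCheck {

    // Reads ptubes.sdk.task.set from sdk.conf and returns the trimmed, non-empty task names.
    static List<String> taskNames(Path sdkConf) {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(sdkConf)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty("ptubes.sdk.task.set", "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    // Returns the tasks from sdk.conf that have no <task>.properties file in confDir.
    static List<String> missingTaskFiles(Path confDir) {
        List<String> missing = new ArrayList<>();
        for (String task : taskNames(confDir.resolve("sdk.conf"))) {
            if (!Files.isRegularFile(confDir.resolve(task + ".properties"))) {
                missing.add(task);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Defaults to ./conf relative to the working directory; pass another directory as an argument.
        Path confDir = Paths.get(args.length > 0 ? args[0] : "conf");
        try {
            System.out.println("tasks without a properties file: " + missingTaskFiles(confDir));
        } catch (UncheckedIOException e) {
            System.out.println("could not read " + confDir.resolve("sdk.conf") + ": " + e.getMessage());
        }
    }
}
```

An empty list means each task named in ptubes.sdk.task.set has a matching properties file.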

4. Start mode three

Create a new maven project and depend on the ptubes project

  1. Create a new maven project and add dependencies
  2. Add two configuration files to the project's resources directory and adjust the parameters
sdk.conf

# The corresponding Task name in the reader, such as demoR1

#SDK's own task name

# Corresponding zk address

# Subscribed database.table names, separated by commas

# Corresponding Reader addresses, separated by commas
  3. Add a new class TestMain
package com.example.buffalo;

import com.meituan.ptubes.common.utils.PbJsonUtil;
import com.meituan.ptubes.sdk.IPtubesConnector;
import com.meituan.ptubes.common.log.LoggerFactory;
import com.meituan.ptubes.sdk.IRdsCdcEventListener;
import com.meituan.ptubes.sdk.RdsCdcEventStatus;
import com.meituan.ptubes.sdk.config.notification.IConfigChangeNotifier;
import com.meituan.ptubes.sdk.RdsCdcConnectorFactory;
import com.meituan.ptubes.sdk.config.notification.SimpleLocalFileConfigChangeNotifier;
import com.meituan.ptubes.sdk.protocol.RdsPacket;

import java.util.List;

public class TestMain {
    public static void main(String[] args) {
        String taskName = "ptubes_demo_task";
        // set log file directory
        System.setProperty(LoggerFactory.DEFAULT_LOG_DIR_PROPERTY, TestMain.class.getResource("/").getPath() + "/logs");
        // set log type
        System.setProperty(LoggerFactory.DEFAULT_LOG_TYPE_PROPERTY, "log4j2");
        IPtubesConnector rdsCdcConnector = null;
        IConfigChangeNotifier iConfigChangeNotifier = new SimpleLocalFileConfigChangeNotifier(taskName);

        try {
            rdsCdcConnector = RdsCdcConnectorFactory.buildMySQLConnector(taskName, iConfigChangeNotifier, new IRdsCdcEventListener() {

                public RdsCdcEventStatus onEvents(List<RdsPacket.RdsEvent> events) {
                    for (RdsPacket.RdsEvent event : events) {
                        // ...
                    }
                    return RdsCdcEventStatus.SUCCESS;
                }
            });
            // ...

            long eachSleepTime = 30000L;
            long sleepTimeCount = 300000L;

            do {
                sleepTimeCount -= eachSleepTime;
                try {
                    if (null != rdsCdcConnector) {
                        /*
                         * how to get a buffalo sdk task runtime info
                         */
                        // ...
                    }
                    // ...
                } catch (Exception e) {
                    // ...
                }
            } while (sleepTimeCount > 0);
        } catch (Exception e) {
            // ...
        } finally {
            // rdsCdcConnector.shutdown();
        }
    }
}
  4. Execute the TestMain.main method and observe the console output.