Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace 101tec ZkClient with Helix ZkClient #909

Merged
merged 5 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ project(':datastream-server-api') {
project(':datastream-utils') {
dependencies {
compile project(':datastream-common')
compile "com.101tec:zkclient:$zkclientVersion"
compile "org.apache.helix:zookeeper-api:$helixZkclientVersion"
compile "com.google.guava:guava:$guavaVersion"
testCompile project(":datastream-kafka")
testCompile project(":datastream-testcommon")
Expand Down Expand Up @@ -318,7 +318,6 @@ project(':datastream-client') {
project(':datastream-server') {

dependencies {
compile "com.101tec:zkclient:$zkclientVersion"
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
Expand Down Expand Up @@ -355,7 +354,6 @@ project(':datastream-server-restli') {
compile "com.linkedin.pegasus:restli-netty-standalone:$pegasusVersion"
compile "com.linkedin.pegasus:r2-jetty:$pegasusVersion"
compile "com.linkedin.parseq:parseq:$parseqVersion"
compile "com.101tec:zkclient:$zkclientVersion"
compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1831,9 +1831,9 @@ public void handleStateChanged(Watcher.Event.KeeperState state) {
}

@Override
public void handleNewSession() {
public void handleNewSession(final String sessionId) {
synchronized (_zkSessionLock) {
LOG.info("ZkStateChangeListener::A new session has been established.");
LOG.info("ZkStateChangeListener::A new session with ID {} has been established.", sessionId);
if (_reinitOnNewSession) {
onNewSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,27 @@
package com.linkedin.datastream.common.zk;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Stack;

import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;


/**
* ZKClient is a wrapper of {@link org.I0Itec.zkclient.ZkClient}. It provides the following
* ZKClient is a wrapper of {@link org.apache.helix.zookeeper.impl.client.ZkClient}. It provides the following
* basic features:
* <ol>
* <li>tolerate network reconnects so the caller doesn't have to handle the retries</li>
* <li>provide a String serializer since we only need to store JSON strings in ZooKeeper</li>
* <li>additional features like ensurePath to recursively create paths</li>
* </ol>
*/
public class ZkClient extends org.I0Itec.zkclient.ZkClient {
public class ZkClient extends org.apache.helix.zookeeper.impl.client.ZkClient {
public static final String ZK_PATH_SEPARATOR = "/";
public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
Expand Down Expand Up @@ -86,71 +78,20 @@ public ZkClient(String zkServers, int sessionTimeoutMs, int connectionTimeoutMs,
_zkSessionTimeoutMs = sessionTimeoutMs;
}

@Override
public void close() throws ZkInterruptedException {
if (LOG.isTraceEnabled()) {
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
LOG.trace("closing zkclient. callStack: {}", Arrays.asList(calls));
}
getEventLock().lock();
try {
if (_connection == null) {
return;
}
LOG.info("closing zkclient: {}", ((ZkConnection) _connection).getZookeeper());
super.close();
} catch (ZkInterruptedException e) {
/*
* Workaround for HELIX-264: calling ZkClient#disconnect() in its own eventThread context will
* throw ZkInterruptedException and skip ZkConnection#disconnect()
*/
try {
/*
* ZkInterruptedException#construct() honors InterruptedException by calling
* Thread.currentThread().interrupt(); clear it first, so we can safely disconnect the
* zk-connection
*/
Thread.interrupted();
_connection.close();
/*
* restore interrupted status of current thread
*/
Thread.currentThread().interrupt();
} catch (InterruptedException e1) {
throw new ZkInterruptedException(e1);
}
} finally {
getEventLock().unlock();
LOG.info("closed zkclient");
}
}

/**
* Check if a zk path exists. Changes the access modified to public, its defined as protected in parent class.
*/
@Override
public boolean exists(final String path, final boolean watch) {
long startT = System.nanoTime();

try {
return retryUntilConnected(() -> _connection.exists(path, watch));
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("exists, path: {}, time: {} ns", path, (endT - startT));
}
}
return super.exists(path, watch);
}

/**
* Get all children of zk path. Changes the access modified to public, its defined as protected in parent class.
*/
@Override
public List<String> getChildren(final String path, final boolean watch) {
long startT = System.nanoTime();

try {
return retryUntilConnected(() -> _connection.getChildren(path, watch));
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("getChildren, path: {}, time: {} ns", path, (endT - startT));
}
}
return super.getChildren(path, watch);
}

/**
Expand Down Expand Up @@ -212,80 +153,6 @@ public String ensureReadData(final String path) {
return ensureReadData(path, _zkSessionTimeoutMs);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends Object> T readData(final String path, final Stat stat, final boolean watch) {
long startT = System.nanoTime();
try {
byte[] data = retryUntilConnected(() -> _connection.readData(path, stat, watch));
return (T) deserialize(data);
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("readData, path: {}, time: {} ns", path, (endT - startT));
}
}
}

@Override
public void writeData(final String path, Object data, final int expectedVersion) {
long startT = System.nanoTime();
try {
final byte[] bytes = serialize(data);

retryUntilConnected(() -> {
_connection.writeData(path, bytes, expectedVersion);
return null;
});
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("writeData, path: {}, time: {} ns", path, (endT - startT));
}
}
}

@Override
public String create(final String path, Object data, final CreateMode mode) throws RuntimeException {
if (path == null) {
throw new IllegalArgumentException("path must not be null.");
}

long startT = System.nanoTime();
try {
final byte[] bytes = data == null ? null : serialize(data);

return retryUntilConnected(() -> _connection.create(path, bytes, mode));
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("create, path: {}, time: {} ns", path, (endT - startT));
}
}
}

@Override
public boolean delete(final String path) {
long startT = System.nanoTime();
try {
try {
retryUntilConnected(() -> {
_connection.delete(path);
return null;
});

return true;
} catch (ZkNoNodeException e) {
return false;
}
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
LOG.trace("delete, path: {}, time: {} ns", path, (endT - startT));
}
}
}

/**
* Ensure that all the paths in the given full path String are created
* @param path the zk path
Expand Down Expand Up @@ -347,11 +214,6 @@ public <T extends Object> T deserialize(byte[] data) {
return (T) _zkSerializer.deserialize(data);
}

@VisibleForTesting
public long getSessionId() {
return ((ZkConnection) _connection).getZookeeper().getSessionId();
}

private static class ZKStringSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import java.io.IOException;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,6 +64,30 @@ public void testReadAndWriteRoundTrip() throws Exception {
zkClient.close();
}

@Test
public void testCreateNoNodeException() throws Exception {
ZkClient zkClient = new ZkClient(_zkConnectionString);

String electionPath = "/leaderelection";
String electionNodeName = electionPath + "/coordinator-";

// now create this node with persistent mode
Assert.assertThrows(ZkNoNodeException.class,
() -> zkClient.create(electionNodeName, "test", CreateMode.PERSISTENT_SEQUENTIAL));
}

@Test
public void testCreateIllegalArgumentException() throws Exception {
ZkClient zkClient = new ZkClient(_zkConnectionString);

String electionPath = "/leaderelection";
String electionNodeName = electionPath + "/coordinator-";

// now create this node with persistent mode
Assert.assertThrows(NullPointerException.class,
() -> zkClient.create(null, "test", CreateMode.PERSISTENT_SEQUENTIAL));
}

@Test
public void testCreateEphemeralSequentialNode() throws Exception {
ZkClient zkClient = new ZkClient(_zkConnectionString);
Expand Down
3 changes: 1 addition & 2 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ext {
scalaVersion = "2.12"
slf4jVersion = "1.7.5"
testngVersion = "7.1.0"
zkclientVersion = "0.11"
zookeeperVersion = "3.4.13"
helixZkclientVersion = "1.0.1"
helixZkclientVersion = "1.0.2"
}
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "3.0.0"
version = "4.0.0"
}

subprojects {
Expand Down