Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding configs for zk client timeout #9975

Merged
merged 2 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,6 @@ private ServiceStartableUtils() {
private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
private static final int ZK_TIMEOUT_MS = 30_000;

/**
* Applies the ZK cluster config to the given instance config if it does not already exist.
Expand All @@ -47,10 +47,19 @@ private ServiceStartableUtils() {
*/
public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
ServiceRole serviceRole) {

ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
.setConnectionTimeout(ZK_TIMEOUT_MS).build();
zkClient.waitUntilConnected(ZK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
int zkClientSessionConfig =
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
ZkClient zkClient = new ZkClient.Builder()
.setZkSerializer(new ZNRecordSerializer())
.setZkServer(zkAddress)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.setSessionTimeout(zkClientSessionConfig)
.build();
zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);

try {
ZNRecord clusterConfigZNRecord =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private void setUpPinotController() {

// Set up Pinot cluster in Helix if needed
HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode,
_config.getLeadControllerResourceRebalanceStrategy());
_config);

// Start all components
initPinotFSFactory();
Expand Down Expand Up @@ -422,7 +422,8 @@ private void setUpPinotController() {

if (_config.getHLCTablesAllowed()) {
LOGGER.info("Realtime tables with High Level consumers will be supported");
_realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager);
_realtimeSegmentsManager =
new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager, _config);
_realtimeSegmentsManager.start(_controllerMetrics);
} else {
LOGGER.info("Realtime tables with High Level consumers will NOT be supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
Expand Down Expand Up @@ -81,11 +82,13 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
private ZkClient _zkClient;
private ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final ControllerConf _controllerConf;

public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager,
LeadControllerManager leadControllerManager) {
LeadControllerManager leadControllerManager, ControllerConf controllerConf) {
_pinotHelixResourceManager = pinotManager;
_leadControllerManager = leadControllerManager;
_controllerConf = controllerConf;
String clusterName = _pinotHelixResourceManager.getHelixClusterName();
_propertyStorePath = PropertyPathBuilder.propertyStore(clusterName);
_tableConfigPath = _propertyStorePath + TABLE_CONFIG;
Expand All @@ -96,9 +99,19 @@ public void start(ControllerMetrics controllerMetrics) {

LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
String zkUrl = _pinotHelixResourceManager.getHelixZkURL();
_zkClient = new ZkClient(zkUrl, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
int zkClientSessionTimeoutMs =
_controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
_controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
_zkClient = new ZkClient.Builder()
.setZkServer(zkUrl)
.setSessionTimeout(zkClientSessionTimeoutMs)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.build();
_zkClient.setZkSerializer(new ZNRecordSerializer());
_zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
_zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);

// Subscribe to any data/child changes to property
_zkClient.subscribeChildChanges(_tableConfigPath, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -92,11 +93,22 @@ private static void setupHelixClusterIfNeeded(String helixClusterName, String zk
}

public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
boolean enableBatchMessageMode, String leadControllerResourceRebalanceStrategy) {
boolean enableBatchMessageMode, ControllerConf controllerConf) {
ZkClient zkClient = null;
int zkClientSessionConfig =
controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
try {
zkClient = new ZkClient.Builder().setZkServer(zkPath).setZkSerializer(new ZNRecordSerializer()).build();
zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
zkClient = new ZkClient.Builder()
.setZkServer(zkPath)
.setSessionTimeout(zkClientSessionConfig)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.setZkSerializer(new ZNRecordSerializer())
.build();
zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
HelixDataAccessor helixDataAccessor =
new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
Expand All @@ -113,7 +125,7 @@ public static void setupPinotCluster(String helixClusterName, String zkPath, boo

// Add lead controller resource if needed
createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, configAccessor, enableBatchMessageMode,
leadControllerResourceRebalanceStrategy);
controllerConf.getLeadControllerResourceRebalanceStrategy());
} finally {
if (zkClient != null) {
zkClient.close();
Expand Down
20 changes: 0 additions & 20 deletions pinot-query-runtime/src/test/resources/queries/Cast.json

This file was deleted.

50 changes: 0 additions & 50 deletions pinot-query-runtime/src/test/resources/queries/MathFuncs.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,55 +535,5 @@
"sql": "SELECT longCol / 1e20 FROM {numTbl}"
}
]
},
"round": {
"tables": {
"numTbl": {
"schema": [
{"name": "intCol", "type": "INT"},
{"name": "longCol", "type": "LONG"},
{"name": "doubleCol", "type": "DOUBLE"},
{"name": "floatCol", "type": "FLOAT"}
],
"inputs": [
[0, 3, 0.123, 3.2],
[123, 321, 4.242, 3.03],
[-456, -2, 1.134, 7.722],
[123, -456, 3.634, 9.12]
]
}
},
"queries": [
{
"description": "test round on integer columns",
"ignored": true,
"comment": "we round the number up somehow",
"sql": "SELECT round(intCol, 2) FROM {numTbl}"
},
{
"description": "test round on long columns",
"ignored": true,
"comment": "we round the number up somehow",
"sql": "SELECT round(longCol, 2) FROM {numTbl}"
},
{
"description": "test round on double columns",
"ignored": true,
"comment": "double is rounded to 0",
"sql": "SELECT round(doubleCol, 2) FROM {numTbl}"
},
{
"description": "test ceil on float columns",
"ignored": true,
"comment": "float is rounded to 0",
"sql": "SELECT round(floatCol, 2) FROM {numTbl}"
},
{
"ignored": true,
"comment": "Caught exception while initializing transform function: round",
"description": "test round on literal",
"sql": "SELECT round(2.0, 0) FROM {numTbl}"
}
]
}
}
40 changes: 0 additions & 40 deletions pinot-query-runtime/src/test/resources/queries/NullHanlding.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
},
"queries": [
{ "sql": "SELECT * FROM {tbl} WHERE intCol > 5" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol = 'foo'" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol IN ('foo', 'bar')" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol IN (196883, 42)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol IN (111, 222)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol NOT IN (196883, 42) AND strCol IN ('alice')" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol < (SELECT SUM(intCol) FROM {tbl} AS b WHERE strCol BETWEEN 'bar' AND 'foo')" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ public static class BrokerResourceStateModel {
}

public static class ZkClient {
public static final long DEFAULT_CONNECT_TIMEOUT_SEC = 60L;
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 60_000;
public static final int DEFAULT_SESSION_TIMEOUT_MS = 30_000;
// Retry interval and count for ZK operations where we would rather fail than get an empty (wrong) result back
public static final int RETRY_INTERVAL_MS = 50;
public static final int RETRY_COUNT = 2;
public static final String ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG = "zk.client.connection.timeout.ms";
public static final String ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG = "zk.client.session.timeout.ms";
}

public static class DataSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public boolean execute()
ZkClient zkClient = new ZkClient(_zkAddress);
zkClient.setZkSerializer(new ZNRecordStreamingSerializer());
LOGGER.info("Connecting to Zookeeper at: {}", _zkAddress);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
ZKHelixDataAccessor zkHelixDataAccessor = new ZKHelixDataAccessor(_clusterName, baseDataAccessor);
PropertyKey property = zkHelixDataAccessor.keyBuilder().liveInstances();
Expand Down