diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index e9b6ec3c9a0..cb24a4d5d82 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -67,6 +67,7 @@ import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.core.util.ListenerConfigUtil; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricUtils; @@ -132,6 +133,9 @@ public void init(PinotConfiguration brokerConf) _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME); ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER); + if (_brokerConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT) == 0) { + _brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort()); + } setupHelixSystemProperties(); _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf); _hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME); @@ -391,9 +395,19 @@ public void start() private void updateInstanceConfigAndBrokerResourceIfNeeded() { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId); boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); + + ZNRecord znRecord = instanceConfig.getRecord(); + Map simpleFields = znRecord.getSimpleFields(); if (_tlsPort > 0) { HelixHelper.updateTlsPort(instanceConfig, _tlsPort); } + // Update multi-stage query engine ports + if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { + updated |= updatePortIfNeeded(simpleFields, Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, + Integer.parseInt(_brokerConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT))); + } else { + updated |= updatePortIfNeeded(simpleFields, Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, -1); + } updated |= HelixHelper.removeDisabledPartitions(instanceConfig); boolean shouldUpdateBrokerResource = false; String brokerTag = null; @@ -462,6 +476,25 @@ private String getDefaultBrokerId() { } } + private boolean updatePortIfNeeded(Map instanceConfigSimpleFields, String key, int port) { + String existingPortStr = instanceConfigSimpleFields.get(key); + if (port > 0) { + String portStr = Integer.toString(port); + if (!portStr.equals(existingPortStr)) { + LOGGER.info("Updating '{}' for instance: {} to: {}", key, _instanceId, port); + instanceConfigSimpleFields.put(key, portStr); + return true; + } + } else { + if (existingPortStr != null) { + LOGGER.info("Removing '{}' from instance: {}", key, _instanceId); + instanceConfigSimpleFields.remove(key); + return true; + } + } + return false; + } + @Override public void stop() { LOGGER.info("Shutting down Pinot broker"); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 9ebc6ea6b46..a1686aa47af 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -93,7 +93,8 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId reducerHostname = brokerId; } _reducerHostname = reducerHostname; - _reducerPort = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); + // This config has to be set to a valid port number. + _reducerPort = Integer.parseInt(config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT)); _defaultBrokerTimeoutMs = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java index d4d27634f94..a4d69ee714e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java @@ -150,15 +150,12 @@ public DataTableImplV4(ByteBuffer byteBuffer) } // Read variable size data. + _variableSizeDataBytes = new byte[variableSizeDataLength]; if (variableSizeDataLength != 0) { - _variableSizeDataBytes = new byte[variableSizeDataLength]; byteBuffer.position(variableSizeDataStart); byteBuffer.get(_variableSizeDataBytes); - _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes); - } else { - _variableSizeDataBytes = null; - _variableSizeData = null; } + _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes); // Read metadata. int metadataLength = byteBuffer.getInt(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java index 2bf0426a787..967233b2ee5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java @@ -34,7 +34,7 @@ private DataTableBuilderFactory() { private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class); - public static final int DEFAULT_VERSION = DataTableFactory.VERSION_3; + public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4; private static int _version = DEFAULT_VERSION; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java index 960bde9ccac..f345e99f585 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/AllNullQueriesTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -285,7 +284,6 @@ public void testQueriesWithNoDictStringColumn() public void testQueries(ColumnDataType columnDataType, File indexDir) throws IOException { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); Map queryOptions = new HashMap<>(); queryOptions.put("enableNullHandling", "true"); DataType dataType = columnDataType.toDataType(); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java index d44a266b02a..42978199c99 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BigDecimalQueriesTest.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Random; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -160,7 +159,6 @@ public void testQueriesWithNoDictColumn() } public void testQueries() { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); Map queryOptions = new HashMap<>(); queryOptions.put("enableNullHandling", "true"); { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java index 63443449d95..e707bf0fde2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BooleanNullEnabledQueriesTest.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -181,7 +180,6 @@ public void testQueriesWithNoDictColumn() } public void testQueries() { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); Map queryOptions = new HashMap<>(); queryOptions.put("enableNullHandling", "true"); HashSet trueIndices = new HashSet(Arrays.asList(1, 3, 5)); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java index a93cd88a71f..f279487fa61 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/NullEnabledQueriesTest.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Random; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -281,7 +280,6 @@ public void testQueriesWithNoDictDoubleColumnNoNullValues() } public void testQueries(Number baseValue, ColumnDataType dataType, boolean nullValuesExist) { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); Map queryOptions = new HashMap<>(); queryOptions.put("enableNullHandling", "true"); { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineCustomTenantIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineCustomTenantIntegrationTest.java index 599b95f3410..a9f204a3655 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineCustomTenantIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineCustomTenantIntegrationTest.java @@ -26,14 +26,9 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.client.Connection; import org.apache.pinot.client.ConnectionFactory; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; -import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -97,9 +92,6 @@ public void setUp() // Wait for all documents loaded waitForAllDocsLoaded(600_000L); - - // Setting data table version to 4 - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); } @Test @@ -137,24 +129,6 @@ protected Connection getPinotConnection() { return _pinotConnection; } - @Override - protected void overrideBrokerConf(PinotConfiguration brokerConf) { - brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421); - } - - @Override - protected void overrideServerConf(PinotConfiguration serverConf) { - serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422); - } - - @Override - protected void overrideControllerConf(Map properties) { - properties.put(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - } - @Override protected void testQuery(String pinotQuery, String h2Query) throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 6dabf814d8c..fa5e51b7e2c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -25,13 +25,8 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.client.Connection; import org.apache.pinot.client.ConnectionFactory; -import org.apache.pinot.common.datatable.DataTableFactory; -import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; -import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -78,9 +73,6 @@ public void setUp() // Wait for all documents loaded waitForAllDocsLoaded(600_000L); - - // Setting data table version to 4 - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); } @Test @@ -129,19 +121,6 @@ protected Connection getPinotConnection() { return _pinotConnection; } - @Override - protected void overrideBrokerConf(PinotConfiguration brokerConf) { - brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421); - } - - @Override - protected void overrideServerConf(PinotConfiguration serverConf) { - serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422); - } - @Override protected void testQuery(String pinotQuery, String h2Query) throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java index c93b37835f9..16b8307ecfe 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java @@ -23,7 +23,6 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; @@ -196,7 +195,6 @@ public void testCaseWithIsNotDistinctFrom() @Test public void testTotalCountWithNullHandlingQueryOptionEnabled() throws Exception { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); String pinotQuery = "SELECT COUNT(*) FROM " + getTableName() + " option(enableNullHandling=true)"; String h2Query = "SELECT COUNT(*) FROM " + getTableName(); testQuery(pinotQuery, h2Query); @@ -210,7 +208,6 @@ public void testTotalCountWithNullHandlingQueryOptionEnabled() @Test public void testNullLiteralSelectionOnlyBroker() throws Exception { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); // Null literal only String sqlQuery = "SELECT null FROM mytable OPTION(enableNullHandling=true)"; JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 1a77e109298..d4920c2c159 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -49,6 +49,7 @@ import org.apache.http.message.BasicNameValuePair; import org.apache.pinot.client.PinotConnection; import org.apache.pinot.client.PinotDriver; +import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -56,6 +57,7 @@ import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.instance.InstanceType; @@ -165,6 +167,7 @@ protected String getSchemaFileName() { @BeforeClass public void setUp() throws Exception { + DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3); TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); // Start the Pinot cluster diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java index 5776f2c82bb..5bbd692aad4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java @@ -32,13 +32,8 @@ import org.apache.pinot.client.Connection; import org.apache.pinot.client.ConnectionFactory; import org.apache.pinot.client.ResultSetGroup; -import org.apache.pinot.common.datatable.DataTableFactory; -import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; -import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.tools.utils.JarUtils; import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; @@ -103,9 +98,6 @@ public void setUp() // H2 ClusterIntegrationTestUtils.setUpH2TableWithAvro(Collections.singletonList(dataFile), tableName, _h2Connection); } - - // Setting data table version to 4 - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); } @Test(dataProvider = "QueryDataProvider") @@ -170,19 +162,6 @@ protected Connection getPinotConnection() { return _pinotConnection; } - @Override - protected void overrideBrokerConf(PinotConfiguration brokerConf) { - brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421); - } - - @Override - protected void overrideServerConf(PinotConfiguration serverConf) { - serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842); - serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422); - } - @AfterClass public void tearDown() throws Exception { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index de8cc2288ac..649a2bee054 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.query.QueryEnvironmentTestBase; import org.apache.pinot.query.QueryServerEnclosure; @@ -95,7 +94,6 @@ public static List buildRows(String tableName) { @BeforeClass public void setUp() throws Exception { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1") .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME") .registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME") diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java index a38d7e083aa..4911f3b57f2 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java @@ -38,7 +38,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.pinot.common.datatable.DataTable; -import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.common.response.broker.BrokerResponseStats; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; @@ -75,8 +74,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase { @BeforeClass public void setUp() throws Exception { - DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4); - // Setting up mock server factories. // All test data are loaded upfront b/c the mock server and brokers needs to be in sync. MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1"); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index e9fdab9511d..0f77155dbed 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -68,6 +68,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager; import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.util.ListenerConfigUtil; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.server.access.AccessControlFactory; @@ -170,6 +171,12 @@ public void init(PinotConfiguration serverConf) // NOTE: Need to add the instance id to the config because it is required in HelixInstanceDataManagerConfig _serverConf.addProperty(Server.CONFIG_OF_INSTANCE_ID, _instanceId); } + if (_serverConf.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT) == 0) { + _serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, NetUtils.findOpenPort()); + } + if (_serverConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT) == 0) { + _serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort()); + } _instanceConfigScope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 46b0474d267..7a84ef257b6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -194,7 +194,7 @@ public static class Instance { public static final String CONFIG_OF_PINOT_MINION_STARTABLE_CLASS = "pinot.minion.startable.class"; public static final String CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED = "pinot.multistage.engine.enabled"; - public static final boolean DEFAULT_MULTI_STAGE_ENGINE_ENABLED = false; + public static final boolean DEFAULT_MULTI_STAGE_ENGINE_ENABLED = true; } public static class Broker { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java index 690e99990b9..226f3dc5189 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.spi.utils.CommonConstants; @@ -91,14 +90,6 @@ public String[] getDefaultBatchTableDirectories() { return MULTI_STAGE_TABLE_DIRECTORIES; } - @Override - public Map getConfigOverrides() { - Map overrides = new HashMap<>(super.getConfigOverrides()); - overrides.put("pinot.multistage.engine.enabled", "true"); - overrides.put("pinot.server.instance.currentDataTableVersion", 4); - return overrides; - } - @Override protected int getNumQuickstartRunnerServers() { return 3; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index cac14a4239d..e1e80e8ae02 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -55,10 +55,6 @@ public class QuickstartRunner { private static final int DEFAULT_SERVER_GRPC_PORT = 7100; private static final int DEFAULT_MINION_PORT = 6000; - private static final int DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT = 8421; - private static final int DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT = 8442; - private static final int DEFAULT_SERVER_MULTISTAGE_SERVER_PORT = 8842; - private static final String DEFAULT_ZK_DIR = "PinotZkDir"; private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir"; private static final String DEFAULT_SERVER_DATA_DIR = "PinotServerDataDir"; @@ -143,7 +139,6 @@ private void startBrokers() for (int i = 0; i < _numBrokers; i++) { StartBrokerCommand brokerStarter = new StartBrokerCommand(); brokerStarter.setPort(DEFAULT_BROKER_PORT + i) - .setBrokerMultiStageRunnerPort(DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT + i) .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME) .setConfigOverrides(_configOverrides); if (!brokerStarter.execute()) { @@ -159,8 +154,6 @@ private void startServers() StartServerCommand serverStarter = new StartServerCommand(); serverStarter.setPort(DEFAULT_SERVER_NETTY_PORT + i).setAdminPort(DEFAULT_SERVER_ADMIN_API_PORT + i) .setGrpcPort(DEFAULT_SERVER_GRPC_PORT + i) - .setMultiStageServerPort(DEFAULT_SERVER_MULTISTAGE_SERVER_PORT + i) - .setMultiStageRunnerPort(DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT + i) .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME) .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath()) .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath())