Turn on v2 engine by default #10543

Merged (3 commits) on Apr 7, 2023
@@ -67,6 +67,7 @@
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
@@ -132,6 +133,9 @@ public void init(PinotConfiguration brokerConf)
_clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);

if (_brokerConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT) == 0) {
_brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, NetUtils.findOpenPort());
}
setupHelixSystemProperties();
_listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(brokerConf);
_hostname = brokerConf.getProperty(Broker.CONFIG_OF_BROKER_HOSTNAME);
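
For reference, a minimal sketch (not part of this diff) of how a broker configuration could still pin the multi-stage query runner port explicitly instead of relying on the automatic assignment added above; the class name and the port value 8421 are illustrative only:

import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.env.PinotConfiguration;

public class BrokerRunnerPortSketch {
  // Mirrors the overrideBrokerConf hooks removed from the integration tests below:
  // pin the runner port explicitly rather than relying on automatic assignment.
  static void pinRunnerPort(PinotConfiguration brokerConf) {
    // Any free port works; 8421 is only an example value.
    brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
    // Leaving the property at its default of 0 instead lets init() above pick a
    // free port via NetUtils.findOpenPort() and publish it to Helix.
  }
}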
@@ -391,9 +395,19 @@ public void start()
private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);

ZNRecord znRecord = instanceConfig.getRecord();
Map<String, String> simpleFields = znRecord.getSimpleFields();
if (_tlsPort > 0) {
HelixHelper.updateTlsPort(instanceConfig, _tlsPort);
}
// Update multi-stage query engine ports
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
updated |= updatePortIfNeeded(simpleFields, Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
Integer.parseInt(_brokerConf.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT)));
} else {
updated |= updatePortIfNeeded(simpleFields, Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, -1);
}
updated |= HelixHelper.removeDisabledPartitions(instanceConfig);
boolean shouldUpdateBrokerResource = false;
String brokerTag = null;
@@ -462,6 +476,25 @@ private String getDefaultBrokerId() {
}
}

private boolean updatePortIfNeeded(Map<String, String> instanceConfigSimpleFields, String key, int port) {
String existingPortStr = instanceConfigSimpleFields.get(key);
if (port > 0) {
String portStr = Integer.toString(port);
if (!portStr.equals(existingPortStr)) {
LOGGER.info("Updating '{}' for instance: {} to: {}", key, _instanceId, port);
instanceConfigSimpleFields.put(key, portStr);
return true;
}
} else {
if (existingPortStr != null) {
LOGGER.info("Removing '{}' from instance: {}", key, _instanceId);
instanceConfigSimpleFields.remove(key);
return true;
}
}
return false;
}
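
To make the helper's behavior above concrete, here is a small standalone sketch (assumed class and key names, plain JDK collections, no Helix dependency) of its three outcomes:

import java.util.HashMap;
import java.util.Map;

public class UpdatePortIfNeededSketch {
  public static void main(String[] args) {
    Map<String, String> simpleFields = new HashMap<>();
    // Positive port, not yet present -> field is written, returns true.
    System.out.println(updatePortIfNeeded(simpleFields, "MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT", 8421)); // true
    // Same port again -> nothing changes, returns false.
    System.out.println(updatePortIfNeeded(simpleFields, "MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT", 8421)); // false
    // Non-positive port (engine disabled) -> field is removed, returns true.
    System.out.println(updatePortIfNeeded(simpleFields, "MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT", -1)); // true
  }

  // Same logic as the broker helper above, minus logging and instance-id context.
  static boolean updatePortIfNeeded(Map<String, String> fields, String key, int port) {
    String existing = fields.get(key);
    if (port > 0) {
      String portStr = Integer.toString(port);
      if (!portStr.equals(existing)) {
        fields.put(key, portStr);
        return true;
      }
    } else if (existing != null) {
      fields.remove(key);
      return true;
    }
    return false;
  }
}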

@Override
public void stop() {
LOGGER.info("Shutting down Pinot broker");
@@ -93,7 +93,8 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
reducerHostname = brokerId;
}
_reducerHostname = reducerHostname;
_reducerPort = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
// This config has to be set to a valid port number.
_reducerPort = Integer.parseInt(config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT));
_defaultBrokerTimeoutMs = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
@@ -150,15 +150,12 @@ public DataTableImplV4(ByteBuffer byteBuffer)
}

// Read variable size data.
_variableSizeDataBytes = new byte[variableSizeDataLength];
Review comment from the PR author: @walterddr Please verify this logic.

Reply from a contributor: this should be ok.

if (variableSizeDataLength != 0) {
_variableSizeDataBytes = new byte[variableSizeDataLength];
byteBuffer.position(variableSizeDataStart);
byteBuffer.get(_variableSizeDataBytes);
_variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);
} else {
_variableSizeDataBytes = null;
_variableSizeData = null;
}
_variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes);

// Read metadata.
int metadataLength = byteBuffer.getInt();
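
The review exchange above concerns the variableSizeDataLength == 0 case: after this change the bytes are wrapped into an empty buffer instead of being left null, so readers of _variableSizeData no longer have to handle a null buffer. A minimal sketch of that corner case using only the JDK:

import java.nio.ByteBuffer;

public class EmptyVariableSizeDataSketch {
  public static void main(String[] args) {
    int variableSizeDataLength = 0;
    byte[] variableSizeDataBytes = new byte[variableSizeDataLength];
    // Wrapping a zero-length array is legal and yields an empty buffer,
    // which is why the explicit null handling could be dropped.
    ByteBuffer variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
    System.out.println(variableSizeData.remaining()); // prints 0
  }
}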
@@ -34,7 +34,7 @@ private DataTableBuilderFactory() {

private static final Logger LOGGER = LoggerFactory.getLogger(DataTableBuilderFactory.class);

public static final int DEFAULT_VERSION = DataTableFactory.VERSION_3;
public static final int DEFAULT_VERSION = DataTableFactory.VERSION_4;

private static int _version = DEFAULT_VERSION;
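
With VERSION_4 now the default, code that still needs the previous wire format can override it at startup, as OfflineClusterIntegrationTest does further down in this diff; a minimal sketch:

import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;

public class PinDataTableV3Sketch {
  public static void main(String[] args) {
    // Explicitly fall back to the v3 data table format (the previous default).
    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
  }
}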

@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -285,7 +284,6 @@ public void testQueriesWithNoDictStringColumn()

public void testQueries(ColumnDataType columnDataType, File indexDir)
throws IOException {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("enableNullHandling", "true");
DataType dataType = columnDataType.toDataType();
@@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -160,7 +159,6 @@ public void testQueriesWithNoDictColumn()
}

public void testQueries() {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("enableNullHandling", "true");
{
@@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -181,7 +180,6 @@ public void testQueriesWithNoDictColumn()
}

public void testQueries() {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("enableNullHandling", "true");
HashSet<Integer> trueIndices = new HashSet<Integer>(Arrays.asList(1, 3, 5));
@@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -281,7 +280,6 @@ public void testQueriesWithNoDictDoubleColumnNoNullValues()
}

public void testQueries(Number baseValue, ColumnDataType dataType, boolean nullValuesExist) {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
Map<String, String> queryOptions = new HashMap<>();
queryOptions.put("enableNullHandling", "true");
{
@@ -26,14 +26,9 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -97,9 +92,6 @@ public void setUp()

// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);

// Setting data table version to 4
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
}

@Test
@@ -137,24 +129,6 @@ protected Connection getPinotConnection() {
return _pinotConnection;
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
}

@Override
protected void overrideControllerConf(Map<String, Object> properties) {
properties.put(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
}

@Override
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
@@ -25,13 +25,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -78,9 +73,6 @@ public void setUp()

// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);

// Setting data table version to 4
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
}

@Test
@@ -129,19 +121,6 @@ protected Connection getPinotConnection() {
return _pinotConnection;
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
}

@Override
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
@@ -23,7 +23,6 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -196,7 +195,6 @@ public void testCaseWithIsNotDistinctFrom()
@Test
public void testTotalCountWithNullHandlingQueryOptionEnabled()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
String pinotQuery = "SELECT COUNT(*) FROM " + getTableName() + " option(enableNullHandling=true)";
String h2Query = "SELECT COUNT(*) FROM " + getTableName();
testQuery(pinotQuery, h2Query);
@@ -210,7 +208,6 @@ public void testNullLiteralSelectionOnlyBroker()
@Test
public void testNullLiteralSelectionOnlyBroker()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
// Null literal only
String sqlQuery = "SELECT null FROM mytable OPTION(enableNullHandling=true)";
JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
@@ -49,13 +49,15 @@
import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.client.PinotConnection;
import org.apache.pinot.client.PinotDriver;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -165,6 +167,7 @@ protected String getSchemaFileName() {
@BeforeClass
public void setUp()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
@@ -32,13 +32,8 @@
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tools.utils.JarUtils;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
@@ -103,9 +98,6 @@ public void setUp()
// H2
ClusterIntegrationTestUtils.setUpH2TableWithAvro(Collections.singletonList(dataFile), tableName, _h2Connection);
}

// Setting data table version to 4
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
}

@Test(dataProvider = "QueryDataProvider")
@@ -170,19 +162,6 @@ protected Connection getPinotConnection() {
return _pinotConnection;
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
brokerConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8421);
}

@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 8842);
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
}

@AfterClass
public void tearDown()
throws Exception {
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
@@ -95,7 +94,6 @@ public static List<GenericRow> buildRows(String tableName) {
@BeforeClass
public void setUp()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1")
.registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
.registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
@@ -38,7 +38,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
@@ -75,8 +74,6 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
@BeforeClass
public void setUp()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);

// Setting up mock server factories.
// All test data are loaded upfront b/c the mock server and brokers needs to be in sync.
MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1");