Skip to content

Commit

Permalink
Add complex-type transformation to offline segment creation (#6914)
Browse files Browse the repository at this point in the history
* add complex-type transformation to offline segment creation

* comments

* add a TODO
  • Loading branch information
yupeng9 committed May 14, 2021
1 parent e1d6ca4 commit 1a38329
Show file tree
Hide file tree
Showing 12 changed files with 10,337 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1251,9 +1251,7 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
_recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);

// Create complex type transformer
_complexTypeTransformer =
ComplexTypeTransformer.isComplexTypeHandlingEnabled(tableConfig) ? new ComplexTypeTransformer(tableConfig)
: null;
_complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);

// Acquire semaphore to create stream consumers
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
final SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, reader, _dataSchema);
driver.init(genConfig, dataSource, CompositeTransformer.getPassThroughTransformer());
driver.init(genConfig, dataSource, CompositeTransformer.getPassThroughTransformer(), null);
driver.build();

if (segmentPartitionConfig != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,15 @@ private static String parseDelimiter(TableConfig tableConfig) {
}
}

public static boolean isComplexTypeHandlingEnabled(TableConfig tableConfig) {
return tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getComplexTypeConfig() != null;
/**
 * Builds the complex type transformer for the given table config.
 *
 * @param tableConfig table config to inspect
 * @return a new {@link ComplexTypeTransformer} when the table config has an ingestion config that carries a
 *         complex type config, {@code null} otherwise
 */
@Nullable
public static ComplexTypeTransformer getComplexTypeTransformer(TableConfig tableConfig) {
  // Guard clause: without both an ingestion config and a complex type config there is nothing to build
  if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().getComplexTypeConfig() == null) {
    return null;
  }
  return new ComplexTypeTransformer(tableConfig);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collection;
import org.apache.pinot.common.Utils;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.impl.stats.SegmentPreIndexStatsCollectorImpl;
Expand Down Expand Up @@ -52,6 +53,8 @@ public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsColle
try {
RecordTransformer recordTransformer = CompositeTransformer
.getDefaultTransformer(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
ComplexTypeTransformer complexTypeTransformer =
ComplexTypeTransformer.getComplexTypeTransformer(statsCollectorConfig.getTableConfig());

SegmentPreIndexStatsCollector collector = new SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
collector.init();
Expand All @@ -62,6 +65,10 @@ public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsColle
reuse.clear();

reuse = _recordReader.next(reuse);
if (complexTypeTransformer != null) {
// TODO: consolidate complex type transformer into composite type transformer
reuse = complexTypeTransformer.transform(reuse);
}
if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
for (Object singleRow : (Collection) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
GenericRow transformedRow = recordTransformer.transform((GenericRow) singleRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.IntermediateSegmentSegmentCreationDataSource;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
private SegmentIndexCreationInfo segmentIndexCreationInfo;
private Schema dataSchema;
private RecordTransformer _recordTransformer;
private ComplexTypeTransformer _complexTypeTransformer;
private IngestionSchemaValidator _ingestionSchemaValidator;
private int totalDocs = 0;
private File tempIndexDir;
Expand Down Expand Up @@ -147,11 +150,12 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader)
LOGGER.info("RecordReaderSegmentCreationDataSource is used");
dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
}
init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()),
ComplexTypeTransformer.getComplexTypeTransformer(config.getTableConfig()));
}

public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
RecordTransformer recordTransformer)
RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer)
throws Exception {
this.config = config;
recordReader = dataSource.getRecordReader();
Expand All @@ -161,6 +165,7 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
}

_recordTransformer = recordTransformer;
_complexTypeTransformer = complexTypeTransformer;

// Initialize stats collection
segmentStats = dataSource
Expand Down Expand Up @@ -211,6 +216,10 @@ public void build()
long indexStopTime;
reuse.clear();
GenericRow decodedRow = recordReader.next(reuse);
if (_complexTypeTransformer != null) {
// TODO: consolidate complex type transformer into composite type transformer
decodedRow = _complexTypeTransformer.transform(decodedRow);
}
if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
recordReadStopTime = System.currentTimeMillis();
totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tools;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URL;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;

import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
import static org.apache.pinot.tools.Quickstart.printStatus;


/**
 * Quickstart that bootstraps the {@code githubEvents} offline table using the complex-type-handling example
 * resources found on the classpath, and then runs one sample query over the flattened commit author columns.
 */
public class OfflineComplexTypeHandlingQuickStart {

  /**
   * Copies a classpath resource to the given destination file.
   *
   * <p>Fails via {@link Preconditions#checkNotNull(Object)} when the resource cannot be located.
   */
  private static void copyResource(ClassLoader classLoader, String resourcePath, File destination)
      throws java.io.IOException {
    URL resourceUrl = classLoader.getResource(resourcePath);
    Preconditions.checkNotNull(resourceUrl);
    FileUtils.copyURLToFile(resourceUrl, destination);
  }

  /**
   * Runs the quickstart: stages the table config, schema and ingestion job spec under a temporary directory,
   * starts the components through {@link QuickstartRunner}, bootstraps the table and prints a sample query result.
   *
   * @throws Exception if any of the setup, bootstrap or query steps fails
   */
  public void execute()
      throws Exception {
    // Working directories live under a timestamp-named folder in the system temp directory
    File tmpRoot = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
    File tableDir = new File(tmpRoot, "githubEvents");
    File rawDataDir = new File(tmpRoot, "rawdata");
    Preconditions.checkState(rawDataDir.mkdirs());

    // Stage the example resources in the same order: table config, schema, ingestion job spec
    ClassLoader loader = OfflineComplexTypeHandlingQuickStart.class.getClassLoader();
    copyResource(loader,
        "examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_table_config.json",
        new File(tableDir, "githubEvents_offline_table_config.json"));
    // TODO: add all columns of the flattened fields after the schema inference
    copyResource(loader, "examples/batch/githubEvents/githubEvents_offline_complexTypeHandling_schema.json",
        new File(tableDir, "githubEvents_schema.json"));
    copyResource(loader, "examples/batch/githubEvents/ingestionJobComplexTypeHandlingSpec.yaml",
        new File(tableDir, "ingestionJobSpec.yaml"));

    QuickstartTableRequest tableRequest = new QuickstartTableRequest(tableDir.getAbsolutePath());
    final QuickstartRunner runner =
        new QuickstartRunner(Collections.singletonList(tableRequest), 1, 1, 1, rawDataDir);

    printStatus(Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****");
    runner.startAll();

    // On JVM shutdown, stop the runner and remove the temporary working directory
    Thread shutdownHook = new Thread(() -> {
      try {
        printStatus(Color.GREEN, "***** Shutting down offline quick start *****");
        runner.stop();
        FileUtils.deleteDirectory(tmpRoot);
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    printStatus(Color.CYAN, "***** Bootstrap githubEvents table *****");
    runner.bootstrapTable();

    printStatus(Color.CYAN, "***** Waiting for 5 seconds for the server to fetch the assigned segment *****");
    Thread.sleep(5000);

    printStatus(Color.YELLOW, "***** Offline complex-type-handling quickstart setup complete *****");

    // Sample query over columns named after the nested payload.commits fields
    String sampleQuery =
        "select id, \"payload.commits.author.name\", \"payload.commits.author.email\" from githubEvents limit 10";
    printStatus(Color.CYAN, "Query : " + sampleQuery);
    String queryResult = prettyPrintResponse(runner.runQuery(sampleQuery));
    printStatus(Color.YELLOW, queryResult);

    printStatus(Color.GREEN, "***************************************************");
    printStatus(Color.GREEN, "You can always go to http://localhost:9000 to play around in the query console");
  }

  /**
   * Command-line entry point: initializes the plugin manager, then runs the quickstart.
   */
  public static void main(String[] args)
      throws Exception {
    PluginManager.get().init();
    OfflineComplexTypeHandlingQuickStart quickStart = new OfflineComplexTypeHandlingQuickStart();
    quickStart.execute();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"dimensionFieldSpecs": [
{
"name": "id",
"dataType": "STRING"
},
{
"name": "type",
"dataType": "STRING"
},
{
"name": "payload.commits.sha",
"dataType": "STRING"
},
{
"name": "payload.commits.author.name",
"dataType": "STRING"
},
{
"name": "payload.commits.author.email",
"dataType": "STRING"
},
{
"name": "payload_json",
"dataType": "STRING",
"maxLength": 2147483647
}
],
"dateTimeFieldSpecs": [
{
"name": "created_at",
"dataType": "STRING",
"format": "1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss'Z'",
"granularity": "1:SECONDS"
},
{
"name": "created_at_timestamp",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:TIMESTAMP",
"granularity": "1:SECONDS"
}
],
"schemaName": "githubEvents"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"tableName": "githubEvents",
"tableType": "OFFLINE",
"tenants": {
},
"segmentsConfig": {
"segmentPushType": "REFRESH",
"replication": "1",
"timeColumnName": "created_at_timestamp"
},
"tableIndexConfig": {
"loadMode": "MMAP"
},
"ingestionConfig": {
"transformConfigs": [
{
"columnName": "created_at_timestamp",
"transformFunction": "fromDateTime(created_at, 'yyyy-MM-dd''T''HH:mm:ss''Z''')"
},
{
"columnName": "payload_json",
"transformFunction": "jsonFormat(\"payload\")"
}
],
"complexTypeConfig": {
"unnestFields": ["payload.commits"]
}
},
"metadata": {
"customConfigs": {
}
}
}
Loading

0 comments on commit 1a38329

Please sign in to comment.