Merge pull request #3 from tushengxia/main
move v1.1.0 tag of openlookeng and spark
Jutao-liu authored Apr 15, 2022
2 parents 4bb36bc + ffa44fd commit 511a7fe
Showing 87 changed files with 1,444 additions and 659 deletions.
8 changes: 4 additions & 4 deletions omnidata/omnidata-openlookeng-connector/README.md
@@ -4,21 +4,21 @@

OmniData Connector is a data source connector developed for openLooKeng.

- The OmniData connector allows querying data sources where c Server is deployed. It pushes down some operators such as filter to the OmniData service close to the storage to improve the performance of storage-computing-separated system.
+ The OmniData connector allows querying data sources where OmniData Server is deployed. It pushes down some operators such as filter to the OmniData service close to the storage to improve the performance of storage-computing-separated system.

## Building OmniData Connector

1. OmniData Connector is developed under the architecture of openLooKeng. You need to build openLooKeng first as a non-root user.
2. Simply run the following command from the project root directory:<br>
`mvn clean install -Dos.detected.arch="aarch64"`<br>
- Then you will find omnidata-openlookeng-connector-*.zip under the omnidata-openlookeng-connector/connector/target/ directory.
- OmniData Connector has a comprehensive set of unit tests that can take several minutes to run. You can disable the tests when building:<br>
+ Then you will find omnidata-openlookeng-connector-*.zip in the omnidata-openlookeng-connector/connector/target/ directory.
+ OmniData Connector has a comprehensive set of unit tests that can take several minutes to run. You can disable the tests during building:<br>
`mvn clean install -DskipTests -Dos.detected.arch="aarch64"`<br>

## Deploying OmniData Connector

1. Unzip omnidata-openlookeng-connector-*.zip to the plugin directory of openLooKeng.
- 2. Obtain the latest OmniData software package, replace the boostkit-omnidata-client-\*.jar and boostkit-omnidata-core-\*.jar in the omnidata-openlookeng-connector-\* directory.
+ 2. Obtain the latest OmniData software package, and replace the boostkit-omnidata-server-\*.jar in the omnidata-openlookeng-connector-\* directory.
3. Set "connector.name=omnidata-openlookeng" in the openLooKeng catalog properties file.

## Contribution Guidelines
285 changes: 77 additions & 208 deletions omnidata/omnidata-openlookeng-connector/connector/pom.xml

Large diffs are not rendered by default.

@@ -38,9 +38,6 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

- import java.io.File;
- import java.io.IOException;
- import java.text.Normalizer;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
@@ -67,7 +64,7 @@ public class HiveConfig
{
private static final Logger log = Logger.get(HiveConfig.class);
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
- public static final double MIN_OFFLOAD_FACTOR = 0.5;
+ public static final double MAX_OFFLOAD_FACTOR = 0.25;
public static final long MIN_OFFLOAD_ROW_NUM = 500;

private DataSize maxSplitSize = new DataSize(64, MEGABYTE);
@@ -216,18 +213,11 @@ public class HiveConfig
private boolean autoVacuumEnabled;
private boolean orcPredicatePushdownEnabled;

- private boolean omniDataSslEnabled;
- private Optional<String> omniDataSslPkiDir = Optional.empty();
- private Optional<String> omniDataSslClientCertFilePath = Optional.empty();
- private Optional<String> omniDataSslPrivateKeyFilePath = Optional.empty();
- private Optional<String> omniDataSslTrustCertFilePath = Optional.empty();
- private Optional<String> omniDataSslCrlFilePath = Optional.empty();
-
- private boolean omniDataEnabled;
+ private boolean omniDataEnabled = true;
private boolean filterOffloadEnabled = true;
- private double minFilterOffloadFactor = MIN_OFFLOAD_FACTOR;
+ private double filterOffloadFactor = MAX_OFFLOAD_FACTOR;
private boolean aggregatorOffloadEnabled = true;
- private double minAggregatorOffloadFactor = MIN_OFFLOAD_FACTOR;
+ private double aggregatorOffloadFactor = MAX_OFFLOAD_FACTOR;
private long minOffloadRowNumber = MIN_OFFLOAD_ROW_NUM;

private int hmsWriteBatchSize = 8;
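For context on the constant rename above (MIN_OFFLOAD_FACTOR 0.5 to MAX_OFFLOAD_FACTOR 0.25): the new name reads as an upper bound on how much data may survive an offloaded operator. A minimal sketch of how such a gate could combine the factor with the row floor — the method and the selectivity estimate are assumptions, not code from this commit:

```java
// Hypothetical gate (not from this commit): offload only when the operator is
// expected to keep at most offloadFactor of its input rows, and the input is
// large enough for pushdown to pay off.
private static boolean shouldOffload(double estimatedSelectivity, long inputRows,
        double offloadFactor, long minOffloadRowNumber)
{
    return estimatedSelectivity <= offloadFactor      // e.g. MAX_OFFLOAD_FACTOR = 0.25
            && inputRows >= minOffloadRowNumber;      // e.g. MIN_OFFLOAD_ROW_NUM = 500
}
```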
@@ -1915,105 +1905,6 @@ public boolean getWorkerMetaStoreCacheEnabled()
return this.workerMetaStoreCacheEnabled;
}

- public boolean isOmniDataSslEnabled()
- {
- return omniDataSslEnabled;
- }
-
- private Optional<String> getNormalizedFilePath(String filePath)
- {
- if (filePath == null || filePath.isEmpty()) {
- return Optional.empty();
- }
- String outputPath;
- try {
- String normalizePath = Normalizer.normalize(filePath, Normalizer.Form.NFKC);
- outputPath = new File(normalizePath).getCanonicalPath();
- }
- catch (IOException | IllegalArgumentException exception) {
- log.error("File path [%s] is invalid, exception %s", filePath, exception.getMessage());
- return Optional.empty();
- }
- File file = new File(outputPath);
- if (!file.exists()) {
- log.error("File [%s] is not exist.", outputPath);
- return Optional.empty();
- }
- return Optional.of(outputPath);
- }
-
- @Config("omni-data.ssl.enabled")
- public HiveConfig setOmniDataSslEnabled(boolean omniDataSslEnabled)
- {
- this.omniDataSslEnabled = omniDataSslEnabled;
- return this;
- }
-
- public Optional<String> getOmniDataSslPkiDir()
- {
- return omniDataSslPkiDir;
- }
-
- @Config("omni-data.ssl.pki.dir")
- @ConfigDescription("Directory of Public Key Infrastructure.")
- public HiveConfig setOmniDataSslPkiDir(String omniDataSslPkiDir)
- {
- this.omniDataSslPkiDir = getNormalizedFilePath(omniDataSslPkiDir);
- return this;
- }
-
- public Optional<String> getOmniDataSslClientCertFilePath()
- {
- return omniDataSslClientCertFilePath;
- }
-
- @Config("omni-data.ssl.client.cert.file.path")
- @ConfigDescription("Path to the SSL client certificate file.")
- public HiveConfig setOmniDataSslClientCertFilePath(String omniDataSslClientCertFilePath)
- {
- this.omniDataSslClientCertFilePath = getNormalizedFilePath(omniDataSslClientCertFilePath);
- return this;
- }
-
- public Optional<String> getOmniDataSslPrivateKeyFilePath()
- {
- return omniDataSslPrivateKeyFilePath;
- }
-
- @Config("omni-data.ssl.private.key.file.path")
- @ConfigDescription("Path to the SSL private key file.")
- public HiveConfig setOmniDataSslPrivateKeyFilePath(String omniDataSslPrivateKeyFilePath)
- {
- this.omniDataSslPrivateKeyFilePath = getNormalizedFilePath(omniDataSslPrivateKeyFilePath);
- return this;
- }
-
- public Optional<String> getOmniDataSslTrustCertFilePath()
- {
- return omniDataSslTrustCertFilePath;
- }
-
- @Config("omni-data.ssl.trust.cert.file.path")
- @ConfigDescription("Path to the SSL trust certificate file.")
- public HiveConfig setOmniDataSslTrustCertFilePath(String omniDataSslTrustCertFilePath)
- {
- this.omniDataSslTrustCertFilePath = getNormalizedFilePath(omniDataSslTrustCertFilePath);
- return this;
- }
-
- public Optional<String> getOmniDataSslCrlFilePath()
- {
- return omniDataSslCrlFilePath;
- }
-
- @Config("omni-data.ssl.crl.file.path")
- @ConfigDescription("Path to the SSL Certificate Revocation List file.")
- public HiveConfig setOmniDataSslCrlFilePath(String omniDataSslCrlFilePath)
- {
- this.omniDataSslCrlFilePath = getNormalizedFilePath(omniDataSslCrlFilePath);
- return this;
- }

@Config("hive.filter-offload-enabled")
@ConfigDescription("Enables offload filter operators to storage device.")
public HiveConfig setFilterOffloadEnabled(boolean filterOffloadEnabled)
@@ -2053,34 +1944,34 @@ public boolean isAggregatorOffloadEnabled()
return aggregatorOffloadEnabled;
}

@Config("hive.min-filter-offload-factor")
@ConfigDescription("The minimum data filtering threshold for predicate expression offload.")
public HiveConfig setMinFilterOffloadFactor(double minFilterOffloadFactor)
@Config("hive.filter-offload-factor")
@ConfigDescription("The maximum data filtering threshold for predicate expression offload.")
public HiveConfig setFilterOffloadFactor(double filterOffloadFactor)
{
- this.minFilterOffloadFactor = minFilterOffloadFactor;
+ this.filterOffloadFactor = filterOffloadFactor;
return this;
}

@DecimalMin("0.0")
@DecimalMax("1.0")
- public double getMinFilterOffloadFactor()
+ public double getFilterOffloadFactor()
{
- return minFilterOffloadFactor;
+ return filterOffloadFactor;
}

@Config("hive.min-aggregator-offload-factor")
@ConfigDescription("The minimum data aggregation threshold for aggregation expression offload.")
public HiveConfig setMinAggregatorOffloadFactor(double minAggregatorOffloadFactor)
@Config("hive.aggregator-offload-factor")
@ConfigDescription("The maximum data aggregation threshold for aggregation expression offload.")
public HiveConfig setAggregatorOffloadFactor(double aggregatorOffloadFactor)
{
- this.minAggregatorOffloadFactor = minAggregatorOffloadFactor;
+ this.aggregatorOffloadFactor = aggregatorOffloadFactor;
return this;
}

@DecimalMin("0.0")
@DecimalMax("1.0")
- public double getMinAggregatorOffloadFactor()
+ public double getAggregatorOffloadFactor()
{
- return minAggregatorOffloadFactor;
+ return aggregatorOffloadFactor;
}

@Config("hive.min-offload-row-number")
@@ -16,7 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
- import com.huawei.boostkit.omnidata.block.BlockDeserializer;
+ import com.huawei.boostkit.omnidata.decode.impl.OpenLooKengDeserializer;
import com.huawei.boostkit.omnidata.model.Predicate;
import com.huawei.boostkit.omnidata.model.TaskSource;
import com.huawei.boostkit.omnidata.model.datasource.DataSource;
@@ -30,6 +30,7 @@
import io.prestosql.plugin.hive.HiveBucketing.BucketingVersion;
import io.prestosql.plugin.hive.coercions.HiveCoercer;
import io.prestosql.plugin.hive.omnidata.OmniDataNodeManager;
+ import io.prestosql.plugin.hive.omnidata.OmniDataNodeStatus;
import io.prestosql.plugin.hive.orc.OrcConcatPageSource;
import io.prestosql.plugin.hive.util.IndexCache;
import io.prestosql.spi.connector.ColumnHandle;
@@ -86,8 +87,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.uniqueIndex;
- import static com.huawei.boostkit.omnidata.OmniDataProperty.GRPC_CLIENT_TARGET_LIST;
- import static com.huawei.boostkit.omnidata.OmniDataProperty.HOSTADDRESS_DELIMITER;
+ import static com.huawei.boostkit.omnidata.transfer.OmniDataProperty.HOSTADDRESS_DELIMITER;
+ import static com.huawei.boostkit.omnidata.transfer.OmniDataProperty.OMNIDATA_CLIENT_TARGET_LIST;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.DUMMY_OFFLOADED;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
@@ -96,7 +97,6 @@
import static io.prestosql.plugin.hive.coercions.HiveCoercer.createCoercer;
import static io.prestosql.plugin.hive.metastore.MetastoreUtil.META_PARTITION_COLUMNS;
import static io.prestosql.plugin.hive.util.PageSourceUtil.buildPushdownContext;
- import static io.prestosql.plugin.hive.util.PageSourceUtil.getSslConfiguredProperties;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
@@ -111,11 +111,11 @@ public class HivePageSourceProvider

private final Set<HivePageSourceFactory> pageSourceFactories;

+ private static final int DEFAULT_SPLIT_NUM = 3;
private static final String HIVE_DEFAULT_PARTITION_VALUE = "\\N";
private final IndexCache indexCache;
private final Set<HiveSelectivePageSourceFactory> selectivePageSourceFactories;
private final OmniDataNodeManager omniDataNodeManager;
- private final ImmutableMap sslPropertyMap;

@Inject
public HivePageSourceProvider(
@@ -137,7 +136,6 @@ public HivePageSourceProvider(
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.indexCache = indexCache;
this.selectivePageSourceFactories = selectivePageSourceFactories;
- this.sslPropertyMap = getSslConfiguredProperties(hiveConfig);
}

public HivePageSourceProvider(
@@ -158,7 +157,6 @@ public HivePageSourceProvider(
this.indexCache = indexCache;
this.selectivePageSourceFactories = selectivePageSourceFactories;
this.omniDataNodeManager = null;
- this.sslPropertyMap = getSslConfiguredProperties(hiveConfig);
}

@Override
@@ -196,24 +194,40 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
return new OrcConcatPageSource(pageSources);
}

+ private Optional<String> getSplitOmniDataAddrRandomly(HiveSplit hiveSplit)
+ {
+ List<OmniDataNodeStatus> omniDataNodeList = omniDataNodeManager.getAllNodes().values().stream().collect(toList());
+ if (omniDataNodeList.isEmpty()) {
+ return Optional.empty();
+ }
+
+ int nodeNum = omniDataNodeList.size();
+ int seed = (int) ((hiveSplit.getStart() / Math.max(1, hiveSplit.getLength()) + hiveSplit.getFileSize() + hiveSplit.getPath().hashCode()) % nodeNum);
+ StringJoiner addressJoiner = new StringJoiner(HOSTADDRESS_DELIMITER);
+ for (int i = 0; i < nodeNum && i < DEFAULT_SPLIT_NUM; i++) {
+ int index = Math.abs(seed + i) % nodeNum;
+ addressJoiner.add(omniDataNodeList.get(index).getHostAddress());
+ }
+ return Optional.of(addressJoiner.toString());
+ }
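The new fallback is deterministic: the seed is derived from the split itself, so retries of the same split contact the same (up to DEFAULT_SPLIT_NUM) candidate nodes. A worked illustration with invented values:

```java
// Illustration only (not from this commit): with 4 registered OmniData nodes
// and DEFAULT_SPLIT_NUM = 3, a split whose seed computes to 2 always yields
// candidate indices 2, 3, 0.
int nodeNum = 4;
int seed = 2;
for (int i = 0; i < nodeNum && i < 3; i++) {
    System.out.println(Math.abs(seed + i) % nodeNum); // prints 2, then 3, then 0
}
```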

private Optional<String> getSplitOmniDataAddr(HiveOffloadExpression expression, HiveSplit hiveSplit)
{
if (!expression.isPresent()) {
return Optional.empty();
}

- // empty split
+ // for empty or ceph split
if (hiveSplit.getAddresses().size() == 0) {
- return omniDataNodeManager.getAllNodes().isEmpty() ? Optional.empty() :
- Optional.of(omniDataNodeManager.getAllNodes().values().stream().findAny().get().getHostAddress());
+ return getSplitOmniDataAddrRandomly(hiveSplit);
}

StringJoiner hostAddressJoiner = new StringJoiner(HOSTADDRESS_DELIMITER);
int copyNumber = Math.max(1, hiveSplit.getAddresses().size());
int seed = (int) ((hiveSplit.getStart() / Math.max(1, hiveSplit.getLength()) + hiveSplit.getFileSize()) % copyNumber);
int counter = 0;
for (int i = 0; i < hiveSplit.getAddresses().size(); i++) {
- int copyIndex = (i + seed) % copyNumber;
+ int copyIndex = Math.abs(i + seed) % copyNumber;
try {
String hostIp = InetAddress.getByName(hiveSplit.getAddresses().get(copyIndex).getHostText()).getHostAddress();
if (omniDataNodeManager.getAllNodes().containsKey(hostIp)) {
@@ -361,8 +375,7 @@ session, hiveSplit, assignUniqueIndicesToPartitionColumns(hiveColumns), typeMana
hiveSplit.getCustomSplitInfo(),
missingColumns,
omniDataAddress,
- hiveTable.getOffloadExpression(),
- sslPropertyMap);
+ hiveTable.getOffloadExpression());
if (pageSource.isPresent()) {
return pageSource.get();
}
@@ -530,8 +543,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Map<String, String> customSplitInfo,
List<String> missingColumns,
Optional<String> omniDataAddress,
- HiveOffloadExpression expression,
- ImmutableMap sslPropertyMap)
+ HiveOffloadExpression expression)
{
List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
partitionKeys,
@@ -594,8 +606,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
fileSize,
predicate,
omniDataAddress.get(),
- schema,
- sslPropertyMap);
+ schema);
return Optional.of(
new HivePageSource(
columnMappings,
@@ -669,21 +680,19 @@ private static ConnectorPageSource createPushDownPageSource(
long fileSize,
Predicate predicate,
String omniDataServerTarget,
- Properties schema,
- ImmutableMap sslPropertyMap)
+ Properties schema)
{
AggregatedMemoryContext systemMemoryUsage = AggregatedMemoryContext.newSimpleAggregatedMemoryContext();
Properties transProperties = new Properties();
- transProperties.put(GRPC_CLIENT_TARGET_LIST, omniDataServerTarget);
- transProperties.putAll(sslPropertyMap);
+ transProperties.put(OMNIDATA_CLIENT_TARGET_LIST, omniDataServerTarget);

DataSource pushDownDataSource = new HdfsRecordDataSource(path.toString(), start, length, fileSize, schema);

TaskSource readTaskInfo = new TaskSource(
pushDownDataSource,
predicate,
TaskSource.ONE_MEGABYTES);
- DataReader dataReader = DataReaderFactory.create(transProperties, readTaskInfo, new BlockDeserializer());
+ DataReader dataReader = DataReaderFactory.create(transProperties, readTaskInfo, new OpenLooKengDeserializer());

return new HivePushDownRecordPageSource(dataReader, systemMemoryUsage);
}
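Condensing the hunk above: BlockDeserializer and GRPC_CLIENT_TARGET_LIST give way to OpenLooKengDeserializer and OMNIDATA_CLIENT_TARGET_LIST, and the SSL property map is dropped. A sketch of the resulting reader setup — the host addresses are placeholders; the constants and the DataReaderFactory.create call are the ones shown in this diff:

```java
// Sketch of the new transfer setup (addresses are placeholders).
Properties transProperties = new Properties();
transProperties.put(OMNIDATA_CLIENT_TARGET_LIST,
        "192.168.0.11:9105" + HOSTADDRESS_DELIMITER + "192.168.0.12:9105");
DataReader dataReader = DataReaderFactory.create(transProperties, readTaskInfo, new OpenLooKengDeserializer());
```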

