[HWORKS-1115] Create hive database should go directly to the metastore (#1746)
SirOibaf committed May 27, 2024
1 parent 4e77002 commit 2e9f2a1
Showing 10 changed files with 215 additions and 336 deletions.
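Context for this change: project feature store creation previously provisioned its Hive database and an offline JDBC connector against HiveServer2 (see the endpoints and connector code removed below); after this commit the database operations go straight to the Hive metastore. The HiveController side of the change is in one of the files not shown in this view, so purely as orientation, a minimal sketch of a direct-metastore database creation follows. The Database constructor is the thrift-generated one, and the already-authenticated client is assumed to come from the HiveController#openUserMetastoreClient helper that this diff introduces call sites for.

import java.util.HashMap;

import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.thrift.TException;

// Orientation sketch only; not code from this commit.
public class DirectMetastoreCreateSketch {

  // Creates a Hive database straight through the metastore Thrift API,
  // bypassing HiveServer2 entirely.
  public static void createDatabase(ThriftHiveMetastore.Client client,
                                    String dbName, String locationUri) throws TException {
    // Thrift-generated constructor: Database(name, description, locationUri, parameters).
    Database db = new Database(dbName, "Feature store database", locationUri, new HashMap<>());
    client.create_database(db);
  }
}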
File: JupyterConfigFilesGenerator.java
@@ -58,7 +58,6 @@
import io.hops.hopsworks.common.util.templates.jupyter.SparkMagicConfigTemplateBuilder;
import io.hops.hopsworks.exceptions.ApiKeyException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.kafka.KafkaBrokers;
import io.hops.hopsworks.persistence.entity.jobs.configuration.DockerJobConfiguration;
@@ -117,8 +116,6 @@ public class JupyterConfigFilesGenerator {
@EJB
private KafkaBrokers kafkaBrokers;
@EJB
private HiveController hiveController;
@EJB
private JobController jobController;
@EJB
private HdfsUsersController hdfsUsersController;
@@ -211,14 +208,12 @@ public void createJupyterKernelConfig(Writer out, Project project, JupyterSettin
.setAnacondaHome(settings.getAnacondaProjectDir())
.setSecretDirectory(settings.getStagingDir() + Settings.PRIVATE_DIRS + js.getSecret())
.setProject(project)
.setHiveEndpoints(hiveController.getHiveServerInternalEndpoint())
.setLibHdfsOpts("-Xmx512m")
.build();

Map<String, Object> dataModel = new HashMap<>(1);
dataModel.put("kernel", kernelTemplate);
templateEngine.template(KernelTemplate.TEMPLATE_NAME, dataModel, out);
} catch (TemplateException | ServiceDiscoveryException ex) {
} catch (TemplateException ex) {
throw new IOException(ex);
}
}
File: FeaturestoreController.java
@@ -20,21 +20,17 @@
import io.hops.hopsworks.common.dao.user.activity.ActivityFacade;
import io.hops.hopsworks.common.dataset.DatasetController;
import io.hops.hopsworks.common.featurestore.featureview.FeatureViewFacade;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreConnectorFacade;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.hopsfs.FeaturestoreHopsfsConnectorDTO;
import io.hops.hopsworks.common.featurestore.storageconnectors.jdbc.FeaturestoreJdbcConnectorDTO;
import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetFacade;
import io.hops.hopsworks.common.hdfs.DistributedFileSystemOps;
import io.hops.hopsworks.common.hdfs.DistributedFsService;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.DatasetException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
@@ -49,8 +45,6 @@
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.persistence.entity.user.activity.ActivityFlag;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.HiveTags;
import org.apache.hadoop.fs.Path;

import javax.ejb.EJB;
@@ -78,10 +72,6 @@ public class FeaturestoreController {
@EJB
private Settings settings;
@EJB
private OnlineFeaturestoreController onlineFeaturestoreController;
@EJB
private OnlineFeaturestoreFacade onlineFeaturestoreFacade;
@EJB
private HiveController hiveController;
@EJB
private FeaturegroupFacade featuregroupFacade;
@@ -92,8 +82,6 @@ public class FeaturestoreController {
@EJB
private FeaturestoreStorageConnectorController featurestoreStorageConnectorController;
@EJB
private ServiceDiscoveryController serviceDiscoveryController;
@EJB
private InodeController inodeController;
@EJB
private DatasetController datasetController;
@@ -268,8 +256,6 @@ public Featurestore createProjectFeatureStore(Project project, Users user, Strin
activityFacade.persistActivity(ActivityFacade.ADDED_FEATURESTORE_STORAGE_CONNECTOR + trainingDatasetsFolder.
getName(), project, project.getOwner(), ActivityFlag.SERVICE);

featurestoreStorageConnectorController
.createStorageConnector(user, project, featurestore, createOfflineJdbcConnector(featurestoreName));
activityFacade.persistActivity(ActivityFacade.ADDED_FEATURESTORE_STORAGE_CONNECTOR + project.getName(),
project, project.getOwner(), ActivityFlag.SERVICE);

@@ -314,30 +300,6 @@ public FeaturestoreStorageConnectorDTO hopsfsTrainingDatasetConnector(Dataset ho
return featurestoreHopsfsConnectorDTO;
}

public FeaturestoreStorageConnectorDTO createOfflineJdbcConnector(String databaseName)
throws FeaturestoreException {
String hiveEndpoint = "";
try {
hiveEndpoint = serviceDiscoveryController
.constructServiceFQDNWithPort(HopsworksService.HIVE.getNameWithTag(HiveTags.hiveserver2_tls));
} catch (ServiceDiscoveryException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.CONNECTOR_NOT_FOUND,
Level.SEVERE, "Could not create Hive connection string", e.getMessage(), e);
}
String connectionString = HiveController.HIVE_JDBC_PREFIX + hiveEndpoint + "/" + databaseName +
";auth=noSasl;ssl=true;twoWay=true;";
List<OptionDTO> arguments = FeaturestoreConstants.OFFLINE_JDBC_CONNECTOR_ARGS.stream()
.map(arg -> new OptionDTO(arg, null))
.collect(Collectors.toList());
FeaturestoreJdbcConnectorDTO featurestoreJdbcConnectorDTO = new FeaturestoreJdbcConnectorDTO();
featurestoreJdbcConnectorDTO.setStorageConnectorType(FeaturestoreConnectorType.JDBC);
featurestoreJdbcConnectorDTO.setName(databaseName);
featurestoreJdbcConnectorDTO.setDescription("JDBC connector for the Offline Feature Store");
featurestoreJdbcConnectorDTO.setConnectionString(connectionString);
featurestoreJdbcConnectorDTO.setArguments(arguments);
return featurestoreJdbcConnectorDTO;
}

/**
* Converts a featurestore entity to a Featurestore DTO, supplements the featurestore entity
* with Hive metadata and removes foreign keys that are less interesting for users.
@@ -352,17 +314,9 @@ public FeaturestoreDTO convertFeaturestoreToDTO(Featurestore featurestore) {
featurestoreDTO.setFeaturestoreName(featureStoreName);
featurestoreDTO.setOfflineFeaturestoreName(featureStoreName);

try {
featurestoreDTO.setHiveEndpoint(hiveController.getHiveServerInternalEndpoint());
if (settings.isOnlineFeaturestore() &&
onlineFeaturestoreController.checkIfDatabaseExists(
onlineFeaturestoreController.getOnlineFeaturestoreDbName(featurestore.getProject()))) {
featurestoreDTO.setMysqlServerEndpoint(onlineFeaturestoreFacade.getJdbcURL());
featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName());
featurestoreDTO.setOnlineEnabled(true);
}
} catch (ServiceDiscoveryException ex) {
throw new RuntimeException(ex);
if (settings.isOnlineFeaturestore()) {
featurestoreDTO.setOnlineFeaturestoreName(featurestore.getProject().getName());
featurestoreDTO.setOnlineEnabled(true);
}

// add counters
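A note on the deleted createOfflineJdbcConnector above: it was the one place where the offline JDBC connector of a new feature store was assembled. As a reference while reading the removal, here is a sketch of the connection string it produced; "jdbc:hive2://" is an assumption about the value of HiveController.HIVE_JDBC_PREFIX (the standard Hive JDBC URL scheme), and any endpoint shown is a hypothetical stand-in for the HiveServer2 address that service discovery returned.

// Reference sketch of the string built by the removed method.
class OfflineJdbcConnectorStringSketch {
  static String connectionString(String hiveEndpoint, String databaseName) {
    // Same concatenation as in the deleted code above.
    return "jdbc:hive2://" + hiveEndpoint + "/" + databaseName
        + ";auth=noSasl;ssl=true;twoWay=true;";
  }
}

For example, connectionString("hiveserver2.service.consul:9085", "demo_featurestore"), with both arguments hypothetical, yields jdbc:hive2://hiveserver2.service.consul:9085/demo_featurestore;auth=noSasl;ssl=true;twoWay=true;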
File: FeaturestoreDTO.java
@@ -36,7 +36,6 @@ public class FeaturestoreDTO {
private Integer projectId;
private String onlineFeaturestoreName;
private String offlineFeaturestoreName;
private String hiveEndpoint;
private String mysqlServerEndpoint;
private Boolean onlineEnabled = false;
private Long numFeatureGroups;
@@ -55,7 +54,6 @@ public FeaturestoreDTO(Featurestore featurestore) {
this.featurestoreName = null;
this.onlineFeaturestoreName = null;
this.offlineFeaturestoreName = null;
this.hiveEndpoint = null;
this.mysqlServerEndpoint = null;
this.onlineEnabled = false;
}
@@ -103,24 +101,6 @@ public void setOfflineFeaturestoreName(String offlineFeaturestoreName) {
this.offlineFeaturestoreName = offlineFeaturestoreName;
}

@XmlElement
public String getHiveEndpoint() {
return hiveEndpoint;
}

public void setHiveEndpoint(String hiveEndpoint) {
this.hiveEndpoint = hiveEndpoint;
}

@XmlElement
public String getMysqlServerEndpoint() {
return mysqlServerEndpoint;
}

public void setMysqlServerEndpoint(String mysqlServerEndpoint) {
this.mysqlServerEndpoint = mysqlServerEndpoint;
}

public void setOnlineFeaturestoreName(String onlineFeaturestoreName) {
this.onlineFeaturestoreName = onlineFeaturestoreName;
}
@@ -176,7 +156,6 @@ public String toString() {
", projectId=" + projectId +
", onlineFeaturestoreName='" + onlineFeaturestoreName + '\'' +
", offlineFeaturestoreName='" + offlineFeaturestoreName + '\'' +
", hiveEndpoint='" + hiveEndpoint + '\'' +
", mysqlServerEndpoint='" + mysqlServerEndpoint + '\'' +
", onlineEnabled=" + onlineEnabled +
", numFeatureGroups=" + numFeatureGroups +
File: OfflineFeatureGroupController.java
@@ -16,23 +16,17 @@

package io.hops.hopsworks.common.featurestore.featuregroup.cached;

import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import com.logicalclocks.servicediscoverclient.service.Service;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
import io.hops.hopsworks.common.hdfs.HdfsUsersController;
import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
import io.hops.hopsworks.common.security.CertificateMaterializer;
import io.hops.hopsworks.common.hive.HiveController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.exceptions.ServiceException;
import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import io.hops.hopsworks.servicediscovery.HopsworksService;
import io.hops.hopsworks.servicediscovery.tags.HiveTags;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.TableType;
@@ -48,13 +42,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import javax.annotation.PostConstruct;
import javax.ejb.EJB;
@@ -83,9 +71,6 @@ public class OfflineFeatureGroupController {
@EJB
private Settings settings;
@EJB
private CertificateMaterializer certificateMaterializer;
@EJB
private ServiceDiscoveryController serviceDiscoveryController;
private HiveController hiveController;

private Configuration metastoreConf;

@@ -95,9 +81,6 @@ public void init() {
metastoreConf.addResource(new Path(settings.getHiveConfPath()));
}

private static final String COMMENT = "comment";
private static final int CONNECTION_TIMEOUT = 600000;

public enum Formats {
ORC("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
@@ -166,7 +149,7 @@ public void createHiveTable(Featurestore featurestore, String tableName,
try {
createTable(client, table, defaultConstraints);
} finally {
finalizeMetastoreOperation(project, user, client);
hiveController.finalizeMetastoreOperation(project, user, client);
}
}

@@ -193,7 +176,7 @@ public void alterHiveTableFeatures(Featurestore featurestore, String tableName,
alterTable(client, table);
addDefaultConstraints(client, defaultConstraints);
} finally {
finalizeMetastoreOperation(project, user, client);
hiveController.finalizeMetastoreOperation(project, user, client);
}
}

@@ -209,7 +192,7 @@ public List<FeatureGroupFeatureDTO> getSchema(Featurestore featurestore, String
schema = getFields(client, dbName, tableName);
defaultConstraints = getDefaultConstraints(client, "hive", dbName, tableName);
} finally {
finalizeMetastoreOperation(project, user, client);
hiveController.finalizeMetastoreOperation(project, user, client);
}

// Setup a map of constraint values for easy access
@@ -299,95 +282,27 @@ private List<FieldSchema> getFields(ThriftHiveMetastore.Client client, String db

private ThriftHiveMetastore.Client getMetaStoreClient(Project project, Users user) throws FeaturestoreException {
try {
return openMetastoreClient(project, user);
return hiveController.openUserMetastoreClient(project, user);
} catch (ServiceException | IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_CREATING_HIVE_METASTORE_CLIENT,
Level.SEVERE, "Error opening the Hive Metastore client: " + e.getMessage(), e.getMessage(), e);
}
}

private void finalizeMetastoreOperation(Project project, Users user, ThriftHiveMetastore.Client client) {
certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
if (client != null) {
try {
client.shutdown();
} catch (TException e) {
LOGGER.log(Level.SEVERE, "Error closing Metastore connection", e);
}
}
}

public void dropFeatureGroup(String dbName, String tableName, Project project, Users user)
throws FeaturestoreException, ServiceException, IOException {
ThriftHiveMetastore.Client client = null;
try {
client = openMetastoreClient(project, user);
client = hiveController.openUserMetastoreClient(project, user);
client.drop_table(dbName, tableName, true);
} catch (TException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_DELETE_FEATUREGROUP, Level.SEVERE,
"Error dropping feature group in the Hive Metastore: " + e.getMessage(), e.getMessage(), e);
} finally {
finalizeMetastoreOperation(project, user, client);
hiveController.finalizeMetastoreOperation(project, user, client);
}
}

// Here we can't use the HiveMetaStoreClient.java wrapper, as we would need to export environment variables and so on;
// instead we assemble the Thrift client directly, which is what the HiveMetaStoreClient does behind the scenes.
private ThriftHiveMetastore.Client openMetastoreClient(Project project, Users user)
throws ServiceException, IOException {
String hdfsUsername = hdfsUsersController.getHdfsUserName(project, user);
ThriftHiveMetastore.Client client = null;

try {
certificateMaterializer.materializeCertificatesLocal(user.getUsername(), project.getName());
CertificateMaterializer.CryptoMaterial userMaterial =
certificateMaterializer.getUserMaterial(user.getUsername(), project.getName());

// read Password
String password = String.copyValueOf(userMaterial.getPassword());

// Get metastore service information from consul
Service metastoreService = serviceDiscoveryController
.getAnyAddressOfServiceWithDNS(HopsworksService.HIVE.getNameWithTag(HiveTags.metastore));

TTransport transport;
if (settings.getHopsRpcTls()) {
// Setup secure connection with the Hive metastore.
TSSLTransportFactory.TSSLTransportParameters params =
new TSSLTransportFactory.TSSLTransportParameters();
params.setTrustStore(certificateMaterializer.getUserTransientTruststorePath(project, user), password);
params.setKeyStore(certificateMaterializer.getUserTransientKeystorePath(project, user), password);

transport = TSSLTransportFactory.getClientSocket(metastoreService.getAddress(),
metastoreService.getPort(), CONNECTION_TIMEOUT, params);
} else {
transport = new TSocket(TConfiguration.DEFAULT, metastoreService.getAddress(), metastoreService.getPort(),
CONNECTION_TIMEOUT);
}

TProtocol protocol = new TBinaryProtocol(transport);
client = new ThriftHiveMetastore.Client(protocol);

// Open transport
if (!transport.isOpen()) {
transport.open();
}

// Set the UGI on the metastore side
client.set_ugi(hdfsUsername, new ArrayList<>());

if (settings.getHopsRpcTls()) {
// Send the certificate to the metastore so it can operate with the fs.
client.set_crypto(userMaterial.getKeyStore(), password, userMaterial.getTrustStore(), password, false);
}
} catch (CryptoPasswordNotFoundException | ServiceDiscoveryException | TException e) {
throw new ServiceException(RESTCodes.ServiceErrorCode.METASTORE_CONNECTION_ERROR, Level.SEVERE,
"Hive metastore connection error", e.getMessage(), e);
}

return client;
}

private Table getEmptyTable(String databaseName, String tableName, String username, Formats format) {
StorageDescriptor sd = new StorageDescriptor();
{
(The diff for the remaining six changed files did not load.)
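The two helpers that OfflineFeatureGroupController now delegates to, openUserMetastoreClient and finalizeMetastoreOperation, live in HiveController, which is among the files not loaded here. Judging from the inline code deleted above, the moved methods presumably look close to the following sketch, reassembled from the removed openMetastoreClient and finalizeMetastoreOperation bodies; the EJB collaborators and the CONNECTION_TIMEOUT constant mirror the ones deleted from this file, and the actual HiveController may differ.

// Presumed post-move shape, reconstructed from the deleted code; not copied
// from the actual HiveController.
public ThriftHiveMetastore.Client openUserMetastoreClient(Project project, Users user)
    throws ServiceException, IOException {
  String hdfsUsername = hdfsUsersController.getHdfsUserName(project, user);
  try {
    // Materialize the user's certificates locally and read the keystore password.
    certificateMaterializer.materializeCertificatesLocal(user.getUsername(), project.getName());
    CertificateMaterializer.CryptoMaterial userMaterial =
        certificateMaterializer.getUserMaterial(user.getUsername(), project.getName());
    String password = String.copyValueOf(userMaterial.getPassword());

    // Locate the metastore through Consul.
    Service metastoreService = serviceDiscoveryController
        .getAnyAddressOfServiceWithDNS(HopsworksService.HIVE.getNameWithTag(HiveTags.metastore));

    TTransport transport;
    if (settings.getHopsRpcTls()) {
      // Two-way TLS socket backed by the user's transient key and trust stores.
      TSSLTransportFactory.TSSLTransportParameters params =
          new TSSLTransportFactory.TSSLTransportParameters();
      params.setTrustStore(certificateMaterializer.getUserTransientTruststorePath(project, user), password);
      params.setKeyStore(certificateMaterializer.getUserTransientKeystorePath(project, user), password);
      transport = TSSLTransportFactory.getClientSocket(metastoreService.getAddress(),
          metastoreService.getPort(), CONNECTION_TIMEOUT, params);
    } else {
      transport = new TSocket(TConfiguration.DEFAULT, metastoreService.getAddress(),
          metastoreService.getPort(), CONNECTION_TIMEOUT);
    }

    ThriftHiveMetastore.Client client =
        new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
    if (!transport.isOpen()) {
      transport.open();
    }

    // Set the UGI, and on TLS clusters hand the certificates to the metastore
    // so it can operate against the file system on the user's behalf.
    client.set_ugi(hdfsUsername, new ArrayList<>());
    if (settings.getHopsRpcTls()) {
      client.set_crypto(userMaterial.getKeyStore(), password, userMaterial.getTrustStore(), password, false);
    }
    return client;
  } catch (CryptoPasswordNotFoundException | ServiceDiscoveryException | TException e) {
    throw new ServiceException(RESTCodes.ServiceErrorCode.METASTORE_CONNECTION_ERROR, Level.SEVERE,
        "Hive metastore connection error", e.getMessage(), e);
  }
}

public void finalizeMetastoreOperation(Project project, Users user, ThriftHiveMetastore.Client client) {
  // Always clean up the materialized certificates, then close the Thrift connection.
  certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
  if (client != null) {
    try {
      client.shutdown();
    } catch (TException e) {
      LOGGER.log(Level.SEVERE, "Error closing Metastore connection", e);
    }
  }
}

Every call site in OfflineFeatureGroupController pairs the two in a try/finally, so the certificates are cleaned up and the connection closed even when the metastore operation fails.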
