Skip to content

Commit

Permalink
Pick "[feature](Resource) Support to specify the root path for hdfs r…
Browse files Browse the repository at this point in the history
…esource apache#32632" (apache#35848)


same as apache#32632
  • Loading branch information
ByteYue authored Jun 5, 2024
1 parent c2b830e commit 630fd06
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
9 changes: 7 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1279,9 +1279,13 @@ void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest
} else if (resource.__isset.hdfs_storage_param) {
Status st;
std::shared_ptr<io::HdfsFileSystem> fs;
std::string root_path = resource.hdfs_storage_param.__isset.root_path
? resource.hdfs_storage_param.root_path
: "";
if (existed_resource.fs == nullptr) {
st = io::HdfsFileSystem::create(resource.hdfs_storage_param,
std::to_string(resource.id), "", nullptr, &fs);
std::to_string(resource.id), root_path, nullptr,
&fs);
} else {
fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
}
Expand All @@ -1290,7 +1294,8 @@ void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name);
.tag("resource_name", resource.name)
.tag("root_path", fs->root_path().string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
public class HdfsResource extends Resource {
public static final String HADOOP_FS_PREFIX = "dfs.";
public static String HADOOP_FS_NAME = "fs.defaultFS";
public static String HADOOP_FS_ROOT_PATH = "root_path";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
Expand Down Expand Up @@ -106,6 +107,8 @@ public static THdfsParams generateHdfsParam(Map<String, String> properties) {
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
tHdfsParams.setFsName(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_FS_ROOT_PATH)) {
tHdfsParams.setRootPath(property.getValue());
} else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) {
tHdfsParams.setUser(property.getValue());
} else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL)) {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ struct THdfsParams {
3: optional string hdfs_kerberos_principal
4: optional string hdfs_kerberos_keytab
5: optional list<THdfsConf> hdfs_conf
// Used for Cold Heat Separation to specify the root path
6: optional string root_path
}

// One broker range information.
Expand Down

0 comments on commit 630fd06

Please sign in to comment.