diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index dc401398f68c35..6811ee3bc4ef36 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -50,6 +50,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_drop_partition("ms", "drop_partition"); BvarLatencyRecorderWithTag g_bvar_ms_get_tablet_stats("ms", "get_tablet_stats"); BvarLatencyRecorderWithTag g_bvar_ms_get_obj_store_info("ms", "get_obj_store_info"); BvarLatencyRecorderWithTag g_bvar_ms_alter_obj_store_info("ms", "alter_obj_store_info"); +BvarLatencyRecorderWithTag g_bvar_ms_alter_storage_vault("ms", "alter_storage_vault"); BvarLatencyRecorderWithTag g_bvar_ms_create_instance("ms", "create_instance"); BvarLatencyRecorderWithTag g_bvar_ms_alter_instance("ms", "alter_instance"); BvarLatencyRecorderWithTag g_bvar_ms_alter_cluster("ms", "alter_cluster"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index f2957e35940334..b8032060cb0313 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -148,6 +148,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_drop_partition; extern BvarLatencyRecorderWithTag g_bvar_ms_get_tablet_stats; extern BvarLatencyRecorderWithTag g_bvar_ms_get_obj_store_info; extern BvarLatencyRecorderWithTag g_bvar_ms_alter_obj_store_info; +extern BvarLatencyRecorderWithTag g_bvar_ms_alter_storage_vault; extern BvarLatencyRecorderWithTag g_bvar_ms_create_instance; extern BvarLatencyRecorderWithTag g_bvar_ms_alter_instance; extern BvarLatencyRecorderWithTag g_bvar_ms_alter_cluster; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index e2360e9e6ba2f7..b0401239a56c18 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -181,6 +181,11 @@ class MetaServiceImpl : public cloud::MetaService { AlterObjStoreInfoResponse* response, ::google::protobuf::Closure* done) override; + void alter_storage_vault(google::protobuf::RpcController* controller, + const AlterObjStoreInfoRequest* request, + AlterObjStoreInfoResponse* response, + ::google::protobuf::Closure* done) override; + void update_ak_sk(google::protobuf::RpcController* controller, const UpdateAkSkRequest* request, UpdateAkSkResponse* response, ::google::protobuf::Closure* done) override; @@ -485,6 +490,13 @@ class MetaServiceProxy final : public MetaService { call_impl(&cloud::MetaService::alter_obj_store_info, controller, request, response, done); } + void alter_storage_vault(google::protobuf::RpcController* controller, + const AlterObjStoreInfoRequest* request, + AlterObjStoreInfoResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::alter_storage_vault, controller, request, response, done); + } + void update_ak_sk(google::protobuf::RpcController* controller, const UpdateAkSkRequest* request, UpdateAkSkResponse* response, ::google::protobuf::Closure* done) override { call_impl(&cloud::MetaService::update_ak_sk, controller, request, response, done); diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 4542e05e486f23..ad56ffd9ca217e 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -208,7 +208,26 @@ static HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::C static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { static std::unordered_map operations { {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, - {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, + {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(remove_version_prefix(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter obj store info operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +static HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, @@ -217,7 +236,7 @@ static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc: auto& path = ctrl->http_request().unresolved_path(); auto it = operations.find(remove_version_prefix(path)); if (it == operations.end()) { - std::string msg = "not supportted alter obj store info operation: " + path; + std::string msg = "not supportted alter storage vault operation: " + path; return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); } @@ -226,7 +245,7 @@ static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc: req.set_op(it->second); AlterObjStoreInfoResponse resp; - service->alter_obj_store_info(ctrl, &req, &resp, nullptr); + service->alter_storage_vault(ctrl, &req, &resp, nullptr); return http_json_reply(resp.status()); } @@ -447,10 +466,11 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"v1/legacy_update_ak_sk", process_alter_obj_store_info}, {"v1/update_ak_sk", process_update_ak_sk}, {"show_storage_vaults", process_get_obj_store_info}, - {"add_hdfs_vault", process_alter_obj_store_info}, - {"add_s3_vault", process_alter_obj_store_info}, - {"drop_s3_vault", process_alter_obj_store_info}, - {"drop_hdfs_vault", process_alter_obj_store_info}, + {"add_hdfs_vault", process_alter_storage_vault}, + {"add_s3_vault", process_alter_storage_vault}, + {"alter_s3_vault", process_alter_storage_vault}, + {"drop_s3_vault", process_alter_storage_vault}, + {"drop_hdfs_vault", process_alter_storage_vault}, // for tools {"decode_key", process_decode_key}, {"encode_key", process_encode_key}, diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 6175df6c40dd75..a73301205dcf7e 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -71,6 +71,7 @@ static int encrypt_ak_sk_helper(const std::string plain_ak, const std::string pl MetaServiceCode& code, std::string& msg) { std::string key; int64_t key_id; + LOG_INFO("enter encrypt_ak_sk_helper, plain_ak {}", plain_ak); int ret = get_newest_encryption_key_for_ak_sk(&key_id, &key); TEST_SYNC_POINT_CALLBACK("encrypt_ak_sk:get_encryption_key", &ret, &key, &key_id); if (ret != 0) { @@ -509,56 +510,216 @@ static void set_default_vault_log_helper(const InstanceInfoPB& instance, LOG(INFO) << vault_msg; } -void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* controller, - const AlterObjStoreInfoRequest* request, - AlterObjStoreInfoResponse* response, - ::google::protobuf::Closure* done) { +static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr txn, + const StorageVaultPB& vault, MetaServiceCode& code, + std::string& msg) { + if (!vault.has_obj_info()) { + code = MetaServiceCode::INVALID_ARGUMENT; + std::stringstream ss; + ss << "Only s3 vault can be altered"; + msg = ss.str(); + return -1; + } + const auto& obj_info = vault.obj_info(); + if (obj_info.has_bucket() || obj_info.has_endpoint() || obj_info.has_prefix() || + obj_info.has_provider()) { + code = MetaServiceCode::INVALID_ARGUMENT; + std::stringstream ss; + ss << "Only ak, sk can be altered"; + msg = ss.str(); + return -1; + } + const auto& name = vault.name(); + auto name_itr = std::find_if(instance.storage_vault_names().begin(), + instance.storage_vault_names().end(), + [&](const auto& vault_name) { return name == vault_name; }); + if (name_itr == instance.storage_vault_names().end()) { + code = MetaServiceCode::INVALID_ARGUMENT; + std::stringstream ss; + ss << "invalid storage vault name, not found, name =" << name; + msg = ss.str(); + return -1; + } + auto pos = name_itr - instance.storage_vault_names().begin(); + auto id_itr = instance.resource_ids().begin() + pos; + auto vault_key = storage_vault_key({instance.instance_id(), *id_itr}); + std::string val; + + auto err = txn->get(vault_key, &val); + LOG(INFO) << "get instance_key=" << hex(vault_key); + + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + std::stringstream ss; + ss << "failed to get storage vault, vault_id=" << *name_itr << ", vault_name=" + << "" << name << " err=" << err; + msg = ss.str(); + return -1; + } + StorageVaultPB alter; + alter.ParseFromString(val); + AkSkPair pre {alter.obj_info().ak(), alter.obj_info().sk()}; + const auto& plain_ak = obj_info.has_ak() ? obj_info.ak() : alter.obj_info().ak(); + const auto& plain_sk = obj_info.has_ak() ? obj_info.sk() : alter.obj_info().sk(); + auto obfuscating_sk = [](const auto& sk) -> std::string { + if (sk.empty()) { + return ""; + } + std::string result(sk.length(), '*'); + result.replace(0, 2, sk, 0, 2); + result.replace(result.length() - 2, 2, sk, sk.length() - 2, 2); + return result; + }; + AkSkPair plain_ak_sk_pair {plain_ak, plain_sk}; + AkSkPair cipher_ak_sk_pair; + EncryptionInfoPB encryption_info; + auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, + msg); + if (ret != 0) { + msg = "failed to encrypt"; + code = MetaServiceCode::ERR_ENCRYPT; + LOG(WARNING) << msg; + return -1; + } + alter.mutable_obj_info()->set_ak(cipher_ak_sk_pair.first); + alter.mutable_obj_info()->set_sk(cipher_ak_sk_pair.second); + alter.mutable_obj_info()->mutable_encryption_info()->CopyFrom(encryption_info); + + val = alter.SerializeAsString(); + if (val.empty()) { + msg = "failed to serialize"; + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + return -1; + } + + txn->put(vault_key, val); + LOG(INFO) << "put vault_id=" << *id_itr << ", instance_key=" << hex(vault_key) + << ", previous ak=" << pre.first << ", previous sk=" << obfuscating_sk(pre.second) + << ", new ak=" << cipher_ak_sk_pair.first + << ", new sk=" << obfuscating_sk(cipher_ak_sk_pair.second); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = fmt::format("failed to commit kv txn, err={}", err); + LOG(WARNING) << msg; + } + + return 0; +} + +struct ObjectStorageDesc { + std::string& ak; + std::string& sk; + std::string& bucket; + std::string& prefix; + std::string& endpoint; + std::string& external_endpoint; + std::string& region; +}; + +static int extract_object_storage_info(const AlterObjStoreInfoRequest* request, + MetaServiceCode& code, std::string& msg, + ObjectStorageDesc& obj_desc, + EncryptionInfoPB& encryption_info, + AkSkPair& cipher_ak_sk_pair) { + if (!request->has_obj() && (!request->has_vault() || !request->vault().has_obj_info())) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info err " + proto_to_json(*request); + return -1; + } + auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region] = obj_desc; + const auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); + // Prepare data + if (!obj.has_ak() || !obj.has_sk()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info err " + proto_to_json(*request); + return -1; + } + + std::string plain_ak = obj.has_ak() ? obj.ak() : ""; + std::string plain_sk = obj.has_sk() ? obj.sk() : ""; + + auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, + msg); + if (ret != 0) { + return -1; + } + TEST_SYNC_POINT_CALLBACK("extract_object_storage_info:get_aksk_pair", &cipher_ak_sk_pair); + + ak = cipher_ak_sk_pair.first; + sk = cipher_ak_sk_pair.second; + bucket = obj.has_bucket() ? obj.bucket() : ""; + prefix = obj.has_prefix() ? obj.prefix() : ""; + endpoint = obj.has_endpoint() ? obj.endpoint() : ""; + external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; + region = obj.has_region() ? obj.region() : ""; + // obj size > 1k, refuse + if (obj.ByteSizeLong() > 1024) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info greater than 1k " + proto_to_json(*request); + return -1; + }; + return 0; +} + +static ObjectStoreInfoPB object_info_pb_factory(ObjectStorageDesc& obj_info, + const ObjectStoreInfoPB& obj, + InstanceInfoPB& instance, + EncryptionInfoPB& encryption_info, + AkSkPair& cipher_ak_sk_pair) { + ObjectStoreInfoPB last_item; + auto& [ak, sk, bucket, prefix, endpoint, external_endpoint, region] = obj_info; + auto now_time = std::chrono::system_clock::now(); + uint64_t time = + std::chrono::duration_cast(now_time.time_since_epoch()).count(); + last_item.set_ctime(time); + last_item.set_mtime(time); + last_item.set_id(next_available_vault_id(instance)); + if (obj.has_user_id()) { + last_item.set_user_id(obj.user_id()); + } + last_item.set_ak(std::move(cipher_ak_sk_pair.first)); + last_item.set_sk(std::move(cipher_ak_sk_pair.second)); + last_item.mutable_encryption_info()->CopyFrom(encryption_info); + last_item.set_bucket(bucket); + // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` -> `aa/bb` + trim(prefix); + last_item.set_prefix(prefix); + last_item.set_endpoint(endpoint); + last_item.set_external_endpoint(external_endpoint); + last_item.set_region(region); + last_item.set_provider(obj.provider()); + last_item.set_sse_enabled(instance.sse_enabled()); + return last_item; +} + +void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* controller, + const AlterObjStoreInfoRequest* request, + AlterObjStoreInfoResponse* response, + ::google::protobuf::Closure* done) { std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region; EncryptionInfoPB encryption_info; AkSkPair cipher_ak_sk_pair; - RPC_PREPROCESS(alter_obj_store_info); + RPC_PREPROCESS(alter_storage_vault); switch (request->op()) { - case AlterObjStoreInfoRequest::ADD_OBJ_INFO: case AlterObjStoreInfoRequest::ADD_S3_VAULT: - case AlterObjStoreInfoRequest::DROP_S3_VAULT: - case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: - case AlterObjStoreInfoRequest::UPDATE_AK_SK: { - if (!request->has_obj() && (!request->has_vault() || !request->vault().has_obj_info())) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info err " + proto_to_json(*request); + case AlterObjStoreInfoRequest::DROP_S3_VAULT: { + auto tmp_desc = + ObjectStorageDesc {ak, sk, bucket, prefix, endpoint, external_endpoint, region}; + if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info, + cipher_ak_sk_pair)) { return; } - auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); - // Prepare data - if (!obj.has_ak() || !obj.has_sk()) { + } break; + case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: { + // It should at least has one hdfs info or obj info inside storage vault + if ((!request->has_vault())) { code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info err " + proto_to_json(*request); - return; - } - - std::string plain_ak = obj.has_ak() ? obj.ak() : ""; - std::string plain_sk = obj.has_sk() ? obj.sk() : ""; - - auto ret = encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, - code, msg); - if (ret != 0) { + msg = "storage vault is set " + proto_to_json(*request); return; } - ak = cipher_ak_sk_pair.first; - sk = cipher_ak_sk_pair.second; - bucket = obj.has_bucket() ? obj.bucket() : ""; - prefix = obj.has_prefix() ? obj.prefix() : ""; - endpoint = obj.has_endpoint() ? obj.endpoint() : ""; - external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; - region = obj.has_region() ? obj.region() : ""; - - // obj size > 1k, refuse - if (obj.ByteSizeLong() > 1024) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info greater than 1k " + proto_to_json(*request); - return; - }; - } break; + break; + } case AlterObjStoreInfoRequest::ADD_HDFS_INFO: case AlterObjStoreInfoRequest::DROP_HDFS_INFO: { if (!request->has_vault() || !request->vault().has_name()) { @@ -575,22 +736,20 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont } break; } - case AlterObjStoreInfoRequest::ADD_BUILT_IN_VAULT: { - // It should at least has one hdfs info or obj info inside storage vault - if ((!request->has_vault())) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "hdfs info is not found " + proto_to_json(*request); - return; - } + case AlterObjStoreInfoRequest::ALTER_S3_VAULT: + break; + case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: break; - } case AlterObjStoreInfoRequest::UNKNOWN: { code = MetaServiceCode::INVALID_ARGUMENT; msg = "Unknown alter info " + proto_to_json(*request); return; } break; - case AlterObjStoreInfoRequest::UNSET_DEFAULT_VAULT: - break; + default: + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "Unknown alter obj store info, request info " + proto_to_json(*request); + LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString()); + return; } // TODO(dx): check s3 info right @@ -646,45 +805,13 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont return; } - auto now_time = std::chrono::system_clock::now(); - uint64_t time = - std::chrono::duration_cast(now_time.time_since_epoch()).count(); - switch (request->op()) { - case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: { - // get id - std::string id = request->obj().has_id() ? request->obj().id() : "0"; - int idx = std::stoi(id); - if (idx < 1 || idx > instance.obj_info().size()) { - // err - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "id invalid, please check it"; - return; - } - auto& obj_info = - const_cast&>(instance.obj_info()); - for (auto& it : obj_info) { - if (std::stoi(it.id()) == idx) { - if (it.ak() == ak && it.sk() == sk) { - // not change, just return ok - code = MetaServiceCode::OK; - msg = ""; - return; - } - it.set_mtime(time); - it.set_ak(ak); - it.set_sk(sk); - it.mutable_encryption_info()->CopyFrom(encryption_info); - } - } - } break; - case AlterObjStoreInfoRequest::ADD_OBJ_INFO: - if (instance.enable_storage_vault()) { + case AlterObjStoreInfoRequest::ADD_S3_VAULT: { + if (!instance.enable_storage_vault()) { code = MetaServiceCode::INVALID_ARGUMENT; - msg = "Storage vault doesn't support add obj info"; + msg = "Storage vault doesn't support storage vault"; return; } - case AlterObjStoreInfoRequest::ADD_S3_VAULT: { auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); if (!obj.has_provider()) { code = MetaServiceCode::INVALID_ARGUMENT; @@ -715,58 +842,37 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont } } // calc id - ObjectStoreInfoPB last_item; - last_item.set_ctime(time); - last_item.set_mtime(time); - last_item.set_id(next_available_vault_id(instance)); - if (obj.has_user_id()) { - last_item.set_user_id(obj.user_id()); - } - last_item.set_ak(std::move(cipher_ak_sk_pair.first)); - last_item.set_sk(std::move(cipher_ak_sk_pair.second)); - last_item.mutable_encryption_info()->CopyFrom(encryption_info); - last_item.set_bucket(bucket); - // format prefix, such as `/aa/bb/`, `aa/bb//`, `//aa/bb`, ` /aa/bb` -> `aa/bb` - trim(prefix); - last_item.set_prefix(prefix); - last_item.set_endpoint(endpoint); - last_item.set_external_endpoint(external_endpoint); - last_item.set_region(region); - last_item.set_provider(obj.provider()); - last_item.set_sse_enabled(instance.sse_enabled()); - if (request->op() == AlterObjStoreInfoRequest::ADD_OBJ_INFO) { - instance.add_obj_info()->CopyFrom(last_item); - LOG_INFO("Instance {} tries to put obj info", instance.instance_id()); - } else if (request->op() == AlterObjStoreInfoRequest::ADD_S3_VAULT) { - if (instance.storage_vault_names().end() != - std::find_if(instance.storage_vault_names().begin(), - instance.storage_vault_names().end(), - [&](const std::string& candidate_name) { - return candidate_name == request->vault().name(); - })) { - code = MetaServiceCode::ALREADY_EXISTED; - msg = fmt::format("vault_name={} already created", request->vault().name()); - return; - } - StorageVaultPB vault; - vault.set_id(last_item.id()); - vault.set_name(request->vault().name()); - *instance.mutable_resource_ids()->Add() = vault.id(); - *instance.mutable_storage_vault_names()->Add() = vault.name(); - vault.mutable_obj_info()->MergeFrom(last_item); - auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()}); - txn->put(vault_key, vault.SerializeAsString()); - if (request->has_set_as_default_storage_vault() && - request->set_as_default_storage_vault()) { - response->set_default_storage_vault_replaced( - instance.has_default_storage_vault_id()); - set_default_vault_log_helper(instance, vault.name(), vault.id()); - instance.set_default_storage_vault_id(vault.id()); - instance.set_default_storage_vault_name(vault.name()); - } - LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(), - vault.name(), hex(vault_key)); + auto tmp_tuple = + ObjectStorageDesc {ak, sk, bucket, prefix, endpoint, external_endpoint, region}; + ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance, + encryption_info, cipher_ak_sk_pair); + if (instance.storage_vault_names().end() != + std::find_if(instance.storage_vault_names().begin(), + instance.storage_vault_names().end(), + [&](const std::string& candidate_name) { + return candidate_name == request->vault().name(); + })) { + code = MetaServiceCode::ALREADY_EXISTED; + msg = fmt::format("vault_name={} already created", request->vault().name()); + return; + } + StorageVaultPB vault; + vault.set_id(last_item.id()); + vault.set_name(request->vault().name()); + *instance.mutable_resource_ids()->Add() = vault.id(); + *instance.mutable_storage_vault_names()->Add() = vault.name(); + vault.mutable_obj_info()->MergeFrom(last_item); + auto vault_key = storage_vault_key({instance.instance_id(), last_item.id()}); + txn->put(vault_key, vault.SerializeAsString()); + if (request->has_set_as_default_storage_vault() && + request->set_as_default_storage_vault()) { + response->set_default_storage_vault_replaced(instance.has_default_storage_vault_id()); + set_default_vault_log_helper(instance, vault.name(), vault.id()); + instance.set_default_storage_vault_id(vault.id()); + instance.set_default_storage_vault_name(vault.name()); } + LOG_INFO("try to put storage vault_id={}, vault_name={}, vault_key={}", vault.id(), + vault.name(), hex(vault_key)); } break; case AlterObjStoreInfoRequest::ADD_HDFS_INFO: { if (auto ret = add_vault_into_instance( @@ -835,6 +941,10 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont instance.clear_default_storage_vault_name(); break; } + case AlterObjStoreInfoRequest::ALTER_S3_VAULT: { + alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg); + return; + } case AlterObjStoreInfoRequest::DROP_S3_VAULT: [[fallthrough]]; default: { @@ -865,6 +975,193 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont } } +void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* controller, + const AlterObjStoreInfoRequest* request, + AlterObjStoreInfoResponse* response, + ::google::protobuf::Closure* done) { + std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region; + EncryptionInfoPB encryption_info; + AkSkPair cipher_ak_sk_pair; + RPC_PREPROCESS(alter_obj_store_info); + switch (request->op()) { + case AlterObjStoreInfoRequest::ADD_OBJ_INFO: + case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: + case AlterObjStoreInfoRequest::UPDATE_AK_SK: { + auto tmp_desc = + ObjectStorageDesc {ak, sk, bucket, prefix, endpoint, external_endpoint, region}; + if (0 != extract_object_storage_info(request, code, msg, tmp_desc, encryption_info, + cipher_ak_sk_pair)) { + return; + } + } break; + case AlterObjStoreInfoRequest::UNKNOWN: { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "Unknown alter info " + proto_to_json(*request); + return; + } break; + default: + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "Unknown alter obj store info, request info " + proto_to_json(*request); + LOG_WARNING("Unknown alter obj store info, request info {}", request->DebugString()); + return; + } + + // TODO(dx): check s3 info right + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + if (cloud_unique_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "cloud unique id not set"; + return; + } + + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << cloud_unique_id; + return; + } + RPC_RATE_LIMIT(alter_obj_store_info) + InstanceKeyInfo key_info {instance_id}; + std::string key; + std::string val; + instance_key(key_info, &key); + + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = "failed to create txn"; + LOG(WARNING) << msg << " err=" << err; + return; + } + err = txn->get(key, &val); + LOG(INFO) << "get instance_key=" << hex(key); + + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + ss << "failed to get instance, instance_id=" << instance_id << " err=" << err; + msg = ss.str(); + return; + } + + InstanceInfoPB instance; + if (!instance.ParseFromString(val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse InstanceInfoPB"; + return; + } + + if (instance.status() == InstanceInfoPB::DELETED) { + code = MetaServiceCode::CLUSTER_NOT_FOUND; + msg = "instance status has been set delete, plz check it"; + return; + } + + switch (request->op()) { + case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: { + // get id + std::string id = request->obj().has_id() ? request->obj().id() : "0"; + int idx = std::stoi(id); + if (idx < 1 || idx > instance.obj_info().size()) { + // err + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "id invalid, please check it"; + return; + } + auto& obj_info = + const_cast&>(instance.obj_info()); + for (auto& it : obj_info) { + if (std::stoi(it.id()) == idx) { + if (it.ak() == ak && it.sk() == sk) { + // not change, just return ok + code = MetaServiceCode::OK; + msg = ""; + return; + } + auto now_time = std::chrono::system_clock::now(); + uint64_t time = std::chrono::duration_cast( + now_time.time_since_epoch()) + .count(); + it.set_mtime(time); + it.set_ak(ak); + it.set_sk(sk); + it.mutable_encryption_info()->CopyFrom(encryption_info); + } + } + } break; + case AlterObjStoreInfoRequest::ADD_OBJ_INFO: { + if (instance.enable_storage_vault()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "Storage vault doesn't support add obj info"; + return; + } + auto& obj = request->has_obj() ? request->obj() : request->vault().obj_info(); + if (!obj.has_provider()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 conf lease provider info"; + return; + } + if (instance.obj_info().size() >= 10) { + code = MetaServiceCode::UNDEFINED_ERR; + msg = "this instance history has greater than 10 objs, please new another instance"; + return; + } + // ATTN: prefix may be empty + if (ak.empty() || sk.empty() || bucket.empty() || endpoint.empty() || region.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 conf info err, please check it"; + return; + } + + auto& objs = instance.obj_info(); + for (auto& it : objs) { + if (bucket == it.bucket() && prefix == it.prefix() && endpoint == it.endpoint() && + region == it.region() && ak == it.ak() && sk == it.sk() && + obj.provider() == it.provider() && external_endpoint == it.external_endpoint()) { + // err, anything not changed + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "original obj infos has a same conf, please check it"; + return; + } + } + // calc id + auto tmp_tuple = + ObjectStorageDesc {ak, sk, bucket, prefix, endpoint, external_endpoint, region}; + ObjectStoreInfoPB last_item = object_info_pb_factory(tmp_tuple, obj, instance, + encryption_info, cipher_ak_sk_pair); + instance.add_obj_info()->CopyFrom(last_item); + LOG_INFO("Instance {} tries to put obj info", instance.instance_id()); + } break; + default: { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "invalid request op, op=" << request->op(); + msg = ss.str(); + return; + } + } + + LOG(INFO) << "instance " << instance_id << " has " << instance.obj_info().size() + << " s3 history info, and instance = " << proto_to_json(instance); + + val = instance.SerializeAsString(); + if (val.empty()) { + msg = "failed to serialize"; + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + return; + } + + txn->put(key, val); + LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" << hex(key); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as(err); + msg = fmt::format("failed to commit kv txn, err={}", err); + LOG(WARNING) << msg; + } +} + void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller, const UpdateAkSkRequest* request, UpdateAkSkResponse* response, ::google::protobuf::Closure* done) { diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 850f8cfbabd641..31292e9eb3f781 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -428,6 +428,110 @@ TEST(MetaServiceTest, CreateInstanceTest) { } } +TEST(MetaServiceTest, AlterS3StorageVaultTest) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { + auto* ret = try_any_cast(args[0]); + *ret = 0; + auto* key = try_any_cast(args[1]); + *key = "selectdbselectdbselectdbselectdb"; + auto* key_id = try_any_cast(args[2]); + *key_id = 1; + }); + std::pair pair; + sp->set_call_back("extract_object_storage_info:get_aksk_pair", [&](auto&& args) { + auto* ret = try_any_cast*>(args[0]); + pair = *ret; + }); + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key; + std::string val; + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + + ObjectStoreInfoPB obj_info; + obj_info.set_id("1"); + obj_info.set_ak("ak"); + obj_info.set_sk("sk"); + StorageVaultPB vault; + vault.mutable_obj_info()->MergeFrom(obj_info); + vault.set_name("test_alter_s3_vault"); + vault.set_id("2"); + InstanceInfoPB instance; + instance.add_storage_vault_names(vault.name()); + instance.add_resource_ids(vault.id()); + instance.set_instance_id("GetObjStoreInfoTestInstance"); + val = instance.SerializeAsString(); + txn->put(key, val); + txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + txn = nullptr; + + auto get_test_instance = [&](InstanceInfoPB& i) { + std::string key; + std::string val; + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + InstanceKeyInfo key_info {"test_instance"}; + instance_key(key_info, &key); + ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); + i.ParseFromString(val); + }; + + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT); + StorageVaultPB vault; + vault.mutable_obj_info()->set_ak("new_ak"); + vault.set_name("test_alter_s3_vault"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + InstanceInfoPB instance; + get_test_instance(instance); + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string val; + ASSERT_EQ(txn->get(storage_vault_key({instance.instance_id(), "2"}), &val), + TxnErrorCode::TXN_OK); + StorageVaultPB get_obj; + get_obj.ParseFromString(val); + ASSERT_EQ(get_obj.obj_info().ak(), "new_ak") << get_obj.obj_info().ak(); + } + + { + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT); + StorageVaultPB vault; + ObjectStoreInfoPB obj; + obj_info.set_ak("new_ak"); + vault.mutable_obj_info()->MergeFrom(obj); + vault.set_name("test_alter_s3_vault_non_exist"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); + ASSERT_NE(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + } + + SyncPoint::get_instance()->disable_processing(); + SyncPoint::get_instance()->clear_all_call_backs(); +} + TEST(MetaServiceTest, AlterClusterTest) { auto meta_service = get_meta_service(); ASSERT_NE(meta_service, nullptr); @@ -5985,13 +6089,13 @@ TEST(MetaServiceTest, AddHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); // Invalid fs name ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); req.mutable_vault()->mutable_hdfs_info()->mutable_build_conf()->set_fs_name( "hdfs://ip:port"); - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); @@ -6016,7 +6120,7 @@ TEST(MetaServiceTest, AddHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().msg(); } @@ -6036,7 +6140,7 @@ TEST(MetaServiceTest, AddHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6106,7 +6210,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().msg(); } @@ -6125,7 +6229,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::STORAGE_VAULT_NOT_FOUND) << res.status().msg(); @@ -6145,7 +6249,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6177,7 +6281,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6204,7 +6308,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6230,7 +6334,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6253,7 +6357,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6291,7 +6395,7 @@ TEST(MetaServiceTest, DropHdfsInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6474,7 +6578,7 @@ TEST(MetaServiceTest, SetDefaultVaultTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); @@ -6483,7 +6587,7 @@ TEST(MetaServiceTest, SetDefaultVaultTest) { set_default_req.set_op(AlterObjStoreInfoRequest::SET_DEFAULT_VAULT); set_default_req.mutable_vault()->CopyFrom(hdfs); AlterObjStoreInfoResponse set_default_res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &set_default_req, &set_default_res, nullptr); ASSERT_EQ(set_default_res.status().code(), MetaServiceCode::OK) @@ -6507,7 +6611,7 @@ TEST(MetaServiceTest, SetDefaultVaultTest) { set_default_req.set_op(AlterObjStoreInfoRequest::SET_DEFAULT_VAULT); set_default_req.mutable_vault()->CopyFrom(hdfs); AlterObjStoreInfoResponse set_default_res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &set_default_req, &set_default_res, nullptr); ASSERT_NE(set_default_res.status().code(), MetaServiceCode::OK) @@ -6586,7 +6690,7 @@ TEST(MetaServiceTest, GetObjStoreInfoTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); InstanceInfoPB instance; @@ -6707,7 +6811,7 @@ TEST(MetaServiceTest, CreateTabletsVaultsTest) { brpc::Controller cntl; AlterObjStoreInfoResponse res; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); @@ -6733,7 +6837,7 @@ TEST(MetaServiceTest, CreateTabletsVaultsTest) { set_default_req.mutable_vault()->CopyFrom(hdfs); AlterObjStoreInfoResponse set_default_res; brpc::Controller cntl; - meta_service->alter_obj_store_info( + meta_service->alter_storage_vault( reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &set_default_req, &set_default_res, nullptr); ASSERT_EQ(set_default_res.status().code(), MetaServiceCode::OK) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 045100055ae3ad..8d9ec98f25251c 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -151,6 +151,7 @@ supportedCreateStatement supportedAlterStatement : ALTER VIEW name=multipartIdentifier (LEFT_PAREN cols=simpleColumnDefs RIGHT_PAREN)? AS query #alterView + | ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault ; supportedDropStatement diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3StorageVault.java index 3f06286f47dc98..710354b0092681 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3StorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3StorageVault.java @@ -19,9 +19,12 @@ import org.apache.doris.analysis.CreateResourceStmt; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.property.constants.S3Properties; import com.google.gson.annotations.SerializedName; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; /** @@ -53,6 +56,14 @@ public class S3StorageVault extends StorageVault { // Reuse all the code from S3Resource private Resource resource; + private static final String TYPE = "type"; + + public static final HashSet ALTER_CHECK_PROPERTIES = new HashSet<>(Arrays.asList( + TYPE, + S3Properties.ACCESS_KEY, + S3Properties.SECRET_KEY + )); + @SerializedName(value = "properties") private Map properties; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java index 95a17321832558..d2b78109f020f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVaultMgr.java @@ -19,6 +19,7 @@ import org.apache.doris.analysis.CreateStorageVaultStmt; import org.apache.doris.analysis.SetDefaultStorageVaultStmt; +import org.apache.doris.catalog.StorageVault.StorageVaultType; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.AlterObjStoreInfoRequest.Operation; import org.apache.doris.cloud.rpc.MetaServiceProxy; @@ -90,6 +91,39 @@ public String getVaultIdByName(String name) { return vaultId; } + public void alterStorageVault(StorageVaultType type, Map properties, String name) throws Exception { + if (type != StorageVaultType.S3) { + throw new DdlException("Only support alter s3 storage vault"); + } + properties.keySet().stream() + .filter(key -> !S3StorageVault.ALTER_CHECK_PROPERTIES.contains(key)) + .findAny() + .ifPresent(key -> { + throw new IllegalArgumentException("Alter property " + key + " is not allowed."); + }); + Cloud.AlterObjStoreInfoRequest.Builder requestBuilder + = Cloud.AlterObjStoreInfoRequest.newBuilder(); + requestBuilder.setOp(Cloud.AlterObjStoreInfoRequest.Operation.ALTER_S3_VAULT); + Cloud.ObjectStoreInfoPB.Builder objBuilder = S3Properties.getObjStoreInfoPB(properties); + Cloud.StorageVaultPB.Builder alterObjVaultBuilder = Cloud.StorageVaultPB.newBuilder(); + alterObjVaultBuilder.setName(name); + alterObjVaultBuilder.setObjInfo(objBuilder.build()); + requestBuilder.setVault(alterObjVaultBuilder.build()); + try { + Cloud.AlterObjStoreInfoResponse response = + MetaServiceProxy.getInstance().alterStorageVault(requestBuilder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("failed to alter storage vault response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + LOG.info("Succeed to alter s3 vault {}, id {}, origin default vault replaced {}", + name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced()); + } catch (RpcException e) { + LOG.warn("failed to alter storage vault due to RpcException: {}", e); + throw new DdlException(e.getMessage()); + } + } + @VisibleForTesting public void setDefaultStorageVault(SetDefaultStorageVaultStmt stmt) throws DdlException { Cloud.AlterObjStoreInfoRequest.Builder builder = Cloud.AlterObjStoreInfoRequest.newBuilder(); @@ -101,7 +135,7 @@ public void setDefaultStorageVault(SetDefaultStorageVaultStmt stmt) throws DdlEx LOG.info("try to set vault {} as default vault", stmt.getStorageVaultName()); try { Cloud.AlterObjStoreInfoResponse resp = - MetaServiceProxy.getInstance().alterObjStoreInfo(builder.build()); + MetaServiceProxy.getInstance().alterStorageVault(builder.build()); if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("failed to set default storage vault response: {}, vault name {}", resp, stmt.getStorageVaultName()); @@ -122,7 +156,7 @@ public void unsetDefaultStorageVault() throws DdlException { builder.setOp(Operation.UNSET_DEFAULT_VAULT); try { Cloud.AlterObjStoreInfoResponse resp = - MetaServiceProxy.getInstance().alterObjStoreInfo(builder.build()); + MetaServiceProxy.getInstance().alterStorageVault(builder.build()); if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) { LOG.warn("failed to unset default storage vault"); throw new DdlException(resp.getStatus().getMsg()); @@ -170,7 +204,7 @@ public void createHdfsVault(StorageVault vault) throws DdlException { requestBuilder.setSetAsDefaultStorageVault(vault.setAsDefault()); try { Cloud.AlterObjStoreInfoResponse response = - MetaServiceProxy.getInstance().alterObjStoreInfo(requestBuilder.build()); + MetaServiceProxy.getInstance().alterStorageVault(requestBuilder.build()); if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED && hdfsStorageVault.ifNotExists()) { LOG.info("Hdfs vault {} already existed", hdfsStorageVault.getName()); @@ -217,7 +251,7 @@ public void createS3Vault(StorageVault vault) throws DdlException { requestBuilder.setSetAsDefaultStorageVault(vault.setAsDefault()); try { Cloud.AlterObjStoreInfoResponse response = - MetaServiceProxy.getInstance().alterObjStoreInfo(requestBuilder.build()); + MetaServiceProxy.getInstance().alterStorageVault(requestBuilder.build()); if (response.getStatus().getCode() == Cloud.MetaServiceCode.ALREADY_EXISTED && s3StorageVault.ifNotExists()) { LOG.info("S3 vault {} already existed", s3StorageVault.getName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index d5cdc79eb7f7d5..a3b1f31d5d7a7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -321,6 +321,15 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo return blockingStub.alterObjStoreInfo(request); } + public Cloud.AlterObjStoreInfoResponse alterStorageVault(Cloud.AlterObjStoreInfoRequest request) { + if (!request.hasCloudUniqueId()) { + Cloud.AlterObjStoreInfoRequest.Builder builder = Cloud.AlterObjStoreInfoRequest.newBuilder(); + builder.mergeFrom(request); + return blockingStub.alterStorageVault(builder.setCloudUniqueId(Config.cloud_unique_id).build()); + } + return blockingStub.alterStorageVault(request); + } + public Cloud.GetDeleteBitmapUpdateLockResponse getDeleteBitmapUpdateLock( Cloud.GetDeleteBitmapUpdateLockRequest request) { if (!request.hasCloudUniqueId()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index d7ec328906775e..74150a399e5121 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -487,6 +487,16 @@ public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfo } } + public Cloud.AlterObjStoreInfoResponse alterStorageVault(Cloud.AlterObjStoreInfoRequest request) + throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.alterStorageVault(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } + public Cloud.GetRLTaskCommitAttachResponse getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request) throws RpcException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java index 521918de93b9a5..fa7604dc56bf4c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java @@ -308,14 +308,30 @@ public static TS3StorageParam getS3TStorageParam(Map properties) public static Cloud.ObjectStoreInfoPB.Builder getObjStoreInfoPB(Map properties) { Cloud.ObjectStoreInfoPB.Builder builder = Cloud.ObjectStoreInfoPB.newBuilder(); - builder.setEndpoint(properties.get(S3Properties.ENDPOINT)); - builder.setRegion(properties.get(S3Properties.REGION)); - builder.setAk(properties.get(S3Properties.ACCESS_KEY)); - builder.setSk(properties.get(S3Properties.SECRET_KEY)); - builder.setPrefix(properties.get(S3Properties.ROOT_PATH)); - builder.setBucket(properties.get(S3Properties.BUCKET)); - builder.setExternalEndpoint(properties.get(S3Properties.EXTERNAL_ENDPOINT)); - builder.setProvider(Provider.valueOf(properties.get(S3Properties.PROVIDER))); + if (properties.containsKey(S3Properties.ENDPOINT)) { + builder.setEndpoint(properties.get(S3Properties.ENDPOINT)); + } + if (properties.containsKey(S3Properties.REGION)) { + builder.setRegion(properties.get(S3Properties.REGION)); + } + if (properties.containsKey(S3Properties.ACCESS_KEY)) { + builder.setAk(properties.get(S3Properties.ACCESS_KEY)); + } + if (properties.containsKey(S3Properties.SECRET_KEY)) { + builder.setSk(properties.get(S3Properties.SECRET_KEY)); + } + if (properties.containsKey(S3Properties.ROOT_PATH)) { + builder.setPrefix(properties.get(S3Properties.ROOT_PATH)); + } + if (properties.containsKey(S3Properties.BUCKET)) { + builder.setBucket(properties.get(S3Properties.BUCKET)); + } + if (properties.containsKey(S3Properties.EXTERNAL_ENDPOINT)) { + builder.setExternalEndpoint(properties.get(S3Properties.EXTERNAL_ENDPOINT)); + } + if (properties.containsKey(S3Properties.PROVIDER)) { + builder.setProvider(Provider.valueOf(properties.get(S3Properties.PROVIDER))); + } return builder; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 79ced9182dc08c..7cec5c6b09a819 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -49,6 +49,7 @@ import org.apache.doris.nereids.DorisParser.AliasQueryContext; import org.apache.doris.nereids.DorisParser.AliasedQueryContext; import org.apache.doris.nereids.DorisParser.AlterMTMVContext; +import org.apache.doris.nereids.DorisParser.AlterStorageVaultContext; import org.apache.doris.nereids.DorisParser.AlterViewContext; import org.apache.doris.nereids.DorisParser.ArithmeticBinaryContext; import org.apache.doris.nereids.DorisParser.ArithmeticUnaryContext; @@ -364,6 +365,7 @@ import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; +import org.apache.doris.nereids.trees.plans.commands.AlterStorageVaultCommand; import org.apache.doris.nereids.trees.plans.commands.AlterViewCommand; import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; @@ -862,6 +864,14 @@ public LogicalPlan visitAlterView(AlterViewContext ctx) { return new AlterViewCommand(info); } + @Override + public LogicalPlan visitAlterStorageVault(AlterStorageVaultContext ctx) { + List nameParts = this.visitMultipartIdentifier(ctx.name); + String vaultName = nameParts.get(0); + Map properties = this.visitPropertyClause(ctx.properties); + return new AlterStorageVaultCommand(vaultName, properties); + } + @Override public LogicalPlan visitShowConstraint(ShowConstraintContext ctx) { List parts = visitMultipartIdentifier(ctx.table); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 20d4aa9fb66143..e4f0f4c102e54e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -157,6 +157,7 @@ public enum PlanType { SHOW_CREATE_PROCEDURE_COMMAND, CREATE_VIEW_COMMAND, ALTER_VIEW_COMMAND, + ALTER_STORAGE_VAULT, DROP_CATALOG_RECYCLE_BIN_COMMAND, UNSUPPORTED_COMMAND, CREATE_TABLE_LIKE_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterStorageVaultCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterStorageVaultCommand.java new file mode 100644 index 00000000000000..cbdc5765839b40 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterStorageVaultCommand.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.StorageVault; +import org.apache.doris.catalog.StorageVault.StorageVaultType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import java.util.Map; + +/** + * Alter Storage Vault command + */ +public class AlterStorageVaultCommand extends Command implements ForwardWithSync { + private static final String TYPE = "type"; + private final Map properties; + private final String name; + + public AlterStorageVaultCommand(String name, final Map properties) { + super(PlanType.ALTER_STORAGE_VAULT); + this.name = name; + this.properties = properties; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + StorageVault.StorageVaultType vaultType = StorageVaultType.fromString(properties.get(TYPE)); + if (vaultType == StorageVault.StorageVaultType.UNKNOWN) { + throw new AnalysisException("Unsupported Storage Vault type: " + type); + } + Env.getCurrentEnv().getStorageVaultMgr().alterStorageVault(vaultType, properties, name); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCommand(this, context); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java index 5f92b9665b58d1..0f34b322b0580c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/HdfsStorageVaultTest.java @@ -65,7 +65,7 @@ public void testAlterMetaServiceNormal() throws Exception { new MockUp(MetaServiceProxy.class) { @Mock public Cloud.AlterObjStoreInfoResponse - alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { + alterStorageVault(Cloud.AlterObjStoreInfoRequest request) throws RpcException { Cloud.AlterObjStoreInfoResponse.Builder resp = Cloud.AlterObjStoreInfoResponse.newBuilder(); resp.setStatus(MetaServiceResponseStatus.newBuilder().build()); resp.setStorageVaultId("1"); @@ -88,7 +88,7 @@ public void testAlterMetaServiceWithDuplicateName() throws Exception { private Set existed = new HashSet<>(); @Mock public Cloud.AlterObjStoreInfoResponse - alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { + alterStorageVault(Cloud.AlterObjStoreInfoRequest request) throws RpcException { Cloud.AlterObjStoreInfoResponse.Builder resp = Cloud.AlterObjStoreInfoResponse.newBuilder(); MetaServiceResponseStatus.Builder status = MetaServiceResponseStatus.newBuilder(); if (existed.contains(request.getVault().getName())) { @@ -117,7 +117,7 @@ public void testAlterMetaServiceWithMissingFiels() throws Exception { new MockUp(MetaServiceProxy.class) { @Mock public Cloud.AlterObjStoreInfoResponse - alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { + alterStorageVault(Cloud.AlterObjStoreInfoRequest request) throws RpcException { Cloud.AlterObjStoreInfoResponse.Builder resp = Cloud.AlterObjStoreInfoResponse.newBuilder(); if (!request.getVault().hasName() || request.getVault().getName().isEmpty()) { resp.setStatus(MetaServiceResponseStatus.newBuilder() @@ -144,7 +144,7 @@ public void testAlterMetaServiceIfNotExists() throws Exception { private Set existed = new HashSet<>(); @Mock public Cloud.AlterObjStoreInfoResponse - alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { + alterStorageVault(Cloud.AlterObjStoreInfoRequest request) throws RpcException { Cloud.AlterObjStoreInfoResponse.Builder resp = Cloud.AlterObjStoreInfoResponse.newBuilder(); MetaServiceResponseStatus.Builder status = MetaServiceResponseStatus.newBuilder(); if (existed.contains(request.getVault().getName())) { @@ -178,7 +178,7 @@ public Pair getDefaultStorageVaultInfo() { @Mock public Cloud.AlterObjStoreInfoResponse - alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request) throws RpcException { + alterStorageVault(Cloud.AlterObjStoreInfoRequest request) throws RpcException { Cloud.AlterObjStoreInfoResponse.Builder resp = Cloud.AlterObjStoreInfoResponse.newBuilder(); MetaServiceResponseStatus.Builder status = MetaServiceResponseStatus.newBuilder(); if (request.getOp() == Operation.ADD_HDFS_INFO) { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f8797ea54e733a..bc6b5d536c7963 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -840,6 +840,7 @@ message AlterObjStoreInfoRequest { ADD_BUILT_IN_VAULT = 102; ADD_S3_VAULT = 103; DROP_S3_VAULT = 104; + ALTER_S3_VAULT = 105; SET_DEFAULT_VAULT = 200; UNSET_DEFAULT_VAULT = 201; @@ -1482,6 +1483,7 @@ service MetaService { rpc get_obj_store_info(GetObjStoreInfoRequest) returns (GetObjStoreInfoResponse); rpc alter_obj_store_info(AlterObjStoreInfoRequest) returns (AlterObjStoreInfoResponse); + rpc alter_storage_vault(AlterObjStoreInfoRequest) returns (AlterObjStoreInfoResponse); rpc update_ak_sk(UpdateAkSkRequest) returns (UpdateAkSkResponse); rpc create_instance(CreateInstanceRequest) returns (CreateInstanceResponse); rpc alter_instance(AlterInstanceRequest) returns (AlterInstanceResponse);