diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index c5786bedff7c041..cb4bee9648e2544 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list CONF_Strings(recycle_blacklist, ""); // Comma seprated list CONF_mInt32(instance_recycler_worker_pool_size, "1"); CONF_Bool(enable_checker, "false"); +// The parallelism for parallel recycle operation +CONF_Int32(recycle_pool_parallelism, "10"); // Currently only used for recycler test CONF_Bool(enable_inverted_check, "false"); // interval for scanning instances to do checks and inspections diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp index b983ab3c44194da..571cee70c662c3e 100644 --- a/cloud/src/recycler/azure_obj_client.cpp +++ b/cloud/src/recycler/azure_obj_client.cpp @@ -179,7 +179,8 @@ std::unique_ptr AzureObjClient::list_objects(ObjectStoragePa // You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id // > Each batch request supports a maximum of 256 subrequests. ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, - std::vector keys) { + std::vector keys, + ObjClientIoOptions option) { if (keys.empty()) { return {0}; } @@ -243,8 +244,9 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) { } ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path, + ObjClientIoOptions option, int64_t expiration_time) { - return delete_objects_recursively_(path, expiration_time, BlobBatchMaxOperations); + return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations); } ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/azure_obj_client.h b/cloud/src/recycler/azure_obj_client.h index 49b54ca8c6db919..6a3ff6a3243f7f4 100644 --- a/cloud/src/recycler/azure_obj_client.h +++ b/cloud/src/recycler/azure_obj_client.h @@ -38,12 +38,13 @@ class AzureObjClient final : public ObjStorageClient { std::unique_ptr list_objects(ObjectStoragePathRef path) override; - ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) override; + ObjectStorageResponse delete_objects(const std::string& bucket, std::vector keys, + ObjClientIoOptions option) override; ObjectStorageResponse delete_object(ObjectStoragePathRef path) override; ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientIoOptions option, int64_t expiration_time = 0) override; ObjectStorageResponse get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/obj_storage_client.cpp b/cloud/src/recycler/obj_storage_client.cpp index 1dd6435214d3291..18eef2ce47efff7 100644 --- a/cloud/src/recycler/obj_storage_client.cpp +++ b/cloud/src/recycler/obj_storage_client.cpp @@ -18,17 +18,22 @@ #include "recycler/obj_storage_client.h" #include "cpp/sync_point.h" +#include "recycler/util.h" namespace doris::cloud { -ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path, - int64_t expired_time, - size_t batch_size) { +ObjectStorageResponse ObjStorageClient::delete_objects_recursively_( + ObjectStoragePathRef path, const ObjClientIoOptions& option, int64_t expired_time, + size_t batch_size) { TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size); auto list_iter = list_objects(path); ObjectStorageResponse ret; std::vector keys; + SyncExecutor concurrent_delete_executor( + option.executor.get(), + fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key), + [](const int& ret) { return ret != 0; }); for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) { if (expired_time > 0 && obj->mtime_s > expired_time) { @@ -39,20 +44,31 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag if (keys.size() < batch_size) { continue; } - - ret = delete_objects(path.bucket, std::move(keys)); - if (ret.ret != 0) { - return ret; - } + concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable { + return delete_objects(path.bucket, std::move(k), option).ret; + }); } if (!list_iter->is_valid()) { + bool finished; + concurrent_delete_executor.when_all(&finished); return {-1}; } if (!keys.empty()) { - return delete_objects(path.bucket, std::move(keys)); + concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable { + return delete_objects(path.bucket, std::move(k), option).ret; + }); } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { + ret = -1; + } + } + + ret = finished ? ret : -1; return ret; } diff --git a/cloud/src/recycler/obj_storage_client.h b/cloud/src/recycler/obj_storage_client.h index 955a8c174e5b15f..2c66bd01dbe8537 100644 --- a/cloud/src/recycler/obj_storage_client.h +++ b/cloud/src/recycler/obj_storage_client.h @@ -51,6 +51,12 @@ class ObjectListIterator { virtual std::optional next() = 0; }; +class SimpleThreadPool; +struct ObjClientIoOptions { + bool prefetch {true}; + std::shared_ptr executor; +}; + class ObjStorageClient { public: ObjStorageClient() = default; @@ -71,7 +77,8 @@ class ObjStorageClient { // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array. virtual ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) = 0; + std::vector keys, + ObjClientIoOptions option) = 0; // Delete the file named key in the object storage bucket. virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0; @@ -79,6 +86,7 @@ class ObjStorageClient { // According to the prefix, recursively delete all objects under the prefix. // If `expiration_time` > 0, only delete objects with mtime earlier than `expiration_time`. virtual ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientIoOptions option, int64_t expiration_time = 0) = 0; // Get the objects' expiration time on the bucket @@ -91,6 +99,7 @@ class ObjStorageClient { protected: ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef path, + const ObjClientIoOptions& option, int64_t expiration_time, size_t batch_size); }; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index b4c0ae84fc3ef4e..1924ad67038e970 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -164,7 +164,19 @@ static inline void check_recycle_task(const std::string& instance_id, const std: return; } -Recycler::Recycler(std::shared_ptr txn_kv) : txn_kv_(std::move(txn_kv)) { +RecyclerThreadPoolGroup::RecyclerThreadPoolGroup() { + s3_producer_pool = std::make_unique(config::recycle_pool_parallelism); + s3_producer_pool->start(); + recycle_tablet_pool = std::make_unique(config::recycle_pool_parallelism); + recycle_tablet_pool->start(); + group_recycle_function_pool = + std::make_unique(config::recycle_pool_parallelism); + group_recycle_function_pool->start(); +} + +Recycler::Recycler(std::shared_ptr txn_kv) + : txn_kv_(std::move(txn_kv)), + _thread_pool_group(std::make_unique()) { ip_port_ = std::string(butil::my_ip_cstr()) + ":" + std::to_string(config::brpc_listen_port); } @@ -225,7 +237,8 @@ void Recycler::recycle_callback() { // skip instance in recycling if (recycling_instance_map_.count(instance_id)) continue; } - auto instance_recycler = std::make_shared(txn_kv_, instance); + auto instance_recycler = + std::make_shared(txn_kv_, instance, *_thread_pool_group); if (instance_recycler->init() != 0) { LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id; continue; @@ -438,11 +451,13 @@ class InstanceRecycler::InvertedIndexIdCache { std::unordered_set schemas_without_inverted_index_; }; -InstanceRecycler::InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance) +InstanceRecycler::InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, + const RecyclerThreadPoolGroup& thread_pool_group) : txn_kv_(std::move(txn_kv)), instance_id_(instance.instance_id()), instance_info_(instance), - inverted_index_id_cache_(std::make_unique(instance_id_, txn_kv_)) {} + inverted_index_id_cache_(std::make_unique(instance_id_, txn_kv_)), + _thread_pool_group(thread_pool_group) {} InstanceRecycler::~InstanceRecycler() = default; @@ -539,22 +554,51 @@ int InstanceRecycler::init() { return init_storage_vault_accessors(); } +template +auto task_wrapper(Func... funcs) -> std::function { + return [funcs...]() { + return [](std::initializer_list numbers) { + int i = 0; + for (int num : numbers) { + if (num != 0) { + i = num; + } + } + return i; + }({funcs()...}); + }; +} + int InstanceRecycler::do_recycle() { TEST_SYNC_POINT("InstanceRecycler.do_recycle"); if (instance_info_.status() == InstanceInfoPB::DELETED) { return recycle_deleted_instance(); } else if (instance_info_.status() == InstanceInfoPB::NORMAL) { - int ret = recycle_indexes(); - if (recycle_partitions() != 0) ret = -1; - if (recycle_tmp_rowsets() != 0) ret = -1; - if (recycle_rowsets() != 0) ret = -1; - if (abort_timeout_txn() != 0) ret = -1; - if (recycle_expired_txn_label() != 0) ret = -1; - if (recycle_copy_jobs() != 0) ret = -1; - if (recycle_stage() != 0) ret = -1; - if (recycle_expired_stage_objects() != 0) ret = -1; - if (recycle_versions() != 0) ret = -1; - return ret; + SyncExecutor sync_executor(_thread_pool_group.group_recycle_function_pool.get(), + fmt::format("instance id {}", instance_id_), + [](int r) { return r != 0; }); + sync_executor + .add(task_wrapper( + [this]() -> int { return InstanceRecycler::recycle_indexes(); }, + [this]() -> int { return InstanceRecycler::recycle_partitions(); }, + [this]() -> int { return InstanceRecycler::recycle_tmp_rowsets(); }, + [this]() -> int { return InstanceRecycler::recycle_rowsets(); })) + .add(task_wrapper( + [this]() { return InstanceRecycler::abort_timeout_txn(); }, + [this]() { return InstanceRecycler::recycle_expired_txn_label(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_copy_jobs(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_stage(); })) + .add(task_wrapper( + [this]() { return InstanceRecycler::recycle_expired_stage_objects(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_versions(); })); + bool finished = true; + std::vector rets = sync_executor.when_all(&finished); + for (int ret : rets) { + if (ret != 0) { + return ret; + } + } + return finished ? 0 : -1; } else { LOG(WARNING) << "invalid instance status: " << instance_info_.status() << " instance_id=" << instance_id_; @@ -1009,7 +1053,7 @@ int InstanceRecycler::recycle_versions() { int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_t partition_id, bool is_empty_tablet) { int num_scanned = 0; - int num_recycled = 0; + std::atomic_int num_recycled = 0; std::string tablet_key_begin, tablet_key_end; std::string stats_key_begin, stats_key_end; @@ -1051,12 +1095,20 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ .tag("num_recycled", num_recycled); }); + // The first string_view represents the tablet key which has been recycled + // The second bool represents whether the following fdb's tablet key deletion could be done using range move or not + using TabletKeyPair = std::pair; + SyncExecutor sync_executor( + _thread_pool_group.recycle_tablet_pool.get(), + fmt::format("recycle tablets, tablet id {}, index id {}, partition id {}", table_id, + index_id, partition_id), + [](const TabletKeyPair& k) { return k.first.empty(); }); + // Elements in `tablet_keys` has the same lifetime as `it` in `scan_and_recycle` - std::vector tablet_keys; std::vector tablet_idx_keys; std::vector init_rs_keys; - bool use_range_remove = true; auto recycle_func = [&, is_empty_tablet, this](std::string_view k, std::string_view v) -> int { + bool use_range_remove = true; ++num_scanned; doris::TabletMetaCloudPB tablet_meta_pb; if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) { @@ -1067,13 +1119,19 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ int64_t tablet_id = tablet_meta_pb.tablet_id(); tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_, tablet_id})); if (!is_empty_tablet) { - if (recycle_tablet(tablet_id) != 0) { - LOG_WARNING("failed to recycle tablet") - .tag("instance_id", instance_id_) - .tag("tablet_id", tablet_id); - use_range_remove = false; - return -1; - } + sync_executor.add([this, &num_recycled, tid = tablet_id, range_move = use_range_remove, + k]() mutable -> TabletKeyPair { + if (recycle_tablet(tid) != 0) { + LOG_WARNING("failed to recycle tablet") + .tag("instance_id", instance_id_) + .tag("tablet_id", tid); + range_move = false; + return {std::string_view(), range_move}; + } + ++num_recycled; + LOG_INFO("k is {}, is empty {}", k, k.empty()); + return {k, range_move}; + }); } else { // Empty tablet only has a [0-1] init rowset init_rs_keys.push_back(meta_rowset_key({instance_id_, tablet_id, 1})); @@ -1097,19 +1155,38 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ } return true; }()); + sync_executor.add([k]() mutable -> TabletKeyPair { + LOG_INFO("k is {}, is empty {}", k, k.empty()); + return {k, true}; + }); + ++num_recycled; } - ++num_recycled; - tablet_keys.push_back(k); return 0; }; + // TODO(AlexYue): Add one ut to cover use_range_remove = false auto loop_done = [&, this]() -> int { + bool finished = true; + auto tablet_keys = sync_executor.when_all(&finished); + if (!finished) { + LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_); + return -1; + } + sync_executor.reset(); if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0; + // sort the vector using key's order + std::sort(tablet_keys.begin(), tablet_keys.end(), + [](const auto& prev, const auto& last) { return prev.first < last.first; }); + bool use_range_remove = true; + for (auto& [_, remove] : tablet_keys) { + if (!remove) { + use_range_remove = remove; + break; + } + } std::unique_ptr> defer((int*)0x01, [&](int*) { - tablet_keys.clear(); tablet_idx_keys.clear(); init_rs_keys.clear(); - use_range_remove = true; }); std::unique_ptr txn; if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { @@ -1119,10 +1196,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ std::string tablet_key_end; if (!tablet_keys.empty()) { if (use_range_remove) { - tablet_key_end = std::string(tablet_keys.back()) + '\x00'; - txn->remove(tablet_keys.front(), tablet_key_end); + tablet_key_end = std::string(tablet_keys.back().first) + '\x00'; + txn->remove(tablet_keys.front().first, tablet_key_end); } else { - for (auto k : tablet_keys) { + for (auto& [k, _] : tablet_keys) { txn->remove(k); } } @@ -1284,13 +1361,26 @@ int InstanceRecycler::delete_rowset_data(const std::vector concurrent_delete_executor(_thread_pool_group.s3_producer_pool.get(), + "delete_rowset_data", + [](const int& ret) { return ret != 0; }); for (auto& [resource_id, file_paths] : resource_file_paths) { - auto& accessor = accessor_map_[resource_id]; - DCHECK(accessor); - if (accessor->delete_files(file_paths) != 0) { + concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int { + auto& accessor = accessor_map_[*rid]; + DCHECK(accessor); + return accessor->delete_files(*paths); + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { ret = -1; + break; } } + ret = finished ? ret : -1; return ret; } @@ -1349,15 +1439,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = -1; } + SyncExecutor concurrent_delete_executor( + _thread_pool_group.s3_producer_pool.get(), + fmt::format("delete tablet {} s3 rowset", tablet_id), + [](const int& ret) { return ret != 0; }); + // delete all rowset data in this tablet for (auto& [_, accessor] : accessor_map_) { - if (accessor->delete_directory(tablet_path_prefix(tablet_id)) != 0) { - LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id - << " s3_path=" << accessor->uri(); + concurrent_delete_executor.add([&, accessor_ptr = &accessor]() { + if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) { + LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id + << " s3_path=" << accessor->uri(); + return -1; + } + return 0; + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { ret = -1; } } + ret = finished ? ret : -1; + if (ret == 0) { // All object files under tablet have been deleted std::lock_guard lock(recycled_tablets_mtx_); diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 130d860849d6d77..a4475337c095fd2 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -40,6 +40,15 @@ class TxnKv; class InstanceRecycler; class StorageVaultAccessor; class Checker; +struct RecyclerThreadPoolGroup; +class SimpleThreadPool; +struct RecyclerThreadPoolGroup { + RecyclerThreadPoolGroup(); + ~RecyclerThreadPoolGroup() = default; + std::unique_ptr s3_producer_pool; + std::unique_ptr recycle_tablet_pool; + std::unique_ptr group_recycle_function_pool; +}; class Recycler { public: @@ -83,11 +92,13 @@ class Recycler { WhiteBlackList instance_filter_; std::unique_ptr checker_; + std::unique_ptr _thread_pool_group; }; class InstanceRecycler { public: - explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance); + explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, + const RecyclerThreadPoolGroup& thread_pool_group); ~InstanceRecycler(); // returns 0 for success otherwise error @@ -217,6 +228,7 @@ class InstanceRecycler { std::mutex recycle_tasks_mutex; // > std::map running_recycle_tasks; + const RecyclerThreadPoolGroup& _thread_pool_group; }; } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index eca3bc6563b6e80..f2e94e45d68973f 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -148,7 +148,8 @@ void RecyclerServiceImpl::check_instance(const std::string& instance_id, MetaSer } void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& instance_id, - MetaServiceCode& code, std::string& msg) { + MetaServiceCode& code, std::string& msg, + RecyclerThreadPoolGroup* thread_pool_group) { std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -185,13 +186,13 @@ void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& return; } } - auto recycler = std::make_unique(txn_kv, instance); + + auto recycler = std::make_unique(txn_kv, instance, *thread_pool_group); if (recycler->init() != 0) { LOG(WARNING) << "failed to init InstanceRecycler recycle_copy_jobs on instance " << instance_id; return; } - std::thread worker([recycler = std::move(recycler), instance_id] { LOG(INFO) << "manually trigger recycle_copy_jobs on instance " << instance_id; recycler->recycle_copy_jobs(); @@ -329,7 +330,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, status_code = 400; return; } - recycle_copy_jobs(txn_kv_, *instance_id, code, msg); + recycle_copy_jobs(txn_kv_, *instance_id, code, msg, recycler_->_thread_pool_group.get()); response_body = msg; return; } diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 514baeb7943cc10..e223b5085c9f26f 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -34,6 +34,7 @@ #include "common/config.h" #include "common/encryption_util.h" #include "common/logging.h" +#include "common/simple_thread_pool.h" #include "common/string_util.h" #include "common/util.h" #include "rate-limiter/s3_rate_limiter.h" @@ -249,7 +250,15 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr* accessor) { return (*accessor)->init(); } +std::shared_ptr worker_pool; + int S3Accessor::init() { + static std::once_flag log_annotated_tags_key_once; + std::call_once(log_annotated_tags_key_once, [&]() { + LOG_INFO("start s3 accessor parallel worker pool"); + worker_pool = std::make_shared(config::recycle_pool_parallelism); + worker_pool->start(); + }); switch (conf_.provider) { case S3Conf::AZURE: { Azure::Storage::Blobs::BlobClientOptions options; @@ -295,7 +304,7 @@ int S3Accessor::delete_prefix_impl(const std::string& path_prefix, int64_t expir LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix)); return obj_client_ ->delete_objects_recursively({.bucket = conf_.bucket, .key = get_key(path_prefix)}, - expiration_time) + {.executor = worker_pool}, expiration_time) .ret; } @@ -337,7 +346,8 @@ int S3Accessor::delete_files(const std::vector& paths) { keys.emplace_back(get_key(path)); } - return obj_client_->delete_objects(conf_.bucket, std::move(keys)).ret; + return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool}) + .ret; } int S3Accessor::delete_file(const std::string& path) { diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 8e9b53b439267e2..130c94bef731eea 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -29,11 +29,14 @@ class S3Client; namespace doris::cloud { class ObjectStoreInfoPB; +class SimpleThreadPool; enum class S3RateLimitType; extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit); +extern std::shared_ptr worker_pool; + struct S3Conf { std::string ak; std::string sk; diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp index 4137867fa83559c..f421a58938e434e 100644 --- a/cloud/src/recycler/s3_obj_client.cpp +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -162,7 +162,8 @@ std::unique_ptr S3ObjClient::list_objects(ObjectStoragePathR } ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, - std::vector keys) { + std::vector keys, + ObjClientIoOptions option) { if (keys.empty()) { return {0}; } @@ -250,8 +251,9 @@ ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) { } ObjectStorageResponse S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path, + ObjClientIoOptions option, int64_t expiration_time) { - return delete_objects_recursively_(path, expiration_time, MaxDeleteBatch); + return delete_objects_recursively_(path, option, expiration_time, MaxDeleteBatch); } ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h index 7804d0818162566..1960a98de6ed105 100644 --- a/cloud/src/recycler/s3_obj_client.h +++ b/cloud/src/recycler/s3_obj_client.h @@ -39,12 +39,13 @@ class S3ObjClient final : public ObjStorageClient { std::unique_ptr list_objects(ObjectStoragePathRef path) override; - ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) override; + ObjectStorageResponse delete_objects(const std::string& bucket, std::vector keys, + ObjClientIoOptions option) override; ObjectStorageResponse delete_object(ObjectStoragePathRef path) override; ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientIoOptions option, int64_t expiration_time = 0) override; ObjectStorageResponse get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h index 20ea66def8c0b79..d899345ec05da8e 100644 --- a/cloud/src/recycler/util.h +++ b/cloud/src/recycler/util.h @@ -17,15 +17,112 @@ #pragma once +#include #include #include +#include +#include #include +#include "common/simple_thread_pool.h" + namespace doris::cloud { class TxnKv; +template +class SyncExecutor { +public: + SyncExecutor( + SimpleThreadPool* pool, std::string name_tag, + std::function cancel = [](const T& /**/) { return false; }) + : _pool(pool), _cancel(std::move(cancel)), _name_tag(std::move(name_tag)) {} + auto add(std::function callback) -> SyncExecutor& { + auto task = std::make_unique(std::move(callback), _cancel, _count); + _count.add_count(); + // The actual task logic would be wrapped by one promise and passed to the threadpool. + // The result would be returned by the future once the task is finished. + // Or the task would be invalid if the whole task is cancelled. + int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); }); + CHECK(r == 0); + _res.emplace_back(std::move(task)); + return *this; + } + std::vector when_all(bool* finished) { + timespec current_time; + auto current_time_second = time(nullptr); + current_time.tv_sec = current_time_second + 300; + current_time.tv_nsec = 0; + auto msg = fmt::format("{} has already taken 5 min", _name_tag); + while (0 != _count.timed_wait(current_time)) { + current_time.tv_sec += 300; + LOG(WARNING) << msg; + } + *finished = !_stop_token; + std::vector res; + res.reserve(_res.size()); + for (auto& task : _res) { + if (!task->valid()) { + *finished = false; + return res; + } + res.emplace_back((*task).get()); + } + return res; + } + void reset() { + _res.clear(); + _stop_token = false; + } + +private: + class Task { + public: + Task(std::function callback, std::function cancel, + bthread::CountdownEvent& count) + : _callback(std::move(callback)), + _cancel(std::move(cancel)), + _count(count), + _fut(_pro.get_future()) {} + void operator()(std::atomic_bool& stop_token) { + std::unique_ptr> defer((int*)0x01, + [&](int*) { _count.signal(); }); + if (stop_token) { + _valid = false; + return; + } + T t = _callback(); + // We'll return this task result to user even if this task return error + // So we don't set _valid to false here + if (_cancel(t)) { + stop_token = true; + } + _pro.set_value(std::move(t)); + } + bool valid() { return _valid; } + T get() { return _fut.get(); } + + private: + // It's guarantted that the valid function can only be called inside SyncExecutor's `when_all()` function + // and only be called when the _count.timed_wait function returned. So there would be no data race for + // _valid then it doesn't need to be one atomic bool. + bool _valid = true; + std::function _callback; + std::function _cancel; + std::promise _pro; + bthread::CountdownEvent& _count; + std::future _fut; + }; + std::vector> _res; + // use CountdownEvent to do periodically log using CountdownEvent::time_wait() + bthread::CountdownEvent _count {0}; + std::atomic_bool _stop_token {false}; + SimpleThreadPool* _pool; + std::function _cancel; + std::string _name_tag; +}; + /** * Get all instances, include DELETED instance * @return 0 for success, otherwise error diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt index a27f7a1c538f936..ba5e2909918e53f 100644 --- a/cloud/test/CMakeLists.txt +++ b/cloud/test/CMakeLists.txt @@ -50,10 +50,10 @@ add_executable(s3_accessor_test s3_accessor_test.cpp) add_executable(hdfs_accessor_test hdfs_accessor_test.cpp) -add_executable(util_test util_test.cpp) - add_executable(stopwatch_test stopwatch_test.cpp) +add_executable(util_test util_test.cpp) + add_executable(network_util_test network_util_test.cpp) message("Meta-service test dependencies: ${TEST_LINK_LIBS}") @@ -85,10 +85,10 @@ target_link_libraries(s3_accessor_test ${TEST_LINK_LIBS}) target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS}) -target_link_libraries(util_test ${TEST_LINK_LIBS}) - target_link_libraries(stopwatch_test ${TEST_LINK_LIBS}) +target_link_libraries(util_test ${TEST_LINK_LIBS}) + target_link_libraries(network_util_test ${TEST_LINK_LIBS}) # FDB related tests need to be linked with libfdb_c diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index d8da067457bad2c..701a71d71a31346 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -48,6 +48,8 @@ static const std::string instance_id = "instance_id_recycle_test"; static int64_t current_time = 0; static constexpr int64_t db_id = 1000; +static std::unique_ptr thread_group = nullptr; + int main(int argc, char** argv) { auto conf_file = "doris_cloud.conf"; if (!cloud::config::init(conf_file, true)) { @@ -63,6 +65,7 @@ int main(int argc, char** argv) { current_time = duration_cast(system_clock::now().time_since_epoch()).count(); ::testing::InitGoogleTest(&argc, argv); + thread_group = std::make_unique(); return RUN_ALL_TESTS(); } @@ -618,7 +621,7 @@ TEST(RecyclerTest, recycle_empty) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_empty"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); ASSERT_EQ(recycler.recycle_rowsets(), 0); @@ -651,7 +654,7 @@ TEST(RecyclerTest, recycle_rowsets) { sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; }); sp->enable_processing(); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -713,7 +716,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) { config::instance_recycler_worker_pool_size = 10; config::recycle_task_threshold_seconds = 0; - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); auto sp = SyncPoint::get_instance(); @@ -793,7 +796,7 @@ TEST(RecyclerTest, recycle_tmp_rowsets) { sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; }); sp->enable_processing(); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -852,7 +855,7 @@ TEST(RecyclerTest, recycle_tablet) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_tablet"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -925,7 +928,7 @@ TEST(RecyclerTest, recycle_indexes) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_indexes"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -1034,7 +1037,7 @@ TEST(RecyclerTest, recycle_partitions) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_partitions"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -1142,7 +1145,7 @@ TEST(RecyclerTest, recycle_versions) { InstanceInfoPB instance; instance.set_instance_id(instance_id); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); // Recycle all partitions in table except 30006 ASSERT_EQ(recycler.recycle_partitions(), 0); @@ -1211,7 +1214,7 @@ TEST(RecyclerTest, abort_timeout_txn) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); ASSERT_EQ(recycler.abort_timeout_txn(), 0); @@ -1254,7 +1257,7 @@ TEST(RecyclerTest, abort_timeout_txn_and_rebegin) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); ASSERT_EQ(recycler.abort_timeout_txn(), 0); @@ -1321,7 +1324,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); recycler.abort_timeout_txn(); TxnInfoPB txn_info_pb; @@ -1372,7 +1375,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1424,7 +1427,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1483,7 +1486,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1620,7 +1623,7 @@ TEST(RecyclerTest, recycle_copy_jobs) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, *thread_group); ASSERT_EQ(recycler.init(), 0); auto internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second; @@ -1779,7 +1782,7 @@ TEST(RecyclerTest, recycle_batch_copy_jobs) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, *thread_group); ASSERT_EQ(recycler.init(), 0); const auto& internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second; @@ -1893,7 +1896,7 @@ TEST(RecyclerTest, recycle_stage) { instance.set_instance_id(mock_instance); instance.add_obj_info()->CopyFrom(object_info); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; for (int i = 0; i < 10; ++i) { @@ -1953,7 +1956,7 @@ TEST(RecyclerTest, recycle_deleted_instance) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, *thread_group); ASSERT_EQ(recycler.init(), 0); // create txn key for (size_t i = 0; i < 100; i++) { @@ -2531,7 +2534,7 @@ TEST(RecyclerTest, delete_rowset_data) { } { - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; int64_t txn_id_base = 114115; @@ -2565,7 +2568,7 @@ TEST(RecyclerTest, delete_rowset_data) { tmp_obj_info->set_bucket(config::test_s3_bucket); tmp_obj_info->set_prefix(resource_id); - InstanceRecycler recycler(txn_kv, tmp_instance); + InstanceRecycler recycler(txn_kv, tmp_instance, *thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; // Delete multiple rowset files using one series of RowsetPB @@ -2585,7 +2588,7 @@ TEST(RecyclerTest, delete_rowset_data) { ASSERT_FALSE(list_iter->has_next()); } { - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, *thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; // Delete multiple rowset files using one series of RowsetPB diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp index 0292117076165cc..0c86701bd3fe05e 100644 --- a/cloud/test/util_test.cpp +++ b/cloud/test/util_test.cpp @@ -15,17 +15,34 @@ // specific language governing permissions and limitations // under the License. +#include "recycler/util.h" + +#include #include +#include +#include #include #include #include "common/config.h" +#include "common/logging.h" #include "common/string_util.h" -#include "glog/logging.h" #include "gtest/gtest.h" +#include "recycler/recycler.h" + +using namespace doris::cloud; int main(int argc, char** argv) { - doris::cloud::config::init(nullptr, true); + const auto* conf_file = "doris_cloud.conf"; + if (!config::init(conf_file, true)) { + std::cerr << "failed to init config file, conf=" << conf_file << std::endl; + return -1; + } + if (!::init_glog("util")) { + std::cerr << "failed to init glog" << std::endl; + return -1; + } + ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } @@ -89,3 +106,132 @@ TEST(StringUtilTest, test_string_strip) { // clang-format on } + +template +auto task_wrapper(Func... funcs) -> std::function { + return [funcs...]() { + return [](std::initializer_list numbers) { + int i = 0; + for (int num : numbers) { + if (num != 0) { + i = num; + } + } + return i; + }({funcs()...}); + }; +} + +TEST(UtilTest, stage_wrapper) { + std::function func1 = []() { return 0; }; + std::function func2 = []() { return -1; }; + std::function func3 = []() { return 0; }; + auto f = task_wrapper(func1, func2, func3); + ASSERT_EQ(-1, f()); + + f = task_wrapper(func1, func3); + ASSERT_EQ(0, f()); +} + +TEST(UtilTest, delay) { + std::unique_ptr s3_producer_pool = + std::make_unique(config::recycle_pool_parallelism); + s3_producer_pool->start(); + // test normal execute + { + SyncExecutor sync_executor(s3_producer_pool.get(), "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return -1; }; + auto f2 = []() { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return 1; + }; + sync_executor.add(f2); + sync_executor.add(f2); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(3, res.size()); + } + // test normal execute + { + SyncExecutor sync_executor( + s3_producer_pool.get(), "normal test", + [](const std::string_view k) { return k.empty(); }); + auto f1 = []() { return ""; }; + auto f2 = []() { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return "fake"; + }; + sync_executor.add(f2); + sync_executor.add(f2); + sync_executor.add(f1); + bool finished = true; + auto res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(3, res.size()); + } +} + +TEST(UtilTest, normal) { + std::unique_ptr s3_producer_pool = + std::make_unique(config::recycle_pool_parallelism); + s3_producer_pool->start(); + // test normal execute + { + SyncExecutor sync_executor(s3_producer_pool.get(), "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(3, res.size()); + ASSERT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { ASSERT_EQ(1, n); }); + } + // test when error happen + { + SyncExecutor sync_executor(s3_producer_pool.get(), "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + sync_executor._stop_token = true; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(0, res.size()); + } + { + SyncExecutor sync_executor(s3_producer_pool.get(), "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + auto cancel = []() { return -1; }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(cancel); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + } + // test string_view + { + SyncExecutor sync_executor( + s3_producer_pool.get(), "normal test", + [](const std::string_view k) { return k.empty(); }); + std::string s = "Hello World"; + auto f1 = [&s]() { return std::string_view(s); }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(3, res.size()); + std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); }); + } +}