Skip to content

Commit

Permalink
add util test
Browse files Browse the repository at this point in the history
use atomic

use value
  • Loading branch information
ByteYue committed Jul 19, 2024
1 parent 76c6d17 commit 6f5219d
Show file tree
Hide file tree
Showing 17 changed files with 545 additions and 86 deletions.
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma-separated list
CONF_Strings(recycle_blacklist, ""); // Comma-separated list
CONF_mInt32(instance_recycler_worker_pool_size, "1");
CONF_Bool(enable_checker, "false");
// Degree of parallelism for parallel recycle operations (default "10")
CONF_Int32(recycle_pool_parallelism, "10");
// Currently only used for recycler tests
CONF_Bool(enable_inverted_check, "false");
// interval for scanning instances to do checks and inspections
Expand Down
6 changes: 4 additions & 2 deletions cloud/src/recycler/azure_obj_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ std::unique_ptr<ObjectListIterator> AzureObjClient::list_objects(ObjectStoragePa
// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
// > Each batch request supports a maximum of 256 subrequests.
ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
std::vector<std::string> keys) {
std::vector<std::string> keys,
ObjClientOptions option) {
if (keys.empty()) {
return {0};
}
Expand Down Expand Up @@ -275,8 +276,9 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
}

// Azure override of delete_objects_recursively: forwards to the shared
// delete_objects_recursively_ helper, passing BlobBatchMaxOperations as the
// helper's batch_size argument.
//
// path            - bucket/key prefix forwarded unchanged to the helper.
// option          - client options forwarded unchanged to the helper.
// expiration_time - forwarded unchanged to the helper.
// Returns whatever the helper returns.
ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time) {
// NOTE(review): two consecutive return statements follow. The first passes
// three arguments, while the helper declared in obj_storage_client.h takes
// four (path, option, expiration_time, batch_size); the second is unreachable
// as written. These look like the before/after lines of one change — confirm
// that only the four-argument call is meant to remain.
return delete_objects_recursively_(path, expiration_time, BlobBatchMaxOperations);
return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations);
}

ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket,
Expand Down
5 changes: 3 additions & 2 deletions cloud/src/recycler/azure_obj_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class AzureObjClient final : public ObjStorageClient {

std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef path) override;

ObjectStorageResponse delete_objects(const std::string& bucket,
std::vector<std::string> keys) override;
ObjectStorageResponse delete_objects(const std::string& bucket, std::vector<std::string> keys,
ObjClientOptions option) override;

ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;

ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time = 0) override;

ObjectStorageResponse get_life_cycle(const std::string& bucket,
Expand Down
30 changes: 24 additions & 6 deletions cloud/src/recycler/obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
#include "recycler/obj_storage_client.h"

#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
#include "recycler/util.h"

namespace doris::cloud {

ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path,
const ObjClientOptions& option,
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
auto list_iter = list_objects(path);

ObjectStorageResponse ret;
std::vector<std::string> keys;
SyncExecutor<int> concurrent_delete_executor(
option.executor,
fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key),
[](const int& ret) { return ret != 0; });

for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) {
if (expired_time > 0 && obj->mtime_s > expired_time) {
Expand All @@ -39,20 +46,31 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
if (keys.size() < batch_size) {
continue;
}

ret = delete_objects(path.bucket, std::move(keys));
if (ret.ret != 0) {
return ret;
}
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}

if (!list_iter->is_valid()) {
bool finished;
concurrent_delete_executor.when_all(&finished);
return {-1};
}

if (!keys.empty()) {
return delete_objects(path.bucket, std::move(keys));
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
if (r != 0) {
ret = -1;
}
}

ret = finished ? ret : -1;

return ret;
}
Expand Down
11 changes: 10 additions & 1 deletion cloud/src/recycler/obj_storage_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class ObjectListIterator {
virtual std::optional<ObjectMeta> next() = 0;
};

// Forward declaration only: holding a std::shared_ptr to the pool does not
// require its full definition in this header.
class SimpleThreadPool;

// Options bundle passed alongside object-storage client calls such as
// delete_objects and delete_objects_recursively.
struct ObjClientOptions {
    // Defaults to true.
    // NOTE(review): no code visible here reads this flag — confirm its meaning.
    bool prefetch = true;

    // Thread pool handle; null unless the caller assigns one.
    // delete_objects_recursively_ hands it to its SyncExecutor.
    std::shared_ptr<SimpleThreadPool> executor = nullptr;
};

class ObjStorageClient {
public:
ObjStorageClient() = default;
Expand All @@ -71,14 +77,16 @@ class ObjStorageClient {

// According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
virtual ObjectStorageResponse delete_objects(const std::string& bucket,
std::vector<std::string> keys) = 0;
std::vector<std::string> keys,
ObjClientOptions option) = 0;

// Delete the file named key in the object storage bucket.
virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0;

// According to the prefix, recursively delete all objects under the prefix.
// If `expiration_time` > 0, only delete objects with mtime earlier than `expiration_time`.
virtual ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time = 0) = 0;

// Get the objects' expiration time on the bucket
Expand All @@ -91,6 +99,7 @@ class ObjStorageClient {

protected:
ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef path,
const ObjClientOptions& option,
int64_t expiration_time, size_t batch_size);
};

Expand Down
Loading

0 comments on commit 6f5219d

Please sign in to comment.