Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs client : update inode async #1020

Merged
merged 1 commit into from
Jan 24, 2022
Merged

Conversation

xu-chaojie
Copy link
Member

@xu-chaojie xu-chaojie commented Jan 19, 2022

What problem does this PR solve?

Issue Number: close #935

Problem Summary:

update_inode_async

What is changed and how it works?

Update inode asynchronously, so that update inode will not block each others.

Check List

  • Relevant documentation/comments is changed or added
  • I acknowledge that all my contributions will be made under the project's license

@wu-hanqing wu-hanqing self-requested a review January 20, 2022 12:21
LOG(WARNING) << "UpdateInode Failed, errorcode = "
<< cntl.ErrorCode()
<< ", error content:" << cntl.ErrorText()
<< ", log id = " << cntl.log_id();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separator should be :

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (!HasValidTarget() && !GetTarget()) {
LOG(WARNING) << "get target fail for " << task_->TaskContextStr()
<< ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the purpose of sleep in here? I think is conflicting with sleep in line 210

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (!channel) {
LOG(WARNING) << "GetOrCreateChannel fail for "
<< task_->TaskContextStr() << ", sleep and retry";
bthread_usleep(opt_.retryIntervalUS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

MetaServerClientRpcDoneBase(done, metaserverClientMetric) {}
virtual ~UpdateInodeRpcDone() {}
void Run() override;
UpdateInodeResponse response;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public data member?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


namespace curvefs {
namespace client {
namespace rpcclient {

class TaskExecutorDone;

MetaStatusCode ConvertToMetaStatusCode(int retcode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems this function is only used in .cpp file, so no need to declare it in the header file.

Copy link
Member Author

@xu-chaojie xu-chaojie Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was used by two different cpp files, so it can not be put in a cpp file.

@@ -72,6 +79,41 @@ int TaskExecutor::DoRPCTask(std::shared_ptr<TaskContext> task) {
return retCode;
}

void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

above synchronous processing can be implemented by this asynchronous, so the main logic here doesn't have to repeat twice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeap. sync and async can be solved in same DoRPCTask distinguished by whether done is null.

Copy link
Member Author

@xu-chaojie xu-chaojie Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the synchronous way will retry in this function, but asynchronous way retry by closure return, they not same


curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.UpdateInode(cntl, &request, &rpcDone->response, rpcDone);
return MetaStatusCode::OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return value is redundant

Copy link
Member Author

@xu-chaojie xu-chaojie Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if remove this return value, the async task and sync task can not be assign to the same member of task context

curvefs::metaserver::MetaServerService_Stub stub(channel);
stub.GetOrModifyS3ChunkInfo(
cntl, &request, &rpcDone->response, rpcDone);
return MetaStatusCode::OK;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

metaClient_->GetOrModifyS3ChunkInfoAsync(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_,
done);
s3ChunkInfoAdd_.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move it when calling GetOrModifyS3ChunkInfoAsync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3ChunkInfoAdd_ should clear here, so that new s3chunkinfo can add to it when new write come

LockSyncingInode();
auto *done = new UpdateInodeAsyncDone(shared_from_this());
metaClient_->UpdateInodeAsync(inode_, done);
dirty_ = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dirty_=false should update in done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, should be update here, so when new write come, it can be set dirty again

metaClient_->GetOrModifyS3ChunkInfoAsync(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_,
done);
s3ChunkInfoAdd_.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should clear it in done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3ChunkInfoAdd_ should clear here, so that new s3chunkinfo can add to it when new write come

@@ -220,18 +238,51 @@ class InodeWrapper {
inode_.mutable_s3chunkinfomap());
}

void MarkInodeError() {
// TODO(xuchaojie) : when inode is marked error, prevent futher write.
status_ = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum status {
}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

brpc::Controller * cntl) -> int
brpc::Controller * cntl, TaskExecutorDone *taskExecutorDone) -> int

#define AsyncRPCTask \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncRPCTask is defined same as RPCTask

Copy link
Member Author

@xu-chaojie xu-chaojie Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not the same, the AsyncRPCTask is "capture by copy" and SyncRpcTask is "capture by reference"

<< ", errmsg: " << MetaStatusCode_Name(ret);
done_->SetRetCode(ret);
return;
} else if (response.has_appliedindex()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not judge s3ChunkInfoMap in GetOrModifyS3ChunkInfoResponse?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

judge what? it can be empty.

@@ -72,6 +79,41 @@ int TaskExecutor::DoRPCTask(std::shared_ptr<TaskContext> task) {
return retCode;
}

void TaskExecutor::DoAsyncRPCTask(TaskExecutorDone *done) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeap. sync and async can be solved in same DoRPCTask distinguished by whether done is null.

@xu-chaojie xu-chaojie merged commit d0bd59d into opencurve:master Jan 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CI failover: Fuse memory is increasing while running IO
3 participants