Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs client : update inode async #1020

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,14 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
return tRet;
}
inode->set_length(attr->st_size);
ret = inodeWrapper->Sync();
if (ret != CURVEFS_ERROR::OK) {
return ret;
}
inodeWrapper->GetInodeAttrUnLocked(attrOut);
return ret;
}
ret = inodeWrapper->Sync();
ret = inodeWrapper->SyncAttr();
if (ret != CURVEFS_ERROR::OK) {
return ret;
}
Expand Down
32 changes: 4 additions & 28 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid,
std::shared_ptr<InodeWrapper> eliminatedOne;
bool eliminated = iCache_->Put(inodeid, out, &eliminatedOne);
if (eliminated) {
CURVEFS_ERROR ret = eliminatedOne->Sync();
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "sync inode failed, ret = " << ret;
return ret;
}
eliminatedOne->FlushAsync();
}
return CURVEFS_ERROR::OK;
}
Expand All @@ -93,11 +89,7 @@ CURVEFS_ERROR InodeCacheManagerImpl::CreateInode(
std::shared_ptr<InodeWrapper> eliminatedOne;
bool eliminated = iCache_->Put(inodeid, out, &eliminatedOne);
if (eliminated) {
CURVEFS_ERROR ret = eliminatedOne->Sync();
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "sync inode failed, ret = " << ret;
return ret;
}
eliminatedOne->FlushAsync();
}
return CURVEFS_ERROR::OK;
}
Expand Down Expand Up @@ -136,31 +128,15 @@ void InodeCacheManagerImpl::FlushAll() {
}
}


void InodeCacheManagerImpl::FlushInodeOnce() {
std::map<uint64_t, std::shared_ptr<InodeWrapper>> temp_;
{
curve::common::LockGuard lg(dirtyMapMutex_);
temp_.swap(dirtyMap_);
}
for (auto it = temp_.begin(); it != temp_.end();) {
for (auto it = temp_.begin(); it != temp_.end(); it++) {
curve::common::UniqueLock ulk = it->second->GetUniqueLock();
CURVEFS_ERROR ret = it->second->Sync();
if (ret != CURVEFS_ERROR::OK && ret != CURVEFS_ERROR::NOTEXIST) {
LOG(ERROR) << "Flush inode failed, inodeid = "
<< it->second->GetInodeId();
it++;
continue;
}
it = temp_.erase(it);
}
LOG_IF(WARNING, temp_.size() > 0) << "FlushInodeOnce, remain inode num = "
<< temp_.size();
{
curve::common::LockGuard lg(dirtyMapMutex_);
for (const auto &v : temp_) {
dirtyMap_.emplace(v.first, v.second);
}
it->second->FlushAsync();
}
}

Expand Down
80 changes: 78 additions & 2 deletions curvefs/src/client/inode_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace client {

using rpcclient::MetaServerClient;
using rpcclient::MetaServerClientImpl;
using rpcclient::MetaServerClientDone;

std::ostream &operator<<(std::ostream &os, const struct stat &attr) {
os << "{ st_ino = " << attr.st_ino << ", st_mode = " << attr.st_mode
Expand Down Expand Up @@ -65,7 +66,56 @@ void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info,
}
}

CURVEFS_ERROR InodeWrapper::Sync() {
class UpdateInodeAsyncDone : public MetaServerClientDone {
public:
UpdateInodeAsyncDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper):
inodeWrapper_(inodeWrapper) {}
~UpdateInodeAsyncDone() {}

void Run() override {
std::unique_ptr<UpdateInodeAsyncDone> self_guard(this);
MetaStatusCode ret = GetStatusCode();
if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
LOG(ERROR) << "metaClient_ UpdateInode failed, "
<< "MetaStatusCode: " << ret
<< ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret)
<< ", inodeid: " << inodeWrapper_->GetInodeId();
inodeWrapper_->MarkInodeError();
}
inodeWrapper_->ReleaseSyncingInode();
};

private:
std::shared_ptr<InodeWrapper> inodeWrapper_;
};

class GetOrModifyS3ChunkInfoAsyncDone : public MetaServerClientDone {
public:
GetOrModifyS3ChunkInfoAsyncDone(
const std::shared_ptr<InodeWrapper> &inodeWrapper):
inodeWrapper_(inodeWrapper) {}
~GetOrModifyS3ChunkInfoAsyncDone() {}

void Run() override {
std::unique_ptr<GetOrModifyS3ChunkInfoAsyncDone> self_guard(this);
MetaStatusCode ret = GetStatusCode();
if (ret != MetaStatusCode::OK && ret != MetaStatusCode::NOT_FOUND) {
LOG(ERROR) << "metaClient_ GetOrModifyS3ChunkInfo failed, "
<< "MetaStatusCode: " << ret
<< ", MetaStatusCode_Name: " << MetaStatusCode_Name(ret)
<< ", inodeid: " << inodeWrapper_->GetInodeId();
inodeWrapper_->MarkInodeError();
}
inodeWrapper_->ReleaseSyncingS3ChunkInfo();
};

private:
std::shared_ptr<InodeWrapper> inodeWrapper_;
};

CURVEFS_ERROR InodeWrapper::SyncAttr() {
curve::common::UniqueLock lock = GetSyncingInodeUniqueLock();
if (dirty_) {
MetaStatusCode ret = metaClient_->UpdateInode(inode_);

Expand All @@ -78,6 +128,11 @@ CURVEFS_ERROR InodeWrapper::Sync() {
}
dirty_ = false;
}
return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR InodeWrapper::SyncS3ChunkInfo() {
curve::common::UniqueLock lock = GetSyncingS3ChunkInfoUniqueLock();
if (!s3ChunkInfoAdd_.empty()) {
MetaStatusCode ret = metaClient_->GetOrModifyS3ChunkInfo(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_);
Expand All @@ -93,7 +148,28 @@ CURVEFS_ERROR InodeWrapper::Sync() {
return CURVEFS_ERROR::OK;
}

CURVEFS_ERROR InodeWrapper::Refresh() {
void InodeWrapper::FlushAttrAsync() {
if (dirty_) {
LockSyncingInode();
auto *done = new UpdateInodeAsyncDone(shared_from_this());
metaClient_->UpdateInodeAsync(inode_, done);
dirty_ = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dirty_=false should update in done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, should be update here, so when new write come, it can be set dirty again

}
}

void InodeWrapper::FlushS3ChunkInfoAsync() {
if (!s3ChunkInfoAdd_.empty()) {
LockSyncingS3ChunkInfo();
auto *done = new GetOrModifyS3ChunkInfoAsyncDone(shared_from_this());
metaClient_->GetOrModifyS3ChunkInfoAsync(
inode_.fsid(), inode_.inodeid(), s3ChunkInfoAdd_,
done);
s3ChunkInfoAdd_.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not move it when calling GetOrModifyS3ChunkInfoAsync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3ChunkInfoAdd_ should clear here, so that new s3chunkinfo can add to it when new write come

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should clear it in done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3ChunkInfoAdd_ should clear here, so that new s3chunkinfo can add to it when new write come

}
}

CURVEFS_ERROR InodeWrapper::RefreshS3ChunkInfo() {
curve::common::UniqueLock lock = GetSyncingS3ChunkInfoUniqueLock();
google::protobuf::Map<
uint64_t, S3ChunkInfoList> s3ChunkInfoMap;
MetaStatusCode ret = metaClient_->GetOrModifyS3ChunkInfo(
Expand Down
66 changes: 62 additions & 4 deletions curvefs/src/client/inode_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ using ::curvefs::metaserver::S3ChunkInfo;
namespace curvefs {
namespace client {

enum InodeStatus {
Normal = 0,
Error = -1,
};

// TODO(xuchaojie) : get from conf maybe?
const uint32_t kOptimalIOBlockSize = 0x10000u;

Expand All @@ -53,20 +58,21 @@ std::ostream &operator<<(std::ostream &os, const struct stat &attr);
void AppendS3ChunkInfoToMap(uint64_t chunkIndex, const S3ChunkInfo &info,
google::protobuf::Map<uint64_t, S3ChunkInfoList> *s3ChunkInfoMap);


class InodeWrapper {
class InodeWrapper : public std::enable_shared_from_this<InodeWrapper> {
public:
InodeWrapper(const Inode &inode,
const std::shared_ptr<MetaServerClient> &metaClient)
: inode_(inode),
openCount_(0),
status_(InodeStatus::Normal),
metaClient_(metaClient),
dirty_(false) {}

InodeWrapper(Inode &&inode,
const std::shared_ptr<MetaServerClient> &metaClient)
: inode_(std::move(inode)),
openCount_(0),
status_(InodeStatus::Normal),
metaClient_(metaClient),
dirty_(false) {}

Expand Down Expand Up @@ -187,9 +193,28 @@ class InodeWrapper {

CURVEFS_ERROR DecreaseNLink();

CURVEFS_ERROR Sync();
CURVEFS_ERROR Sync() {
CURVEFS_ERROR ret = SyncAttr();
if (ret != CURVEFS_ERROR::OK) {
return ret;
}
return SyncS3ChunkInfo();
}

CURVEFS_ERROR SyncAttr();

CURVEFS_ERROR Refresh();
CURVEFS_ERROR SyncS3ChunkInfo();

void FlushAsync() {
FlushAttrAsync();
FlushS3ChunkInfoAsync();
}

void FlushAttrAsync();

void FlushS3ChunkInfoAsync();

CURVEFS_ERROR RefreshS3ChunkInfo();

CURVEFS_ERROR Open();

Expand Down Expand Up @@ -220,18 +245,51 @@ class InodeWrapper {
inode_.mutable_s3chunkinfomap());
}

void MarkInodeError() {
// TODO(xuchaojie) : when inode is marked error, prevent futher write.
status_ = InodeStatus::Error;
}

void LockSyncingInode() const {
syncingInodeMtx_.lock();
}

void ReleaseSyncingInode() const {
syncingInodeMtx_.unlock();
}

curve::common::UniqueLock GetSyncingInodeUniqueLock() {
return curve::common::UniqueLock(syncingInodeMtx_);
}

void LockSyncingS3ChunkInfo() const {
syncingS3ChunkInfoMtx_.lock();
}

void ReleaseSyncingS3ChunkInfo() const {
syncingS3ChunkInfoMtx_.unlock();
}

curve::common::UniqueLock GetSyncingS3ChunkInfoUniqueLock() {
return curve::common::UniqueLock(syncingS3ChunkInfoMtx_);
}

private:
CURVEFS_ERROR SetOpenFlag(bool flag);

private:
Inode inode_;
uint32_t openCount_;
InodeStatus status_;

google::protobuf::Map<uint64_t, S3ChunkInfoList> s3ChunkInfoAdd_;

std::shared_ptr<MetaServerClient> metaClient_;
bool dirty_;
mutable ::curve::common::Mutex mtx_;

mutable ::curve::common::Mutex syncingInodeMtx_;
mutable ::curve::common::Mutex syncingS3ChunkInfoMtx_;
};

} // namespace client
Expand Down
Loading