Skip to content

Commit

Permalink
fix GetFile atomicity when disable file pool
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Dec 25, 2020
1 parent 0011847 commit 0ee8a75
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 135 deletions.
143 changes: 63 additions & 80 deletions src/chunkserver/datastore/file_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@
* Author: tongguangxun
*/

#include "src/chunkserver/datastore/file_pool.h"

#include <errno.h>
#include <fcntl.h>
#include <glog/logging.h>
#include <json/json.h>
#include <fcntl.h>
#include <linux/fs.h>
#include <errno.h>
#include <cctype>

#include <algorithm>
#include <cctype>
#include <climits>
#include <vector>
#include <memory>
#include <vector>

#include "src/common/crc32.h"
#include "src/common/configuration.h"
#include "src/common/crc32.h"
#include "src/common/curve_define.h"
#include "src/chunkserver/datastore/file_pool.h"

using curve::common::kFilePoolMaigic;

Expand All @@ -48,40 +49,32 @@ const char* FilePoolHelper::kCRC = "crc";
const uint32_t FilePoolHelper::kPersistSize = 4096;

int FilePoolHelper::PersistEnCodeMetaInfo(
std::shared_ptr<LocalFileSystem> fsptr,
uint32_t chunkSize,
uint32_t metaPageSize,
const std::string& filePoolPath,
const std::string& persistPath) {
std::shared_ptr<LocalFileSystem> fsptr, uint32_t chunkSize,
uint32_t metaPageSize, const std::string& filePoolPath,
const std::string& persistPath) {
Json::Value root;
root[kFileSize] = chunkSize;
root[kMetaPageSize] = metaPageSize;
root[kFilePoolPath] = filePoolPath;

uint32_t crcsize = sizeof(kFilePoolMaigic) +
sizeof(chunkSize) +
sizeof(metaPageSize) +
filePoolPath.size();
uint32_t crcsize = sizeof(kFilePoolMaigic) + sizeof(chunkSize) +
sizeof(metaPageSize) + filePoolPath.size();
char* crcbuf = new char[crcsize];

::memcpy(crcbuf, kFilePoolMaigic,
sizeof(kFilePoolMaigic));
::memcpy(crcbuf + sizeof(kFilePoolMaigic),
&chunkSize, sizeof(uint32_t));
::memcpy(crcbuf + sizeof(uint32_t) + sizeof(kFilePoolMaigic),
&metaPageSize, sizeof(uint32_t));
::memcpy(crcbuf + 2*sizeof(uint32_t) + sizeof(kFilePoolMaigic),
filePoolPath.c_str(),
filePoolPath.size());
::memcpy(crcbuf, kFilePoolMaigic, sizeof(kFilePoolMaigic));
::memcpy(crcbuf + sizeof(kFilePoolMaigic), &chunkSize, sizeof(uint32_t));
::memcpy(crcbuf + sizeof(uint32_t) + sizeof(kFilePoolMaigic), &metaPageSize,
sizeof(uint32_t));
::memcpy(crcbuf + 2 * sizeof(uint32_t) + sizeof(kFilePoolMaigic),
filePoolPath.c_str(), filePoolPath.size());
uint32_t crc = ::curve::common::CRC32(crcbuf, crcsize);
delete[] crcbuf;

root[kCRC] = crc;

int fd = fsptr->Open(persistPath.c_str(), O_RDWR|O_CREAT|O_SYNC);
int fd = fsptr->Open(persistPath.c_str(), O_RDWR | O_CREAT | O_SYNC);
if (fd < 0) {
LOG(ERROR) << "meta file open failed, "
<< persistPath.c_str();
LOG(ERROR) << "meta file open failed, " << persistPath.c_str();
return -1;
}

Expand All @@ -94,8 +87,7 @@ int FilePoolHelper::PersistEnCodeMetaInfo(

int ret = fsptr->Write(fd, writeBuffer, 0, kPersistSize);
if (ret != kPersistSize) {
LOG(ERROR) << "meta file write failed, "
<< persistPath.c_str()
LOG(ERROR) << "meta file write failed, " << persistPath.c_str()
<< ", ret = " << ret;
fsptr->Close(fd);
delete[] writeBuffer;
Expand All @@ -108,12 +100,9 @@ int FilePoolHelper::PersistEnCodeMetaInfo(
}

int FilePoolHelper::DecodeMetaInfoFromMetaFile(
std::shared_ptr<LocalFileSystem> fsptr,
const std::string& metaFilePath,
uint32_t metaFileSize,
uint32_t* chunksize,
uint32_t* metapagesize,
std::string* chunkfilePath) {
std::shared_ptr<LocalFileSystem> fsptr, const std::string& metaFilePath,
uint32_t metaFileSize, uint32_t* chunksize, uint32_t* metapagesize,
std::string* chunkfilePath) {
int fd = fsptr->Open(metaFilePath, O_RDWR);
if (fd < 0) {
LOG(ERROR) << "meta file open failed, " << metaFilePath;
Expand Down Expand Up @@ -145,31 +134,31 @@ int FilePoolHelper::DecodeMetaInfoFromMetaFile(
*chunksize = value[kFileSize].asUInt();
} else {
LOG(ERROR) << "chunkfile meta file got error!"
<< " no chunksize!";
<< " no chunksize!";
break;
}

if (!value[kMetaPageSize].isNull()) {
*metapagesize = value[kMetaPageSize].asUInt();
} else {
LOG(ERROR) << "chunkfile meta file got error!"
<< " no metaPageSize!";
<< " no metaPageSize!";
break;
}

if (!value[kFilePoolPath].isNull()) {
*chunkfilePath = value[kFilePoolPath].asString();
} else {
LOG(ERROR) << "chunkfile meta file got error!"
<< " no FilePool path!";
<< " no FilePool path!";
break;
}

if (!value[kCRC].isNull()) {
crcvalue = value[kCRC].asUInt();
} else {
LOG(ERROR) << "chunkfile meta file got error!"
<< " no crc!";
<< " no crc!";
break;
}

Expand All @@ -181,20 +170,21 @@ int FilePoolHelper::DecodeMetaInfoFromMetaFile(
return -1;
}

uint32_t crcCheckSize = 2*sizeof(uint32_t) +
sizeof(kFilePoolMaigic) +
chunkfilePath->size();
uint32_t crcCheckSize =
2 * sizeof(uint32_t) + sizeof(kFilePoolMaigic) + chunkfilePath->size();

std::unique_ptr<char[]> crcCheckBuf(new char[crcCheckSize]);

::memcpy(crcCheckBuf.get(), kFilePoolMaigic, sizeof(kFilePoolMaigic)); // NOLINT
::memcpy(crcCheckBuf.get() + sizeof(kFilePoolMaigic),
chunksize, sizeof(uint32_t));
::memcpy(crcCheckBuf.get() + sizeof(uint32_t) + sizeof(kFilePoolMaigic), // NOLINT
::memcpy(crcCheckBuf.get(), kFilePoolMaigic,
sizeof(kFilePoolMaigic)); // NOLINT
::memcpy(crcCheckBuf.get() + sizeof(kFilePoolMaigic), chunksize,
sizeof(uint32_t));
::memcpy(crcCheckBuf.get() + sizeof(uint32_t) +
sizeof(kFilePoolMaigic), // NOLINT
metapagesize, sizeof(uint32_t));
::memcpy(crcCheckBuf.get() + 2*sizeof(uint32_t) + sizeof(kFilePoolMaigic), // NOLINT
chunkfilePath->c_str(),
chunkfilePath->size());
::memcpy(crcCheckBuf.get() + 2 * sizeof(uint32_t) +
sizeof(kFilePoolMaigic), // NOLINT
chunkfilePath->c_str(), chunkfilePath->size());
uint32_t crcCalc = ::curve::common::CRC32(crcCheckBuf.get(), crcCheckSize);

if (crcvalue != crcCalc) {
Expand All @@ -205,8 +195,8 @@ int FilePoolHelper::DecodeMetaInfoFromMetaFile(
return 0;
}

FilePool::FilePool(std::shared_ptr<LocalFileSystem> fsptr):
currentmaxfilenum_(0) {
FilePool::FilePool(std::shared_ptr<LocalFileSystem> fsptr)
: currentmaxfilenum_(0) {
CHECK(fsptr != nullptr) << "fs ptr allocate failed!";
fsptr_ = fsptr;
tmpChunkvec_.clear();
Expand Down Expand Up @@ -240,12 +230,9 @@ bool FilePool::CheckValid() {
uint32_t metapagesize = 0;
std::string filePath;

int ret = FilePoolHelper::DecodeMetaInfoFromMetaFile(fsptr_,
poolOpt_.metaPath,
poolOpt_.metaFileSize,
&chunksize,
&metapagesize,
&filePath);
int ret = FilePoolHelper::DecodeMetaInfoFromMetaFile(
fsptr_, poolOpt_.metaPath, poolOpt_.metaFileSize, &chunksize,
&metapagesize, &filePath);
if (ret == -1) {
LOG(ERROR) << "Decode meta info from meta file failed!";
return false;
Expand Down Expand Up @@ -275,8 +262,8 @@ int FilePool::GetFile(const std::string& targetpath, char* metapage) {
tmpChunkvec_.pop_back();
--currentState_.preallocatedChunksLeft;
} else {
currentmaxfilenum_.fetch_add(1);
srcpath = currentdir_ + "/" + std::to_string(currentmaxfilenum_);
srcpath = currentdir_ + "/" +
std::to_string(currentmaxfilenum_.fetch_add(1));
int r = AllocateChunk(srcpath);
if (r < 0) {
LOG(ERROR) << "file allocate failed, " << srcpath.c_str();
Expand All @@ -286,8 +273,6 @@ int FilePool::GetFile(const std::string& targetpath, char* metapage) {
}

bool rc = WriteMetaPage(srcpath, metapage);
LOG(INFO) << "src path = " << srcpath.c_str()
<< ", dist path = " << targetpath.c_str();
if (rc) {
// Here, the RENAME_NOREPLACE mode is used to rename the file.
// When the target file exists, it is not allowed to be overwritten.
Expand All @@ -301,19 +286,19 @@ int FilePool::GetFile(const std::string& targetpath, char* metapage) {
// The target file already exists, exit the current logic directly,
// and delete the written file
if (ret == -EEXIST) {
LOG(ERROR) << targetpath << ", already exists! src path = "
<< srcpath;
LOG(ERROR) << targetpath
<< ", already exists! src path = " << srcpath;
break;
} else if (ret < 0) {
LOG(ERROR) << "file rename failed, " << srcpath.c_str();
} else {
LOG(INFO) << "get file success! now pool size = "
LOG(INFO) << "get file " << targetpath
<< " success! now pool size = "
<< tmpChunkvec_.size();
break;
}
} else {
LOG(ERROR) << "write metapage failed, "
<< srcpath.c_str();
LOG(ERROR) << "write metapage failed, " << srcpath.c_str();
}
retry++;
}
Expand Down Expand Up @@ -407,7 +392,7 @@ int FilePool::RecycleFile(const std::string& chunkpath) {
} else {
// Check whether the size of the file to be recovered meets the
// requirements, and delete it if it does not
uint64_t chunklen = poolOpt_.fileSize+poolOpt_.metaPageSize;
uint64_t chunklen = poolOpt_.fileSize + poolOpt_.metaPageSize;
int fd = fsptr_->Open(chunkpath.c_str(), O_RDWR);
if (fd < 0) {
LOG(ERROR) << "file open failed! delete file dirctly"
Expand All @@ -418,18 +403,17 @@ int FilePool::RecycleFile(const std::string& chunkpath) {
struct stat info;
int ret = fsptr_->Fstat(fd, &info);
if (ret != 0) {
LOG(ERROR) << "Fstat file " << chunkpath.c_str()
<< "failed, ret = " << ret
<< ", delete file dirctly";
LOG(ERROR) << "Fstat file " << chunkpath.c_str()
<< "failed, ret = " << ret << ", delete file dirctly";
fsptr_->Close(fd);
return fsptr_->Delete(chunkpath.c_str());
}

if (info.st_size != chunklen) {
LOG(ERROR) << "file size illegal, " << chunkpath.c_str()
<< ", delete file dirctly"
<< ", standard size = " << chunklen
<< ", current file size = " << info.st_size;
<< ", delete file dirctly"
<< ", standard size = " << chunklen
<< ", current file size = " << info.st_size;
fsptr_->Close(fd);
return fsptr_->Delete(chunkpath.c_str());
}
Expand Down Expand Up @@ -462,7 +446,7 @@ int FilePool::RecycleFile(const std::string& chunkpath) {
}

void FilePool::UnInitialize() {
currentdir_ = "";
currentdir_ = "";

std::unique_lock<std::mutex> lk(mtx_);
tmpChunkvec_.clear();
Expand All @@ -471,19 +455,18 @@ void FilePool::UnInitialize() {
bool FilePool::ScanInternal() {
uint64_t maxnum = 0;
std::vector<std::string> tmpvec;
LOG(INFO) << "scan dir" << currentdir_;
int ret = fsptr_->List(currentdir_.c_str(), &tmpvec);
if (ret < 0) {
LOG(ERROR) << "list file pool dir failed!";
return false;
} else {
LOG(INFO) << "list file pool dir done, size = "
<< tmpvec.size();
LOG(INFO) << "list file pool dir done, size = " << tmpvec.size();
}

uint64_t chunklen = poolOpt_.fileSize + poolOpt_.metaPageSize;
for (auto& iter : tmpvec) {
auto it =
std::find_if(iter.begin(), iter.end(), [](unsigned char c) {
auto it = std::find_if(iter.begin(), iter.end(), [](unsigned char c) {
return !std::isdigit(c);
});
if (it != iter.end()) {
Expand Down Expand Up @@ -540,5 +523,5 @@ FilePoolState_t FilePool::GetState() {
return currentState_;
}

} // namespace chunkserver
} // namespace curve
} // namespace chunkserver
} // namespace curve
1 change: 0 additions & 1 deletion test/chunkserver/datastore/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ cc_test(
copts = ["-std=c++11"],
deps = [
"@com_google_googletest//:gtest",
"//external:braft",
"//external:gflags",
"//include:include-common",
"//include/chunkserver:include-chunkserver",
Expand Down
Loading

0 comments on commit 0ee8a75

Please sign in to comment.