Skip to content

Commit

Permalink
nebd-server: drop failed reqeusts' rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Jan 20, 2021
1 parent c36170d commit 1948e6f
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 40 deletions.
3 changes: 3 additions & 0 deletions nebd/etc/nebd/nebd-server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ heartbeat.timeout.sec=30

#文件超时检测时间间隔
heartbeat.check.interval.ms=3000

# return rpc when io error
response.returnRpcWhenIoError=false
3 changes: 3 additions & 0 deletions nebd/src/part2/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ struct NebdServerAioContext {
Closure *done = nullptr;
// rpc请求的controller
RpcController* cntl = nullptr;
// return rpc when io error
bool returnRpcWhenIoError = false;
};

struct NebdFileInfo {
Expand All @@ -119,6 +121,7 @@ const char METAFILEPATH[] = "meta.file.path";
const char HEARTBEATTIMEOUTSEC[] = "heartbeat.timeout.sec";
const char HEARTBEATCHECKINTERVALMS[] = "heartbeat.check.interval.ms";
const char CURVECLIENTCONFPATH[] = "curveclient.confPath";
const char RESPONSERETURNRPCWHENIOERROR[] = "response.returnRpcWhenIoError";

} // namespace server
} // namespace nebd
Expand Down
69 changes: 49 additions & 20 deletions nebd/src/part2/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,24 @@ void NebdFileServiceCallback(NebdServerAioContext* context) {
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);

switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response =
dynamic_cast<nebd::client::ReadResponse*>(context->response);
nebd::client::ReadResponse* response = dynamic_cast
<nebd::client::ReadResponse*>(context->response);
// io error
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Read file failed. "
<< "return code: " << context->ret;
LOG(ERROR) << "Read file failed: " << *context;
// don't return rpc
if (!context->returnRpcWhenIoError) {
// drop the rpc to ensure not return ioerror
doneGuard.release();
delete context->done;
LOG(ERROR) << "Read file failed and drop the request rpc.";
} else {
response->set_retcode(RetCode::kNoOK);
}
} else {
brpc::Controller* cntl =
dynamic_cast<brpc::Controller *>(context->cntl);
Expand All @@ -58,38 +67,54 @@ void NebdFileServiceCallback(NebdServerAioContext* context) {
}
case LIBAIO_OP::LIBAIO_OP_WRITE:
{
nebd::client::WriteResponse* response =
dynamic_cast<nebd::client::WriteResponse*>(context->response);
nebd::client::WriteResponse* response = dynamic_cast
<nebd::client::WriteResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Write file failed. "
<< "return code: " << context->ret;
LOG(ERROR) << "Write file failed: " << *context;
if (!context->returnRpcWhenIoError) {
doneGuard.release();
delete context->done;
LOG(ERROR) << "Write file failed and drop the request rpc.";
} else {
response->set_retcode(RetCode::kNoOK);
}
} else {
response->set_retcode(RetCode::kOK);
}
break;
}
case LIBAIO_OP::LIBAIO_OP_FLUSH:
{
nebd::client::FlushResponse* response =
dynamic_cast<nebd::client::FlushResponse*>(context->response);
nebd::client::FlushResponse* response = dynamic_cast
<nebd::client::FlushResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Flush file failed. "
<< "return code: " << context->ret;
LOG(ERROR) << "Flush file failed: " << *context;
if (!context->returnRpcWhenIoError) {
doneGuard.release();
delete context->done;
LOG(ERROR) << "Flush file failed and drop the request rpc.";
} else {
response->set_retcode(RetCode::kNoOK);
}
} else {
response->set_retcode(RetCode::kOK);
}
break;
}
case LIBAIO_OP::LIBAIO_OP_DISCARD:
{
nebd::client::DiscardResponse* response =
dynamic_cast<nebd::client::DiscardResponse*>(context->response);
nebd::client::DiscardResponse* response = dynamic_cast
<nebd::client::DiscardResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Discard file failed. "
<< "return code: " << context->ret;
LOG(ERROR) << "Discard file failed: " << *context;
if (!context->returnRpcWhenIoError) {
doneGuard.release();
delete context->done;
LOG(ERROR) << "Discard file failed and drop the request"
<< " rpc.";
} else {
response->set_retcode(RetCode::kNoOK);
}
} else {
response->set_retcode(RetCode::kOK);
}
Expand Down Expand Up @@ -136,6 +161,7 @@ void NebdFileServiceImpl::Write(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_WRITE;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

brpc::Controller* cntl = dynamic_cast<brpc::Controller *>(cntl_base);

Expand Down Expand Up @@ -183,6 +209,7 @@ void NebdFileServiceImpl::Read(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_READ;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

std::unique_ptr<butil::IOBuf> buf(new butil::IOBuf());
aioContext->buf = buf.get();
Expand Down Expand Up @@ -218,6 +245,7 @@ void NebdFileServiceImpl::Flush(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Flush(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down Expand Up @@ -245,6 +273,7 @@ void NebdFileServiceImpl::Discard(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Discard(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down
7 changes: 5 additions & 2 deletions nebd/src/part2/file_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ void NebdFileServiceCallback(NebdServerAioContext* context);

class NebdFileServiceImpl : public nebd::client::NebdFileService {
public:
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager)
: fileManager_(fileManager) {}
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager,
const bool returnRpcWhenIoError)
: fileManager_(fileManager),
returnRpcWhenIoError_(returnRpcWhenIoError) {}

virtual ~NebdFileServiceImpl() {}

Expand Down Expand Up @@ -91,6 +93,7 @@ class NebdFileServiceImpl : public nebd::client::NebdFileService {

private:
std::shared_ptr<NebdFileManager> fileManager_;
const bool returnRpcWhenIoError_;
};

} // namespace server
Expand Down
10 changes: 9 additions & 1 deletion nebd/src/part2/nebd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,15 @@ bool NebdServer::InitHeartbeatManager() {

bool NebdServer::StartServer() {
// add service
NebdFileServiceImpl fileService(fileManager_);
bool returnRpcWhenIoError;
bool ret = conf_.GetBoolValue(RESPONSERETURNRPCWHENIOERROR,
&returnRpcWhenIoError);
if (false == ret) {
LOG(ERROR) << "get " << RESPONSERETURNRPCWHENIOERROR << " fail";
return false;
}

NebdFileServiceImpl fileService(fileManager_, returnRpcWhenIoError);
int addFileServiceRes = server_.AddService(
&fileService, brpc::SERVER_DOESNT_OWN_SERVICE);
if (0 != addFileServiceRes) {
Expand Down
1 change: 1 addition & 0 deletions nebd/src/part2/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c) {
<< ", offset: " << c.offset
<< ", size: " << c.size
<< ", ret: " << c.ret
<< ", returnRpcWhenIoError: " << c.returnRpcWhenIoError
<< "]";
return os;
}
Expand Down
105 changes: 88 additions & 17 deletions nebd/test/part2/file_service_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class FileServiceTest : public ::testing::Test {
public:
void SetUp() {
fileManager_ = std::make_shared<MockFileManager>();
fileService_ = std::make_shared<NebdFileServiceImpl>(fileManager_);
fileService_ = std::make_shared<NebdFileServiceImpl>(fileManager_,
false);
}
void TearDown() {}
protected:
Expand Down Expand Up @@ -354,22 +355,41 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// read failed
// read failed return rpc
{
brpc::Controller cntl;
nebd::client::ReadResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_READ;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = &done;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
context->returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// read failed don't return rpc
{
brpc::Controller cntl;
nebd::client::ReadResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_READ;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
context->returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// write success
Expand All @@ -390,22 +410,41 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// write failed
// write failed return rpc
{
brpc::Controller cntl;
nebd::client::WriteResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_WRITE;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = &done;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
context->returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// write failed don't return rpc
{
brpc::Controller cntl;
nebd::client::WriteResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_WRITE;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
context->returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// flush success
Expand All @@ -423,19 +462,35 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// flush failed
// flush failed return rpc
{
brpc::Controller cntl;
nebd::client::FlushResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_FLUSH;
context->cntl = &cntl;
context->response = &response;
context->done = &done;
context->done = done;
context->ret = -1;
context->returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// flush failed don't return rpc
{
brpc::Controller cntl;
nebd::client::FlushResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_FLUSH;
context->cntl = &cntl;
context->response = &response;
context->done = done;
context->ret = -1;
context->returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// discard success
Expand All @@ -453,19 +508,35 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// discard failed
// discard failed return rpc
{
brpc::Controller cntl;
nebd::client::DiscardResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_DISCARD;
context->cntl = &cntl;
context->response = &response;
context->done = &done;
context->done = done;
context->ret = -1;
context->returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// discard failed don't return rpc
{
brpc::Controller cntl;
nebd::client::DiscardResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_DISCARD;
context->cntl = &cntl;
context->response = &response;
context->done = done;
context->ret = -1;
context->returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
}
Expand Down

0 comments on commit 1948e6f

Please sign in to comment.