diff --git a/nebd/etc/nebd/nebd-server.conf b/nebd/etc/nebd/nebd-server.conf index a3e7d4b40d..708b833a49 100644 --- a/nebd/etc/nebd/nebd-server.conf +++ b/nebd/etc/nebd/nebd-server.conf @@ -12,3 +12,6 @@ heartbeat.timeout.sec=30 #文件超时检测时间间隔 heartbeat.check.interval.ms=3000 + +# return rpc when io error +response.returnRpcWhenIoError=false diff --git a/nebd/src/part2/define.h b/nebd/src/part2/define.h index 3a842635ee..49ec2afe15 100644 --- a/nebd/src/part2/define.h +++ b/nebd/src/part2/define.h @@ -94,6 +94,8 @@ struct NebdServerAioContext { Closure *done = nullptr; // rpc请求的controller RpcController* cntl = nullptr; + // return rpc when io error + bool returnRpcWhenIoError = false; }; struct NebdFileInfo { @@ -119,6 +121,7 @@ const char METAFILEPATH[] = "meta.file.path"; const char HEARTBEATTIMEOUTSEC[] = "heartbeat.timeout.sec"; const char HEARTBEATCHECKINTERVALMS[] = "heartbeat.check.interval.ms"; const char CURVECLIENTCONFPATH[] = "curveclient.confPath"; +const char RESPONSERETURNRPCWHENIOERROR[] = "response.returnRpcWhenIoError"; } // namespace server } // namespace nebd diff --git a/nebd/src/part2/file_service.cpp b/nebd/src/part2/file_service.cpp index 999343d613..860b49ed9f 100644 --- a/nebd/src/part2/file_service.cpp +++ b/nebd/src/part2/file_service.cpp @@ -32,67 +32,49 @@ namespace server { using nebd::client::RetCode; -void NebdFileServiceCallback(NebdServerAioContext* context) { - CHECK(context != nullptr); - std::unique_ptr contextGuard(context); - std::unique_ptr iobufGuard( - reinterpret_cast(context->buf)); - brpc::ClosureGuard doneGuard(context->done); +/** + * use curl -L clientIp:nebd-serverPort/flags/dropRpc?setvalue=true + * to modify the parameter dynamic + */ +static bool pass_bool(const char*, bool) { return true; } +DEFINE_bool(dropRpc, false, "drop the request rpc"); +DEFINE_validator(dropRpc, &pass_bool); + +void SetResponse(NebdServerAioContext* context, RetCode retCode) { switch (context->op) { case LIBAIO_OP::LIBAIO_OP_READ: { nebd::client::ReadResponse* response = dynamic_cast(context->response); - if (context->ret < 0) { - response->set_retcode(RetCode::kNoOK); - LOG(ERROR) << "Read file failed. " - << "return code: " << context->ret; - } else { + response->set_retcode(retCode); + if (context->ret >= 0) { brpc::Controller* cntl = dynamic_cast(context->cntl); cntl->response_attachment() = *reinterpret_cast(context->buf); - response->set_retcode(RetCode::kOK); } + break; } case LIBAIO_OP::LIBAIO_OP_WRITE: { nebd::client::WriteResponse* response = dynamic_cast(context->response); - if (context->ret < 0) { - response->set_retcode(RetCode::kNoOK); - LOG(ERROR) << "Write file failed. " - << "return code: " << context->ret; - } else { - response->set_retcode(RetCode::kOK); - } + response->set_retcode(retCode); break; } case LIBAIO_OP::LIBAIO_OP_FLUSH: { nebd::client::FlushResponse* response = dynamic_cast(context->response); - if (context->ret < 0) { - response->set_retcode(RetCode::kNoOK); - LOG(ERROR) << "Flush file failed. " - << "return code: " << context->ret; - } else { - response->set_retcode(RetCode::kOK); - } + response->set_retcode(retCode); break; } case LIBAIO_OP::LIBAIO_OP_DISCARD: { nebd::client::DiscardResponse* response = dynamic_cast(context->response); - if (context->ret < 0) { - response->set_retcode(RetCode::kNoOK); - LOG(ERROR) << "Discard file failed. " - << "return code: " << context->ret; - } else { - response->set_retcode(RetCode::kOK); - } + response->set_retcode(retCode); break; } default: @@ -100,6 +82,36 @@ void NebdFileServiceCallback(NebdServerAioContext* context) { } } +void NebdFileServiceCallback(NebdServerAioContext* context) { + CHECK(context != nullptr); + std::unique_ptr contextGuard(context); + std::unique_ptr iobufGuard( + reinterpret_cast(context->buf)); + brpc::ClosureGuard doneGuard(context->done); + // for test + if (FLAGS_dropRpc) { + doneGuard.release(); + delete context->done; + LOG(ERROR) << Op2Str(context->op) + << " file failed and drop the request rpc."; + return; + } + + if (context->ret < 0 && !context->returnRpcWhenIoError) { + LOG(ERROR) << *context; + // drop the rpc to ensure not return ioerror + doneGuard.release(); + delete context->done; + LOG(ERROR) << Op2Str(context->op) + << " file failed and drop the request rpc."; + } else if (context->ret < 0) { + LOG(ERROR) << *context; + SetResponse(context, RetCode::kNoOK); + } else { + SetResponse(context, RetCode::kOK); + } +} + void NebdFileServiceImpl::OpenFile( google::protobuf::RpcController* cntl_base, const nebd::client::OpenFileRequest* request, @@ -136,6 +148,7 @@ void NebdFileServiceImpl::Write( aioContext->size = request->size(); aioContext->op = LIBAIO_OP::LIBAIO_OP_WRITE; aioContext->cb = NebdFileServiceCallback; + aioContext->returnRpcWhenIoError = returnRpcWhenIoError_; brpc::Controller* cntl = dynamic_cast(cntl_base); @@ -183,6 +196,7 @@ void NebdFileServiceImpl::Read( aioContext->size = request->size(); aioContext->op = LIBAIO_OP::LIBAIO_OP_READ; aioContext->cb = NebdFileServiceCallback; + aioContext->returnRpcWhenIoError = returnRpcWhenIoError_; std::unique_ptr buf(new butil::IOBuf()); aioContext->buf = buf.get(); @@ -218,6 +232,7 @@ void NebdFileServiceImpl::Flush( aioContext->response = response; aioContext->done = done; aioContext->cntl = cntl_base; + aioContext->returnRpcWhenIoError = returnRpcWhenIoError_; int rc = fileManager_->Flush(request->fd(), aioContext); if (rc < 0) { LOG(ERROR) << "Flush file failed. " @@ -245,6 +260,7 @@ void NebdFileServiceImpl::Discard( aioContext->response = response; aioContext->done = done; aioContext->cntl = cntl_base; + aioContext->returnRpcWhenIoError = returnRpcWhenIoError_; int rc = fileManager_->Discard(request->fd(), aioContext); if (rc < 0) { LOG(ERROR) << "Flush file failed. " diff --git a/nebd/src/part2/file_service.h b/nebd/src/part2/file_service.h index 794ffe8f43..9f868058fe 100644 --- a/nebd/src/part2/file_service.h +++ b/nebd/src/part2/file_service.h @@ -39,8 +39,10 @@ void NebdFileServiceCallback(NebdServerAioContext* context); class NebdFileServiceImpl : public nebd::client::NebdFileService { public: - explicit NebdFileServiceImpl(std::shared_ptr fileManager) - : fileManager_(fileManager) {} + explicit NebdFileServiceImpl(std::shared_ptr fileManager, + const bool returnRpcWhenIoError) + : fileManager_(fileManager), + returnRpcWhenIoError_(returnRpcWhenIoError) {} virtual ~NebdFileServiceImpl() {} @@ -91,6 +93,7 @@ class NebdFileServiceImpl : public nebd::client::NebdFileService { private: std::shared_ptr fileManager_; + const bool returnRpcWhenIoError_; }; } // namespace server diff --git a/nebd/src/part2/nebd_server.cpp b/nebd/src/part2/nebd_server.cpp index e2a74aab29..27d65ceb4a 100644 --- a/nebd/src/part2/nebd_server.cpp +++ b/nebd/src/part2/nebd_server.cpp @@ -210,7 +210,15 @@ bool NebdServer::InitHeartbeatManager() { bool NebdServer::StartServer() { // add service - NebdFileServiceImpl fileService(fileManager_); + bool returnRpcWhenIoError; + bool ret = conf_.GetBoolValue(RESPONSERETURNRPCWHENIOERROR, + &returnRpcWhenIoError); + if (false == ret) { + LOG(ERROR) << "get " << RESPONSERETURNRPCWHENIOERROR << " fail"; + return false; + } + + NebdFileServiceImpl fileService(fileManager_, returnRpcWhenIoError); int addFileServiceRes = server_.AddService( &fileService, brpc::SERVER_DOESNT_OWN_SERVICE); if (0 != addFileServiceRes) { diff --git a/nebd/src/part2/util.cpp b/nebd/src/part2/util.cpp index bf0b084937..0195313637 100644 --- a/nebd/src/part2/util.cpp +++ b/nebd/src/part2/util.cpp @@ -73,6 +73,7 @@ std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c) { << ", offset: " << c.offset << ", size: " << c.size << ", ret: " << c.ret + << ", returnRpcWhenIoError: " << c.returnRpcWhenIoError << "]"; return os; } diff --git a/nebd/src/part2/util.h b/nebd/src/part2/util.h index 1f63c200f5..f733a04577 100644 --- a/nebd/src/part2/util.h +++ b/nebd/src/part2/util.h @@ -38,6 +38,8 @@ std::string NebdFileType2Str(NebdFileType type); std::string NebdFileStatus2Str(NebdFileStatus status); +std::string Op2Str(LIBAIO_OP op); + std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c); std::ostream& operator<<(std::ostream& os, const NebdFileMeta& meta); diff --git a/nebd/test/part2/file_service_unittest.cpp b/nebd/test/part2/file_service_unittest.cpp index a720a50b02..c45192a9a7 100644 --- a/nebd/test/part2/file_service_unittest.cpp +++ b/nebd/test/part2/file_service_unittest.cpp @@ -72,7 +72,8 @@ class FileServiceTest : public ::testing::Test { public: void SetUp() { fileManager_ = std::make_shared(); - fileService_ = std::make_shared(fileManager_); + fileService_ = std::make_shared(fileManager_, + false); } void TearDown() {} protected: @@ -354,22 +355,41 @@ TEST_F(FileServiceTest, CallbackTest) { ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kOK); } - // read failed + // read failed return rpc { brpc::Controller cntl; nebd::client::ReadResponse response; - FileServiceTestClosure done; + FileServiceTestClosure* done = new FileServiceTestClosure(); NebdServerAioContext* context = new NebdServerAioContext; context->op = LIBAIO_OP::LIBAIO_OP_READ; context->cntl = &cntl; context->response = &response; context->offset = 0; context->size = 4096; - context->done = &done; + context->done = done; context->buf = new butil::IOBuf(); context->ret = -1; + context->returnRpcWhenIoError = true; + NebdFileServiceCallback(context); + ASSERT_TRUE(done->IsRunned()); + ASSERT_EQ(response.retcode(), RetCode::kNoOK); + } + // read failed don't return rpc + { + brpc::Controller cntl; + nebd::client::ReadResponse response; + FileServiceTestClosure* done = new FileServiceTestClosure(); + NebdServerAioContext* context = new NebdServerAioContext; + context->op = LIBAIO_OP::LIBAIO_OP_READ; + context->cntl = &cntl; + context->response = &response; + context->offset = 0; + context->size = 4096; + context->done = done; + context->buf = new butil::IOBuf(); + context->ret = -1; + context->returnRpcWhenIoError = false; NebdFileServiceCallback(context); - ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kNoOK); } // write success @@ -390,22 +410,41 @@ TEST_F(FileServiceTest, CallbackTest) { ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kOK); } - // write failed + // write failed return rpc { brpc::Controller cntl; nebd::client::WriteResponse response; - FileServiceTestClosure done; + FileServiceTestClosure* done = new FileServiceTestClosure(); NebdServerAioContext* context = new NebdServerAioContext; context->op = LIBAIO_OP::LIBAIO_OP_WRITE; context->cntl = &cntl; context->response = &response; context->offset = 0; context->size = 4096; - context->done = &done; + context->done = done; context->buf = new butil::IOBuf(); context->ret = -1; + context->returnRpcWhenIoError = true; + NebdFileServiceCallback(context); + ASSERT_TRUE(done->IsRunned()); + ASSERT_EQ(response.retcode(), RetCode::kNoOK); + } + // write failed don't return rpc + { + brpc::Controller cntl; + nebd::client::WriteResponse response; + FileServiceTestClosure* done = new FileServiceTestClosure(); + NebdServerAioContext* context = new NebdServerAioContext; + context->op = LIBAIO_OP::LIBAIO_OP_WRITE; + context->cntl = &cntl; + context->response = &response; + context->offset = 0; + context->size = 4096; + context->done = done; + context->buf = new butil::IOBuf(); + context->ret = -1; + context->returnRpcWhenIoError = false; NebdFileServiceCallback(context); - ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kNoOK); } // flush success @@ -423,19 +462,35 @@ TEST_F(FileServiceTest, CallbackTest) { ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kOK); } - // flush failed + // flush failed return rpc { brpc::Controller cntl; nebd::client::FlushResponse response; - FileServiceTestClosure done; + FileServiceTestClosure* done = new FileServiceTestClosure(); NebdServerAioContext* context = new NebdServerAioContext; context->op = LIBAIO_OP::LIBAIO_OP_FLUSH; context->cntl = &cntl; context->response = &response; - context->done = &done; + context->done = done; context->ret = -1; + context->returnRpcWhenIoError = true; + NebdFileServiceCallback(context); + ASSERT_TRUE(done->IsRunned()); + ASSERT_EQ(response.retcode(), RetCode::kNoOK); + } + // flush failed don't return rpc + { + brpc::Controller cntl; + nebd::client::FlushResponse response; + FileServiceTestClosure* done = new FileServiceTestClosure(); + NebdServerAioContext* context = new NebdServerAioContext; + context->op = LIBAIO_OP::LIBAIO_OP_FLUSH; + context->cntl = &cntl; + context->response = &response; + context->done = done; + context->ret = -1; + context->returnRpcWhenIoError = false; NebdFileServiceCallback(context); - ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kNoOK); } // discard success @@ -453,19 +508,35 @@ TEST_F(FileServiceTest, CallbackTest) { ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kOK); } - // discard failed + // discard failed return rpc { brpc::Controller cntl; nebd::client::DiscardResponse response; - FileServiceTestClosure done; + FileServiceTestClosure* done = new FileServiceTestClosure(); NebdServerAioContext* context = new NebdServerAioContext; context->op = LIBAIO_OP::LIBAIO_OP_DISCARD; context->cntl = &cntl; context->response = &response; - context->done = &done; + context->done = done; context->ret = -1; + context->returnRpcWhenIoError = true; + NebdFileServiceCallback(context); + ASSERT_TRUE(done->IsRunned()); + ASSERT_EQ(response.retcode(), RetCode::kNoOK); + } + // discard failed don't return rpc + { + brpc::Controller cntl; + nebd::client::DiscardResponse response; + FileServiceTestClosure* done = new FileServiceTestClosure(); + NebdServerAioContext* context = new NebdServerAioContext; + context->op = LIBAIO_OP::LIBAIO_OP_DISCARD; + context->cntl = &cntl; + context->response = &response; + context->done = done; + context->ret = -1; + context->returnRpcWhenIoError = false; NebdFileServiceCallback(context); - ASSERT_TRUE(done.IsRunned()); ASSERT_EQ(response.retcode(), RetCode::kNoOK); } }