Skip to content

Commit

Permalink
not return io error
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai committed Jan 8, 2021
1 parent 53885cf commit c0d147e
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 62 deletions.
3 changes: 3 additions & 0 deletions nebd/etc/nebd/nebd-server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ heartbeat.timeout.sec=30

#文件超时检测时间间隔
heartbeat.check.interval.ms=3000

# return rpc when io error
response.returnRpcWhenIoError=false
1 change: 1 addition & 0 deletions nebd/src/part2/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const char METAFILEPATH[] = "meta.file.path";
const char HEARTBEATTIMEOUTSEC[] = "heartbeat.timeout.sec";
const char HEARTBEATCHECKINTERVALMS[] = "heartbeat.check.interval.ms";
const char CURVECLIENTCONFPATH[] = "curveclient.confPath";
const char RESPONSERETURNRPCWHENIOERROR[] = "response.returnRpcWhenIoError";

} // namespace server
} // namespace nebd
Expand Down
83 changes: 37 additions & 46 deletions nebd/src/part2/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,71 +32,62 @@ namespace server {

using nebd::client::RetCode;

bool returnRpcWhenIoError;

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response =
dynamic_cast<nebd::client::ReadResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Read file failed. "
<< "return code: " << context->ret;
} else {
// io error
if (context->ret < 0) {
LOG(ERROR) << Op2Str(context->op) << " file failed. "
<< "return code: " << context->ret;
// don't return rpc
if (!returnRpcWhenIoError) {
// drop the rpc to ensure not return ioerror
doneGuard.release();
delete context->done;
return;
}
} else {
switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response = dynamic_cast
<nebd::client::ReadResponse*>(context->response);
brpc::Controller* cntl =
dynamic_cast<brpc::Controller *>(context->cntl);
cntl->response_attachment() =
*reinterpret_cast<butil::IOBuf*>(context->buf);
response->set_retcode(RetCode::kOK);
break;
}
break;
}
case LIBAIO_OP::LIBAIO_OP_WRITE:
{
nebd::client::WriteResponse* response =
dynamic_cast<nebd::client::WriteResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Write file failed. "
<< "return code: " << context->ret;
} else {
case LIBAIO_OP::LIBAIO_OP_WRITE:
{
nebd::client::WriteResponse* response = dynamic_cast
<nebd::client::WriteResponse*>(context->response);
response->set_retcode(RetCode::kOK);
break;
}
break;
}
case LIBAIO_OP::LIBAIO_OP_FLUSH:
{
nebd::client::FlushResponse* response =
dynamic_cast<nebd::client::FlushResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Flush file failed. "
<< "return code: " << context->ret;
} else {
case LIBAIO_OP::LIBAIO_OP_FLUSH:
{
nebd::client::FlushResponse* response = dynamic_cast
<nebd::client::FlushResponse*>(context->response);
response->set_retcode(RetCode::kOK);
break;
}
break;
}
case LIBAIO_OP::LIBAIO_OP_DISCARD:
{
nebd::client::DiscardResponse* response =
dynamic_cast<nebd::client::DiscardResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Discard file failed. "
<< "return code: " << context->ret;
} else {
case LIBAIO_OP::LIBAIO_OP_DISCARD:
{
nebd::client::DiscardResponse* response = dynamic_cast
<nebd::client::DiscardResponse*>(context->response);
response->set_retcode(RetCode::kOK);
break;
}
break;
default:
break;
}
default:
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions nebd/src/part2/file_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace nebd {
namespace server {

extern bool returnRpcWhenIoError;
void NebdFileServiceCallback(NebdServerAioContext* context);

class NebdFileServiceImpl : public nebd::client::NebdFileService {
Expand Down
7 changes: 7 additions & 0 deletions nebd/src/part2/nebd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ int NebdServer::Init(const std::string &confPath,
}
LOG(INFO) << "NebdServer load config from file ok";

bool ret = conf_.GetBoolValue(RESPONSERETURNRPCWHENIOERROR,
&nebd::server::returnRpcWhenIoError);
if (false == ret) {
LOG(ERROR) << "get " << RESPONSERETURNRPCWHENIOERROR << " fail";
return false;
}

bool initAddressOk = conf_.GetStringValue(LISTENADDRESS, &listenAddress_);
if (false == initAddressOk) {
LOG(ERROR) << "NebdServer init socket file address fail";
Expand Down
2 changes: 2 additions & 0 deletions nebd/src/part2/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ std::string NebdFileType2Str(NebdFileType type);

std::string NebdFileStatus2Str(NebdFileStatus status);

std::string Op2Str(LIBAIO_OP op);

std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c);
std::ostream& operator<<(std::ostream& os, const NebdFileMeta& meta);

Expand Down
102 changes: 86 additions & 16 deletions nebd/test/part2/file_service_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,22 +354,41 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// read failed
// read failed return rpc
{
brpc::Controller cntl;
nebd::client::ReadResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_READ;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = &done;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// read failed don't return rpc
{
brpc::Controller cntl;
nebd::client::ReadResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_READ;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// write success
Expand All @@ -390,22 +409,41 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// write failed
// write failed return rpc
{
brpc::Controller cntl;
nebd::client::WriteResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_WRITE;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = &done;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// write failed don't return rpc
{
brpc::Controller cntl;
nebd::client::WriteResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_WRITE;
context->cntl = &cntl;
context->response = &response;
context->offset = 0;
context->size = 4096;
context->done = done;
context->buf = new butil::IOBuf();
context->ret = -1;
returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// flush success
Expand All @@ -423,19 +461,35 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// flush failed
// flush failed return rpc
{
brpc::Controller cntl;
nebd::client::FlushResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_FLUSH;
context->cntl = &cntl;
context->response = &response;
context->done = &done;
context->done = done;
context->ret = -1;
returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// flush failed don't return rpc
{
brpc::Controller cntl;
nebd::client::FlushResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_FLUSH;
context->cntl = &cntl;
context->response = &response;
context->done = done;
context->ret = -1;
returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// discard success
Expand All @@ -453,19 +507,35 @@ TEST_F(FileServiceTest, CallbackTest) {
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kOK);
}
// discard failed
// discard failed return rpc
{
brpc::Controller cntl;
nebd::client::DiscardResponse response;
FileServiceTestClosure done;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_DISCARD;
context->cntl = &cntl;
context->response = &response;
context->done = &done;
context->done = done;
context->ret = -1;
returnRpcWhenIoError = true;
NebdFileServiceCallback(context);
ASSERT_TRUE(done->IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
// discard failed don't return rpc
{
brpc::Controller cntl;
nebd::client::DiscardResponse response;
FileServiceTestClosure* done = new FileServiceTestClosure();
NebdServerAioContext* context = new NebdServerAioContext;
context->op = LIBAIO_OP::LIBAIO_OP_DISCARD;
context->cntl = &cntl;
context->response = &response;
context->done = done;
context->ret = -1;
returnRpcWhenIoError = false;
NebdFileServiceCallback(context);
ASSERT_TRUE(done.IsRunned());
ASSERT_EQ(response.retcode(), RetCode::kNoOK);
}
}
Expand Down

0 comments on commit c0d147e

Please sign in to comment.