Skip to content

Commit

Permalink
nebd-server: drop failed reqeusts' rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanHai authored and ilixiaocui committed Jan 28, 2021
1 parent 650674a commit fef25e0
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 53 deletions.
3 changes: 3 additions & 0 deletions nebd/etc/nebd/nebd-server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ heartbeat.timeout.sec=30

#文件超时检测时间间隔
heartbeat.check.interval.ms=3000

# return rpc when io error
response.returnRpcWhenIoError=false
3 changes: 3 additions & 0 deletions nebd/src/part2/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ struct NebdServerAioContext {
Closure *done = nullptr;
// rpc请求的controller
RpcController* cntl = nullptr;
// return rpc when io error
bool returnRpcWhenIoError = false;
};

struct NebdFileInfo {
Expand All @@ -119,6 +121,7 @@ const char METAFILEPATH[] = "meta.file.path";
const char HEARTBEATTIMEOUTSEC[] = "heartbeat.timeout.sec";
const char HEARTBEATCHECKINTERVALMS[] = "heartbeat.check.interval.ms";
const char CURVECLIENTCONFPATH[] = "curveclient.confPath";
const char RESPONSERETURNRPCWHENIOERROR[] = "response.returnRpcWhenIoError";

} // namespace server
} // namespace nebd
Expand Down
82 changes: 49 additions & 33 deletions nebd/src/part2/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,74 +32,86 @@ namespace server {

using nebd::client::RetCode;

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
/**
* use curl -L clientIp:nebd-serverPort/flags/dropRpc?setvalue=true
* to modify the parameter dynamic
*/
static bool pass_bool(const char*, bool) { return true; }
DEFINE_bool(dropRpc, false, "drop the request rpc");
DEFINE_validator(dropRpc, &pass_bool);

void SetResponse(NebdServerAioContext* context, RetCode retCode) {
switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response =
dynamic_cast<nebd::client::ReadResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Read file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(retCode);
if (context->ret >= 0) {
brpc::Controller* cntl =
dynamic_cast<brpc::Controller *>(context->cntl);
cntl->response_attachment() =
*reinterpret_cast<butil::IOBuf*>(context->buf);
response->set_retcode(RetCode::kOK);
}

break;
}
case LIBAIO_OP::LIBAIO_OP_WRITE:
{
nebd::client::WriteResponse* response =
dynamic_cast<nebd::client::WriteResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Write file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
case LIBAIO_OP::LIBAIO_OP_FLUSH:
{
nebd::client::FlushResponse* response =
dynamic_cast<nebd::client::FlushResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Flush file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
case LIBAIO_OP::LIBAIO_OP_DISCARD:
{
nebd::client::DiscardResponse* response =
dynamic_cast<nebd::client::DiscardResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Discard file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
default:
break;
}
}

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
// for test
if (FLAGS_dropRpc) {
doneGuard.release();
delete context->done;
LOG(ERROR) << Op2Str(context->op)
<< " file failed and drop the request rpc.";
return;
}

if (context->ret < 0 && !context->returnRpcWhenIoError) {
LOG(ERROR) << *context;
// drop the rpc to ensure not return ioerror
doneGuard.release();
delete context->done;
LOG(ERROR) << Op2Str(context->op)
<< " file failed and drop the request rpc.";
} else if (context->ret < 0) {
LOG(ERROR) << *context;
SetResponse(context, RetCode::kNoOK);
} else {
SetResponse(context, RetCode::kOK);
}
}

void NebdFileServiceImpl::OpenFile(
google::protobuf::RpcController* cntl_base,
const nebd::client::OpenFileRequest* request,
Expand Down Expand Up @@ -136,6 +148,7 @@ void NebdFileServiceImpl::Write(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_WRITE;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

brpc::Controller* cntl = dynamic_cast<brpc::Controller *>(cntl_base);

Expand Down Expand Up @@ -183,6 +196,7 @@ void NebdFileServiceImpl::Read(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_READ;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

std::unique_ptr<butil::IOBuf> buf(new butil::IOBuf());
aioContext->buf = buf.get();
Expand Down Expand Up @@ -218,6 +232,7 @@ void NebdFileServiceImpl::Flush(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Flush(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down Expand Up @@ -245,6 +260,7 @@ void NebdFileServiceImpl::Discard(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Discard(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down
7 changes: 5 additions & 2 deletions nebd/src/part2/file_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ void NebdFileServiceCallback(NebdServerAioContext* context);

class NebdFileServiceImpl : public nebd::client::NebdFileService {
public:
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager)
: fileManager_(fileManager) {}
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager,
const bool returnRpcWhenIoError)
: fileManager_(fileManager),
returnRpcWhenIoError_(returnRpcWhenIoError) {}

virtual ~NebdFileServiceImpl() {}

Expand Down Expand Up @@ -91,6 +93,7 @@ class NebdFileServiceImpl : public nebd::client::NebdFileService {

private:
std::shared_ptr<NebdFileManager> fileManager_;
const bool returnRpcWhenIoError_;
};

} // namespace server
Expand Down
10 changes: 9 additions & 1 deletion nebd/src/part2/nebd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,15 @@ bool NebdServer::InitHeartbeatManager() {

bool NebdServer::StartServer() {
// add service
NebdFileServiceImpl fileService(fileManager_);
bool returnRpcWhenIoError;
bool ret = conf_.GetBoolValue(RESPONSERETURNRPCWHENIOERROR,
&returnRpcWhenIoError);
if (false == ret) {
LOG(ERROR) << "get " << RESPONSERETURNRPCWHENIOERROR << " fail";
return false;
}

NebdFileServiceImpl fileService(fileManager_, returnRpcWhenIoError);
int addFileServiceRes = server_.AddService(
&fileService, brpc::SERVER_DOESNT_OWN_SERVICE);
if (0 != addFileServiceRes) {
Expand Down
1 change: 1 addition & 0 deletions nebd/src/part2/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c) {
<< ", offset: " << c.offset
<< ", size: " << c.size
<< ", ret: " << c.ret
<< ", returnRpcWhenIoError: " << c.returnRpcWhenIoError
<< "]";
return os;
}
Expand Down
2 changes: 2 additions & 0 deletions nebd/src/part2/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ std::string NebdFileType2Str(NebdFileType type);

std::string NebdFileStatus2Str(NebdFileStatus status);

std::string Op2Str(LIBAIO_OP op);

std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c);
std::ostream& operator<<(std::ostream& os, const NebdFileMeta& meta);

Expand Down
Loading

0 comments on commit fef25e0

Please sign in to comment.