Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nebd-server: drop failed requests' rpc #212

Merged
merged 1 commit into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nebd/etc/nebd/nebd-server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ heartbeat.timeout.sec=30

#文件超时检测时间间隔
heartbeat.check.interval.ms=3000

# whether to return the rpc (with an error retcode) when an io error occurs; if false, the failed request's rpc is dropped
response.returnRpcWhenIoError=false
3 changes: 3 additions & 0 deletions nebd/src/part2/define.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ struct NebdServerAioContext {
Closure *done = nullptr;
// rpc请求的controller
RpcController* cntl = nullptr;
// whether to return the rpc when an io error occurs (false: drop the rpc)
bool returnRpcWhenIoError = false;
};

struct NebdFileInfo {
Expand All @@ -119,6 +121,7 @@ const char METAFILEPATH[] = "meta.file.path";
const char HEARTBEATTIMEOUTSEC[] = "heartbeat.timeout.sec";
const char HEARTBEATCHECKINTERVALMS[] = "heartbeat.check.interval.ms";
const char CURVECLIENTCONFPATH[] = "curveclient.confPath";
const char RESPONSERETURNRPCWHENIOERROR[] = "response.returnRpcWhenIoError";

} // namespace server
} // namespace nebd
Expand Down
82 changes: 49 additions & 33 deletions nebd/src/part2/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,74 +32,86 @@ namespace server {

using nebd::client::RetCode;

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
/**
* use curl -L clientIp:nebd-serverPort/flags/dropRpc?setvalue=true
* to modify the parameter dynamically
*/
// Flag validator that accepts every value: both arguments (the flag name and
// the proposed value) are ignored and the result is always true.
static bool pass_bool(const char* /*flagName*/, bool /*newValue*/) {
    return true;
}
DEFINE_bool(dropRpc, false, "drop the request rpc");
DEFINE_validator(dropRpc, &pass_bool);

void SetResponse(NebdServerAioContext* context, RetCode retCode) {
switch (context->op) {
case LIBAIO_OP::LIBAIO_OP_READ:
{
nebd::client::ReadResponse* response =
dynamic_cast<nebd::client::ReadResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Read file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(retCode);
if (context->ret >= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

???漏删了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

???漏删了

不是,在READ的正常返回除了需要设置response的retcode还需要把读取的数据拷贝进去

brpc::Controller* cntl =
dynamic_cast<brpc::Controller *>(context->cntl);
cntl->response_attachment() =
*reinterpret_cast<butil::IOBuf*>(context->buf);
response->set_retcode(RetCode::kOK);
}

break;
}
case LIBAIO_OP::LIBAIO_OP_WRITE:
{
nebd::client::WriteResponse* response =
dynamic_cast<nebd::client::WriteResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Write file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
case LIBAIO_OP::LIBAIO_OP_FLUSH:
{
nebd::client::FlushResponse* response =
dynamic_cast<nebd::client::FlushResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Flush file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
case LIBAIO_OP::LIBAIO_OP_DISCARD:
{
nebd::client::DiscardResponse* response =
dynamic_cast<nebd::client::DiscardResponse*>(context->response);
if (context->ret < 0) {
response->set_retcode(RetCode::kNoOK);
LOG(ERROR) << "Discard file failed. "
<< "return code: " << context->ret;
} else {
response->set_retcode(RetCode::kOK);
}
response->set_retcode(retCode);
break;
}
default:
break;
}
}

void NebdFileServiceCallback(NebdServerAioContext* context) {
CHECK(context != nullptr);
std::unique_ptr<NebdServerAioContext> contextGuard(context);
std::unique_ptr<butil::IOBuf> iobufGuard(
reinterpret_cast<butil::IOBuf*>(context->buf));
brpc::ClosureGuard doneGuard(context->done);
// for test
if (FLAGS_dropRpc) {
doneGuard.release();
delete context->done;
LOG(ERROR) << Op2Str(context->op)
<< " file failed and drop the request rpc.";
return;
}

if (context->ret < 0 && !context->returnRpcWhenIoError) {
LOG(ERROR) << *context;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*context能正常输出吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*context能正常输出吗

可以

// drop the rpc to ensure not return ioerror
doneGuard.release();
delete context->done;
LOG(ERROR) << Op2Str(context->op)
<< " file failed and drop the request rpc.";
} else if (context->ret < 0) {
LOG(ERROR) << *context;
SetResponse(context, RetCode::kNoOK);
} else {
SetResponse(context, RetCode::kOK);
}
}

void NebdFileServiceImpl::OpenFile(
google::protobuf::RpcController* cntl_base,
const nebd::client::OpenFileRequest* request,
Expand Down Expand Up @@ -136,6 +148,7 @@ void NebdFileServiceImpl::Write(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_WRITE;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

brpc::Controller* cntl = dynamic_cast<brpc::Controller *>(cntl_base);

Expand Down Expand Up @@ -183,6 +196,7 @@ void NebdFileServiceImpl::Read(
aioContext->size = request->size();
aioContext->op = LIBAIO_OP::LIBAIO_OP_READ;
aioContext->cb = NebdFileServiceCallback;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;

std::unique_ptr<butil::IOBuf> buf(new butil::IOBuf());
aioContext->buf = buf.get();
Expand Down Expand Up @@ -218,6 +232,7 @@ void NebdFileServiceImpl::Flush(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Flush(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down Expand Up @@ -245,6 +260,7 @@ void NebdFileServiceImpl::Discard(
aioContext->response = response;
aioContext->done = done;
aioContext->cntl = cntl_base;
aioContext->returnRpcWhenIoError = returnRpcWhenIoError_;
int rc = fileManager_->Discard(request->fd(), aioContext);
if (rc < 0) {
LOG(ERROR) << "Flush file failed. "
Expand Down
7 changes: 5 additions & 2 deletions nebd/src/part2/file_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ void NebdFileServiceCallback(NebdServerAioContext* context);

class NebdFileServiceImpl : public nebd::client::NebdFileService {
public:
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager)
: fileManager_(fileManager) {}
explicit NebdFileServiceImpl(std::shared_ptr<NebdFileManager> fileManager,
const bool returnRpcWhenIoError)
: fileManager_(fileManager),
returnRpcWhenIoError_(returnRpcWhenIoError) {}

virtual ~NebdFileServiceImpl() {}

Expand Down Expand Up @@ -91,6 +93,7 @@ class NebdFileServiceImpl : public nebd::client::NebdFileService {

private:
std::shared_ptr<NebdFileManager> fileManager_;
const bool returnRpcWhenIoError_;
};

} // namespace server
Expand Down
10 changes: 9 additions & 1 deletion nebd/src/part2/nebd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,15 @@ bool NebdServer::InitHeartbeatManager() {

bool NebdServer::StartServer() {
// add service
NebdFileServiceImpl fileService(fileManager_);
bool returnRpcWhenIoError;
bool ret = conf_.GetBoolValue(RESPONSERETURNRPCWHENIOERROR,
&returnRpcWhenIoError);
if (false == ret) {
LOG(ERROR) << "get " << RESPONSERETURNRPCWHENIOERROR << " fail";
return false;
}

NebdFileServiceImpl fileService(fileManager_, returnRpcWhenIoError);
int addFileServiceRes = server_.AddService(
&fileService, brpc::SERVER_DOESNT_OWN_SERVICE);
if (0 != addFileServiceRes) {
Expand Down
1 change: 1 addition & 0 deletions nebd/src/part2/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c) {
<< ", offset: " << c.offset
<< ", size: " << c.size
<< ", ret: " << c.ret
<< ", returnRpcWhenIoError: " << c.returnRpcWhenIoError
<< "]";
return os;
}
Expand Down
2 changes: 2 additions & 0 deletions nebd/src/part2/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ std::string NebdFileType2Str(NebdFileType type);

std::string NebdFileStatus2Str(NebdFileStatus status);

std::string Op2Str(LIBAIO_OP op);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个实现在哪里?
或者直接对LIBAIO_OP定义一个operator <<

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个实现在哪里?
或者直接对LIBAIO_OP定义一个operator <<

原来就有,在util.cpp中,但是头文件中没声明


std::ostream& operator<<(std::ostream& os, const NebdServerAioContext& c);
std::ostream& operator<<(std::ostream& os, const NebdFileMeta& meta);

Expand Down
Loading