Skip to content

Commit

Permalink
DistributionRoute changes to accommodate SET distribution
Browse files Browse the repository at this point in the history
Summary: Distribution operation is async, the local RPC request is sync.

Reviewed By: stuclar

Differential Revision: D56957380

fbshipit-source-id: 339188c0302bdf6ae04ad8b63f4800f983f3f7c2
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Aug 30, 2024
1 parent d25381f commit 53dd319
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 136 deletions.
7 changes: 7 additions & 0 deletions mcrouter/routes/DistributionRoute-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ inline DistributionRouteSettings parseDistributionRouteSettings(
settings.distributedDeleteRpcEnabled = parseBool(
*jDistributedDeleteRpcEnabled, "distributed_delete_rpc_enabled");
}
if (auto* jDistributionSourceRegion =
json.get_ptr("distribution_source_region")) {
checkLogic(
jDistributionSourceRegion->isString(),
"DistributionRoute: distribution_source_region must be a string");
settings.srcRegion = jDistributionSourceRegion->getString();
}
return settings;
}

Expand Down
122 changes: 115 additions & 7 deletions mcrouter/routes/DistributionRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace facebook::memcache::mcrouter {
struct DistributionRouteSettings {
bool distributedDeleteRpcEnabled{true};
bool replay{false};
std::string srcRegion;
};

constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0";
Expand All @@ -35,6 +36,7 @@ constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0";
* - distribution_delete_rpc_enabled(bool) - enable sending the request via rpc
* after it is distributed
* - replay(bool) - enable replay mode (for mcreplay)
* - src_region(string) - the region where the distribution request originated
*/
template <class RouterInfo>
class DistributionRoute {
Expand All @@ -46,13 +48,15 @@ class DistributionRoute {
DistributionRoute(RouteHandlePtr rh, DistributionRouteSettings& settings)
: rh_(std::move(rh)),
distributedDeleteRpcEnabled_(settings.distributedDeleteRpcEnabled),
replay_{settings.replay} {}
replay_{settings.replay},
srcRegion_{settings.srcRegion} {}

std::string routeName() const {
return fmt::format(
"distribution|distributed_delete_rpc_enabled={}|replay={}",
"distribution|distributed_delete_rpc_enabled={}|replay={}|distribution_source_region={}",
distributedDeleteRpcEnabled_ ? "true" : "false",
replay_ ? "true" : "false");
replay_ ? "true" : "false",
srcRegion_);
}

template <class Request>
Expand All @@ -67,6 +71,83 @@ class DistributionRoute {
return rh_->route(req);
}

/**
* @param req request to route
* @return Reply from the route handle
*
* SETs can be:
* 1. In-region rpc
* 1.1 With no routing prefix
* 1.2 With current region in the prefix
* 2. Cross-region directed rpc SET with a region prefix not equal to the
* current region.
* 3. Broadcast SET with routing prefix = /(star)/(star)/
*
* In the first case, we skip distribution.
* In the second case, we write to Axon synchronously and return the reply of
* the write success/failure. In the third case, we write to Axon
* asynchronously, also send an RPC request to the local region and return the
* reply of the RPC.
*/
McSetReply route(const McSetRequest& req) const {
// the `distributionRegionOpt` optional in the fiber can be in 3 states:
// - empty (no distribution)
// - holds a value of "" (broadcast distribution)
// - holds a value of a region name (directed cross-region distribution)
auto distributionRegionOpt =
fiber_local<RouterInfo>::getDistributionTargetRegion();
if (FOLLY_LIKELY(!distributionRegionOpt.has_value())) {
return rh_->route(req);
}

auto axonCtx = fiber_local<RouterInfo>::getAxonCtx();
auto bucketId = fiber_local<RouterInfo>::getBucketId();
assert(axonCtx && bucketId);
auto finalReq = req;
finalReq.bucketId_ref() = fmt::to_string(*bucketId);
auto distributionRegion = distributionRegionOpt.value().empty()
? std::string(kBroadcast)
: std::move(distributionRegionOpt.value());
// for directed cross-region distribution write to Axon synchronously:
if (FOLLY_UNLIKELY(distributionRegion != kBroadcast)) {
return distributeWithLogging(
finalReq,
distributeWriteRequest,
axonCtx,
*bucketId,
distributionRegion,
srcRegion_,
std::nullopt)
.first;
}
folly::fibers::addTask([this,
bucketId,
ctx = fiber_local<RouterInfo>::getSharedCtx(),
axonCtx,
finalReq = std::move(finalReq),
distributionRegion =
std::move(distributionRegion)]() {
auto [_, axonLogRes] = distributeWithLogging(
finalReq,
distributeWriteRequest,
axonCtx,
*bucketId,
distributionRegion,
srcRegion_,
std::nullopt);
if (axonLogRes) {
ctx->proxy().stats().increment(
distribution_set_axon_write_success_stat);
} else {
ctx->proxy().stats().increment(distribution_set_axon_write_fail_stat);
}
});
// route to the local region:
fiber_local<RouterInfo>::getSharedCtx()->proxy().stats().increment(
distribution_set_local_region_write_stat);
return rh_->route(req);
}

/**
* Delete can be:
* 1. In-region rpc delete
Expand Down Expand Up @@ -110,7 +191,8 @@ class DistributionRoute {
invalidation::DistributionType::Distribution,
std::move(
distributionRegion.value().empty() ? std::string(kBroadcast)
: *distributionRegion));
: *distributionRegion),
srcRegion_);

auto reply = axonLogRes ? createReply(DefaultReply, finalReq)
: McDeleteReply(carbon::Result::LOCAL_ERROR);
Expand Down Expand Up @@ -151,6 +233,7 @@ class DistributionRoute {
const RouteHandlePtr rh_;
const bool distributedDeleteRpcEnabled_;
const bool replay_;
const std::string srcRegion_;

std::optional<std::string> inferDistributionRegionForReplay(
const McDeleteRequest& req,
Expand All @@ -177,8 +260,9 @@ class DistributionRoute {
}
}

template <typename Request>
void onBeforeDistribution(
const McDeleteRequest& req,
const Request& req,
ProxyRequestContextWithInfo<RouterInfo>& ctx,
const std::string& bucketId,
const DestinationRequestCtx& dctx) const {
Expand All @@ -192,9 +276,10 @@ class DistributionRoute {
/*bucketId*/ bucketId);
}

template <typename Request>
void onAfterDistribution(
const McDeleteRequest& req,
const McDeleteReply& reply,
const Request& req,
const typename Request::reply_type& reply,
ProxyRequestContextWithInfo<RouterInfo>& ctx,
const std::string& bucketId,
const DestinationRequestCtx& dctx) const {
Expand All @@ -216,6 +301,29 @@ class DistributionRoute {
/*extraDataCallback*/ fiber_local<RouterInfo>::getExtraDataCallbacks(),
/*bucketId*/ bucketId);
}

template <typename Request, typename DistrFn, typename... Args>
FOLLY_ALWAYS_INLINE std::pair<ReplyT<Request>, bool> distributeWithLogging(
const Request& req,
DistrFn&& distributionFn,
Args... args) const {
auto& ctx = *fiber_local<RouterInfo>::getSharedCtx();
DestinationRequestCtx dctx(nowUs());
onBeforeDistribution(req, ctx, *req.bucketId_ref(), dctx);

auto axonLogRes = distributionFn(req, std::forward<Args>(args)...);
auto reply = axonLogRes ? createReply(DefaultReply, req)
: ReplyT<Request>(carbon::Result::LOCAL_ERROR);
dctx.endTime = nowUs();
onAfterDistribution(req, reply, ctx, *req.bucketId_ref(), dctx);

if (axonLogRes) {
ctx.proxy().stats().increment(distribution_axon_write_success_stat);
} else {
ctx.proxy().stats().increment(distribution_axon_write_failed_stat);
}
return {reply, axonLogRes};
}
};

template <class RouterInfo>
Expand Down
Loading

0 comments on commit 53dd319

Please sign in to comment.