From 53dd319ad7f33c03e45a504339bbf6aba21fa24c Mon Sep 17 00:00:00 2001 From: Lenar Fatikhov Date: Fri, 30 Aug 2024 11:51:23 -0700 Subject: [PATCH] DistributionRoute changes to accommodate SET distribution Summary: Distribution operation is async, the local RPC request is sync. Reviewed By: stuclar Differential Revision: D56957380 fbshipit-source-id: 339188c0302bdf6ae04ad8b63f4800f983f3f7c2 --- mcrouter/routes/DistributionRoute-inl.h | 7 + mcrouter/routes/DistributionRoute.h | 122 ++++++- .../routes/test/DistributionRouteTest.cpp | 306 ++++++++++-------- mcrouter/stat_list.h | 3 + 4 files changed, 302 insertions(+), 136 deletions(-) diff --git a/mcrouter/routes/DistributionRoute-inl.h b/mcrouter/routes/DistributionRoute-inl.h index 8def4eeff..aa5cbd4a7 100644 --- a/mcrouter/routes/DistributionRoute-inl.h +++ b/mcrouter/routes/DistributionRoute-inl.h @@ -25,6 +25,13 @@ inline DistributionRouteSettings parseDistributionRouteSettings( settings.distributedDeleteRpcEnabled = parseBool( *jDistributedDeleteRpcEnabled, "distributed_delete_rpc_enabled"); } + if (auto* jDistributionSourceRegion = + json.get_ptr("distribution_source_region")) { + checkLogic( + jDistributionSourceRegion->isString(), + "DistributionRoute: distribution_source_region must be a string"); + settings.srcRegion = jDistributionSourceRegion->getString(); + } return settings; } diff --git a/mcrouter/routes/DistributionRoute.h b/mcrouter/routes/DistributionRoute.h index 6d8f3df61..6a2518c2d 100644 --- a/mcrouter/routes/DistributionRoute.h +++ b/mcrouter/routes/DistributionRoute.h @@ -24,6 +24,7 @@ namespace facebook::memcache::mcrouter { struct DistributionRouteSettings { bool distributedDeleteRpcEnabled{true}; bool replay{false}; + std::string srcRegion; }; constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0"; @@ -35,6 +36,7 @@ constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0"; * - distribution_delete_rpc_enabled(bool) - enable sending the request via rpc * after it is distributed * - replay(bool) - enable replay mode (for mcreplay) + * - src_region(string) - the region where the distribution request originated */ template class DistributionRoute { @@ -46,13 +48,15 @@ class DistributionRoute { DistributionRoute(RouteHandlePtr rh, DistributionRouteSettings& settings) : rh_(std::move(rh)), distributedDeleteRpcEnabled_(settings.distributedDeleteRpcEnabled), - replay_{settings.replay} {} + replay_{settings.replay}, + srcRegion_{settings.srcRegion} {} std::string routeName() const { return fmt::format( - "distribution|distributed_delete_rpc_enabled={}|replay={}", + "distribution|distributed_delete_rpc_enabled={}|replay={}|distribution_source_region={}", distributedDeleteRpcEnabled_ ? "true" : "false", - replay_ ? "true" : "false"); + replay_ ? "true" : "false", + srcRegion_); } template @@ -67,6 +71,83 @@ class DistributionRoute { return rh_->route(req); } + /** + * @param req request to route + * @return Reply from the route handle + * + * SETs can be: + * 1. In-region rpc + * 1.1 With no routing prefix + * 1.2 With current region in the prefix + * 2. Cross-region directed rpc SET with a region prefix not equal to the + * current region. + * 3. Broadcast SET with routing prefix = /(star)/(star)/ + * + * In the first case, we skip distribution. + * In the second case, we write to Axon synchronously and return the reply of + * the write success/failure. In the third case, we write to Axon + * asynchronously, also send an RPC request to the local region and return the + * reply of the RPC. + */ + McSetReply route(const McSetRequest& req) const { + // the `distributionRegionOpt` optional in the fiber can be in 3 states: + // - empty (no distribution) + // - holds a value of "" (broadcast distribution) + // - holds a value of a region name (directed cross-region distribution) + auto distributionRegionOpt = + fiber_local::getDistributionTargetRegion(); + if (FOLLY_LIKELY(!distributionRegionOpt.has_value())) { + return rh_->route(req); + } + + auto axonCtx = fiber_local::getAxonCtx(); + auto bucketId = fiber_local::getBucketId(); + assert(axonCtx && bucketId); + auto finalReq = req; + finalReq.bucketId_ref() = fmt::to_string(*bucketId); + auto distributionRegion = distributionRegionOpt.value().empty() + ? std::string(kBroadcast) + : std::move(distributionRegionOpt.value()); + // for directed cross-region distribution write to Axon synchronously: + if (FOLLY_UNLIKELY(distributionRegion != kBroadcast)) { + return distributeWithLogging( + finalReq, + distributeWriteRequest, + axonCtx, + *bucketId, + distributionRegion, + srcRegion_, + std::nullopt) + .first; + } + folly::fibers::addTask([this, + bucketId, + ctx = fiber_local::getSharedCtx(), + axonCtx, + finalReq = std::move(finalReq), + distributionRegion = + std::move(distributionRegion)]() { + auto [_, axonLogRes] = distributeWithLogging( + finalReq, + distributeWriteRequest, + axonCtx, + *bucketId, + distributionRegion, + srcRegion_, + std::nullopt); + if (axonLogRes) { + ctx->proxy().stats().increment( + distribution_set_axon_write_success_stat); + } else { + ctx->proxy().stats().increment(distribution_set_axon_write_fail_stat); + } + }); + // route to the local region: + fiber_local::getSharedCtx()->proxy().stats().increment( + distribution_set_local_region_write_stat); + return rh_->route(req); + } + /** * Delete can be: * 1. In-region rpc delete @@ -110,7 +191,8 @@ class DistributionRoute { invalidation::DistributionType::Distribution, std::move( distributionRegion.value().empty() ? std::string(kBroadcast) - : *distributionRegion)); + : *distributionRegion), + srcRegion_); auto reply = axonLogRes ? createReply(DefaultReply, finalReq) : McDeleteReply(carbon::Result::LOCAL_ERROR); @@ -151,6 +233,7 @@ class DistributionRoute { const RouteHandlePtr rh_; const bool distributedDeleteRpcEnabled_; const bool replay_; + const std::string srcRegion_; std::optional inferDistributionRegionForReplay( const McDeleteRequest& req, @@ -177,8 +260,9 @@ class DistributionRoute { } } + template void onBeforeDistribution( - const McDeleteRequest& req, + const Request& req, ProxyRequestContextWithInfo& ctx, const std::string& bucketId, const DestinationRequestCtx& dctx) const { @@ -192,9 +276,10 @@ class DistributionRoute { /*bucketId*/ bucketId); } + template void onAfterDistribution( - const McDeleteRequest& req, - const McDeleteReply& reply, + const Request& req, + const typename Request::reply_type& reply, ProxyRequestContextWithInfo& ctx, const std::string& bucketId, const DestinationRequestCtx& dctx) const { @@ -216,6 +301,29 @@ class DistributionRoute { /*extraDataCallback*/ fiber_local::getExtraDataCallbacks(), /*bucketId*/ bucketId); } + + template + FOLLY_ALWAYS_INLINE std::pair, bool> distributeWithLogging( + const Request& req, + DistrFn&& distributionFn, + Args... args) const { + auto& ctx = *fiber_local::getSharedCtx(); + DestinationRequestCtx dctx(nowUs()); + onBeforeDistribution(req, ctx, *req.bucketId_ref(), dctx); + + auto axonLogRes = distributionFn(req, std::forward(args)...); + auto reply = axonLogRes ? createReply(DefaultReply, req) + : ReplyT(carbon::Result::LOCAL_ERROR); + dctx.endTime = nowUs(); + onAfterDistribution(req, reply, ctx, *req.bucketId_ref(), dctx); + + if (axonLogRes) { + ctx.proxy().stats().increment(distribution_axon_write_success_stat); + } else { + ctx.proxy().stats().increment(distribution_axon_write_failed_stat); + } + return {reply, axonLogRes}; + } }; template diff --git a/mcrouter/routes/test/DistributionRouteTest.cpp b/mcrouter/routes/test/DistributionRouteTest.cpp index bf9963ae8..5ada3241d 100644 --- a/mcrouter/routes/test/DistributionRouteTest.cpp +++ b/mcrouter/routes/test/DistributionRouteTest.cpp @@ -21,6 +21,47 @@ namespace facebook::memcache::mcrouter { +struct AxonMockResult { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + invalidation::DistributionOperation operation; + invalidation::DistributionType type; + std::string srcRegion; +}; + +namespace { +AxonMockResult setupAxonFn(std::shared_ptr& ctx) { + AxonMockResult res; + ctx->writeProxyFn = [&](auto bucketId, auto&& payload) { + res.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + res.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + res.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + res.serialized = payload.at(invalidation::kSerialized); + } + if (payload.find(invalidation::kOperation) != payload.end()) { + res.operation = static_cast( + std::stoi(payload.at(invalidation::kOperation))); + } + if (payload.find(invalidation::kType) != payload.end()) { + res.type = static_cast( + std::stoi(payload.at(invalidation::kType))); + } + if (payload.find(invalidation::kSourceRegion) != payload.end()) { + res.srcRegion = payload.at(invalidation::kSourceRegion); + } + return true; + }; + return res; +} +} // namespace + TEST(DistributionRouteTest, getSetAreForwardedToRpc) { std::vector> srHandleVec{ std::make_shared( @@ -31,7 +72,8 @@ TEST(DistributionRouteTest, getSetAreForwardedToRpc) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": true, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -58,7 +100,8 @@ TEST(DistributionRouteTest, getSetAreForwardedToRpc2) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": true, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -85,7 +128,8 @@ TEST(DistributionRouteTest, deleteForwardedToRpcIfDisabled) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": false, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -107,26 +151,7 @@ TEST(DistributionRouteTest, crossRegionDeleteDisabledRpc) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->allDelete = false; axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; @@ -134,7 +159,8 @@ TEST(DistributionRouteTest, crossRegionDeleteDisabledRpc) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": false, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -160,6 +186,9 @@ TEST(DistributionRouteTest, crossRegionDeleteDisabledRpc) { static_cast( req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Delete); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); } TEST(DistributionRouteTest, crossRegionDeleteEnabledRpc) { @@ -170,26 +199,7 @@ TEST(DistributionRouteTest, crossRegionDeleteEnabledRpc) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->allDelete = false; axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; @@ -197,7 +207,8 @@ TEST(DistributionRouteTest, crossRegionDeleteEnabledRpc) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": true, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -225,6 +236,9 @@ TEST(DistributionRouteTest, crossRegionDeleteEnabledRpc) { static_cast( req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Delete); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); } TEST(DistributionRouteTest, broadcastDeleteDisabledRpc) { @@ -235,26 +249,7 @@ TEST(DistributionRouteTest, broadcastDeleteDisabledRpc) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->allDelete = false; axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; @@ -262,7 +257,8 @@ TEST(DistributionRouteTest, broadcastDeleteDisabledRpc) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": false, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -288,6 +284,9 @@ TEST(DistributionRouteTest, broadcastDeleteDisabledRpc) { static_cast( req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Delete); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); } TEST(DistributionRouteTest, broadcastDeleteEnabledRpc) { @@ -298,26 +297,7 @@ TEST(DistributionRouteTest, broadcastDeleteEnabledRpc) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->allDelete = false; axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; @@ -325,7 +305,8 @@ TEST(DistributionRouteTest, broadcastDeleteEnabledRpc) { constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": true, - "replay": false + "replay": false, + "distribution_source_region": "oregon" } )"; @@ -362,33 +343,15 @@ TEST(DistributionRouteTest, broadcastSpooledDelete) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": false, - "replay": true + "replay": true, + "distribution_source_region": "oregon" } )"; @@ -417,6 +380,9 @@ TEST(DistributionRouteTest, broadcastSpooledDelete) { static_cast( req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Delete); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); } TEST(DistributionRouteTest, crossRegionDirectedSpooledDelete) { @@ -427,32 +393,14 @@ TEST(DistributionRouteTest, crossRegionDirectedSpooledDelete) { auto mockSrHandle = get_route_handles(srHandleVec)[0]; auto axonCtx = std::make_shared(); - struct Tmp { - uint64_t bucketId; - std::string region; - std::string pool; - std::string serialized; - }; - auto tmp = Tmp{}; - axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { - tmp.bucketId = bucketId; - if (payload.find(invalidation::kRegion) != payload.end()) { - tmp.region = payload.at(invalidation::kRegion); - } - if (payload.find(invalidation::kPool) != payload.end()) { - tmp.pool = payload.at(invalidation::kPool); - } - if (payload.find(invalidation::kSerialized) != payload.end()) { - tmp.serialized = payload.at(invalidation::kSerialized); - } - return true; - }; + auto tmp = setupAxonFn(axonCtx); axonCtx->fallbackAsynclog = false; axonCtx->poolFilter = "testPool"; constexpr folly::StringPiece kDistributionRouteConfig = R"( { "distributed_delete_rpc_enabled": false, - "replay": true + "replay": true, + "distribution_source_region": "oregon" } )"; @@ -481,6 +429,106 @@ TEST(DistributionRouteTest, crossRegionDirectedSpooledDelete) { static_cast( req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Delete); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); +} + +TEST(DistributionRouteTest, crossRegionSet) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + auto tmp = setupAxonFn(axonCtx); + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false, + "distribution_source_region": "oregon" + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion("georgia"); + auto req = McSetRequest("/georgia/default/test1"); + req.value_ref() = folly::IOBuf(folly::IOBuf::COPY_BUFFER, "value"); + rh->route(req); + }); + + // the key is NOT routed to RPC: + EXPECT_TRUE(srHandleVec[0]->saw_keys.empty()); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_EQ(tmp.region, "georgia"); + EXPECT_EQ(tmp.pool, "testPool"); + auto req2 = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ(req2.key_ref()->fullKey().str(), "test1"); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Write); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); +} + +TEST(DistributionRouteTest, broadcastSet) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + auto tmp = setupAxonFn(axonCtx); + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false, + "distribution_source_region": "oregon" + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + TestFiberManager fm; + fm.runAll({[&]() { + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion(""); + auto req = McSetRequest("/*/*/test1"); + req.value_ref() = folly::IOBuf(folly::IOBuf::COPY_BUFFER, "value"); + rh->route(req); + }); + }}); + // the key is routed to RPC: + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "/*/*/test1"); + EXPECT_EQ("set", srHandleVec[0]->sawOperations[0]); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_EQ(tmp.region, "DistributionRoute"); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ(req.key_ref()->fullKey(), "test1"); + EXPECT_EQ(tmp.operation, invalidation::DistributionOperation::Write); + EXPECT_EQ(tmp.type, invalidation::DistributionType::Distribution); + EXPECT_EQ(tmp.srcRegion, "oregon"); } } // namespace facebook::memcache::mcrouter diff --git a/mcrouter/stat_list.h b/mcrouter/stat_list.h index a94158a4a..bbc0543ad 100644 --- a/mcrouter/stat_list.h +++ b/mcrouter/stat_list.h @@ -162,6 +162,9 @@ STUI(proxy_queue_full, 0, 1) STUI(proxy_queues_all_full, 0, 1) // distribution stats STUI(distribution_axon_write_success, 0, 1) +STUI(distribution_set_axon_write_success, 0, 1) +STUI(distribution_set_axon_write_fail, 0, 1) +STUI(distribution_set_local_region_write, 0, 1) STUI(distribution_axon_write_failed, 0, 1) STUI(distribution_async_spool_failed, 0, 1) STUI(distribution_replay_no_source, 0, 1)