Skip to content

Commit

Permalink
[#18771, #21352] docdb: Fix LightweightMessage max size when parsing
Browse files Browse the repository at this point in the history
Summary:
LightweightMessage currently sets a maximum size for reading to `rpc_max_message_size`
(default 255 MB), but Preparer batches based on `protobuf_message_total_bytes_limit` (default
511 MB). This results in cases where under default settings, we may have protobufs between 255 MB
and 511 MB, which `LightweightMessage::ParseFromSlice` is unable to read.

This diff changes the limit to use `protobuf_message_total_bytes_limit`, so that protobufs between
`rpc_max_message_size` and `protobuf_message_total_bytes_limit` can be parsed properly.

This addresses errors such as:
```
Found a corruption in a closed log segment: OK
Error: Corruption (yb/consensus/log_util.cc:965): Log file corruption detected.: Failed to parse PB at offset: 26013423, length: 303149529. Cause: Corruption (yb/rpc/lightweight_message.cc:376): Failed to parse ‘entry’: Failed trying to read batch #5 at offset 26013423 for log segment /mnt/d0/yb-data/tserver/wals/table-12345678901234567890123456789012/tablet-12345678901234567890123456789012/wal-000003000: ...
```
(length larger than 255 MB) when such protobufs are written to WALs.

This also fixes cause of flakiness for TabletPeerTest.MaxRaftBatchProtobufLimit in TSAN builds.

Jira: DB-7654, DB-10251

Test Plan:
Jenkins.

Added test:
```
yb_build.sh --cxx-test rpc_lwproto-test --gtest_filter LWProtoTest.BigMessage
```

Also ran TabletPeerTest.MaxRaftBatchProtobufLimit 100x on Jenkins.

Reviewers: sergei, qhu

Reviewed By: qhu

Subscribers: yyan, bogdan, rthallam, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D33041
  • Loading branch information
es1024 committed Sep 4, 2024
1 parent 7b1f22a commit 788434a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
5 changes: 3 additions & 2 deletions src/yb/rpc/lightweight_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ DEFINE_UNKNOWN_uint64(rpc_max_message_size, 255_MB,
"The maximum size of a message of any RPC that the server will accept. The sum of "
"consensus_max_batch_size_bytes and 1KB should be less than rpc_max_message_size");

DECLARE_int32(protobuf_message_total_bytes_limit);

using google::protobuf::internal::WireFormatLite;
using google::protobuf::io::CodedOutputStream;

Expand Down Expand Up @@ -378,8 +380,7 @@ Status ParseFailed(const char* field_name) {
}

void SetupLimit(google::protobuf::io::CodedInputStream* in) {
in->SetTotalBytesLimit(narrow_cast<int>(FLAGS_rpc_max_message_size),
narrow_cast<int>(FLAGS_rpc_max_message_size * 3 / 4));
in->SetTotalBytesLimit(FLAGS_protobuf_message_total_bytes_limit, 0 /* unused */);
}

ThreadSafeArena& empty_arena() {
Expand Down
54 changes: 47 additions & 7 deletions src/yb/rpc/lwproto-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,33 @@

#include "yb/util/faststring.h"
#include "yb/util/logging.h"
#include "yb/util/random_util.h"
#include "yb/util/size_literals.h"
#include "yb/util/test_macros.h"

DECLARE_int32(protobuf_message_total_bytes_limit);
DECLARE_uint64(rpc_max_message_size);

namespace yb {
namespace rpc {

namespace {

template <typename PB>
Status SerializePB(PB& pb, faststring& buf) {
LOG(INFO) << "Source proto: " << pb.ShortDebugString();

AnyMessageConstPtr ptr(&pb);
buf.resize(ptr.SerializedSize());
RETURN_NOT_OK(ptr.SerializeToArray(buf.data()));

LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString();

return Status::OK();
}

} // namespace

// Make sure LW protobuf skips unknown fields.
TEST(LWProtoTest, SkipsUnknownFields) {
rpc_test::TestObjectPB pb;
Expand All @@ -37,13 +59,7 @@ TEST(LWProtoTest, SkipsUnknownFields) {
pb.set_int32_2(15);
pb.mutable_record2()->set_text("record2");

LOG(INFO) << "Source proto: " << pb.ShortDebugString();

AnyMessageConstPtr ptr(&pb);
buf.resize(ptr.SerializedSize());
ASSERT_OK(ptr.SerializeToArray(buf.data()));

LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString();
ASSERT_OK(SerializePB(pb, buf));
}

{
Expand Down Expand Up @@ -82,5 +98,29 @@ TEST(LWProtoTest, SkipsUnknownFields) {
}
}

// Test a very large proto (rpc_max_message_size < proto size < protobuf_message_total_bytes_limit).
TEST(LWProtoTest, BigMessage) {
faststring buf;
rpc_test::TestObjectPB pb;

ANNOTATE_UNPROTECTED_WRITE(FLAGS_rpc_max_message_size) = 4_MB;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_protobuf_message_total_bytes_limit) = 8_MB;

constexpr auto kPBSize = 6_MB;

pb.set_string1(RandomHumanReadableString(kPBSize));
ASSERT_OK(SerializePB(pb, buf));

ThreadSafeArena arena;
rpc_test::LWTestObjectPBv2 lwpb2(&arena);
AnyMessagePtr ptr(&lwpb2);

ASSERT_OK(ptr.ParseFromSlice(Slice(buf)));
LOG(INFO) << "Read lightweight proto: " << lwpb2.ShortDebugString();

ASSERT_TRUE(lwpb2.has_string1());
ASSERT_EQ(pb.string1(), lwpb2.string1());
}

} // namespace rpc
} // namespace yb

0 comments on commit 788434a

Please sign in to comment.