Skip to content

Commit

Permalink
grpc max message length update
Browse files Browse the repository at this point in the history
gRPC limits the size of a single message. By default, a received message
may be at most about 4MB; this default is defined as
GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH in grpc_types.h in the gRPC source code.

When a message exceeds this limit, the call fails with ExecuteBatchError or
returns an error status code such as StatusCode.RESOURCE_EXHAUSTED.

To mitigate the issue, the 'grpc.max_send_message_length' and
'grpc.max_receive_message_length' options are increased to 1GB
(1073741824 bytes) on both the client and the server.
  • Loading branch information
myungjin committed Sep 13, 2022
1 parent c6cc492 commit 8ef4ab7
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions lib/python/flame/backend/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
QUEUE_WAIT_TIME = 10 # 10 second
EXTRA_WAIT_TIME = QUEUE_WAIT_TIME / 2

GRPC_MAX_MESSAGE_LENGTH = 1073741824 # 1GB


class BackendServicer(msg_pb2_grpc.BackendRouteServicer):
"""Implements functionallity of backend route server."""
Expand Down Expand Up @@ -127,7 +129,10 @@ async def _init_loop_stuff():
self._initialized = True

async def _setup_server(self):
server = grpc.aio.server()
server = grpc.aio.server(options=[('grpc.max_send_message_length',
GRPC_MAX_MESSAGE_LENGTH),
('grpc.max_receive_message_length',
GRPC_MAX_MESSAGE_LENGTH)])
msg_pb2_grpc.add_BackendRouteServicer_to_server(
BackendServicer(self), server)

Expand Down Expand Up @@ -219,7 +224,11 @@ async def _register_channel(self, channel) -> None:
await asyncio.sleep(HEART_BEAT_DURATION)

async def _connect_and_notify(self, endpoint: str, ch_name: str) -> None:
grpc_ch = grpc.aio.insecure_channel(endpoint)
grpc_ch = grpc.aio.insecure_channel(
endpoint,
options=[('grpc.max_send_message_length', GRPC_MAX_MESSAGE_LENGTH),
('grpc.max_receive_message_length',
GRPC_MAX_MESSAGE_LENGTH)])
stub = msg_pb2_grpc.BackendRouteStub(grpc_ch)

await self.notify(ch_name, msg_pb2.NotifyType.JOIN, stub, grpc_ch)
Expand Down Expand Up @@ -360,7 +369,7 @@ async def _broadcast_task(self, channel):
ex_name = type(ex).__name__
logger.debug(f"An exception of type {ex_name} occurred")

self._cleanup_end(end_id)
await self._cleanup_end(end_id)
txq.task_done()

async def _unicast_task(self, channel, end_id):
Expand Down Expand Up @@ -400,7 +409,7 @@ def heart_beat():
ex_name = type(ex).__name__
logger.debug(f"An exception of type {ex_name} occurred")

self._cleanup_end(end_id)
await self._cleanup_end(end_id)
txq.task_done()
# This break ends a tx_task for end_id
break
Expand Down Expand Up @@ -467,7 +476,7 @@ async def _rx_task(self, end_id: str, reader) -> None:

# grpc channel is unavailable
# so, clean up an entry for end_id from _endpoints dict
self._cleanup_end(end_id)
await self._cleanup_end(end_id)

logger.debug(f"cleaned up {end_id} info from _endpoints")

Expand Down

0 comments on commit 8ef4ab7

Please sign in to comment.