Skip to content

Commit

Permalink
Merge pull request Aiven-Open#877 from Aiven-Open/jjaakola-aiven-opti…
Browse files Browse the repository at this point in the history
…mize-after-startup-schema-reader-performance

fix: consume only one record at a time after startup
  • Loading branch information
eliax1996 committed May 14, 2024
2 parents 669493c + 85ef357 commit ef3a486
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
18 changes: 13 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0

# For good startup performance the consumption of multiple
# records for each consume round is essential.
# Consumer default is 1 message for each consume call and after
# startup the default is a good value. If consumer would expect
# more messages it would return control back after timeout and
# making schema storing latency to be `processing time + timeout`.
MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1
MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2

# Metric names
METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed"
METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode"
Expand Down Expand Up @@ -120,11 +130,8 @@ def __init__(
) -> None:
Thread.__init__(self, name="schema-reader")
self.master_coordinator = master_coordinator
self.timeout_s = 0.2
# Consumer default is 1 message for each consume call
# For good startup performance the consumption of multiple
# records for each consume round is essential.
self.max_messages_to_process = 1000
self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP
self.config = config

self.database = database
Expand Down Expand Up @@ -301,6 +308,7 @@ def _is_ready(self) -> bool:
self.startup_previous_processed_offset = self.offset
ready = self.offset >= self._highest_offset
if ready:
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
return ready

Expand Down
32 changes: 31 additions & 1 deletion tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED
from karapace.schema_reader import (
KafkaSchemaReader,
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

Expand Down Expand Up @@ -154,3 +160,27 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:

schema_reader.handle_messages()
assert schema_reader.ready is testcase.expected


def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
key_formatter_mock = Mock()
consumer_mock = Mock()
consumer_mock.consume.return_value = []
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 1)

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=DEFAULTS,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP

0 comments on commit ef3a486

Please sign in to comment.