Skip to content

Commit

Permalink
feat: Optimise replicator commit logic (#82)
Browse files Browse the repository at this point in the history
* fix: Compute commitable offset before pruning.

* fix: Offset commits are too frequent.

* fix: Commit offset in the background.
  • Loading branch information
fmarek-kindred authored Sep 25, 2023
1 parent 29cee52 commit d6eb1fe
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 13 deletions.
2 changes: 2 additions & 0 deletions packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::task::JoinHandle;

use crate::errors::SystemServiceError;

Expand All @@ -14,6 +15,7 @@ pub trait MessageReciever: SharedPortTraits {
async fn consume_message(&mut self) -> Result<Option<Self::Message>, MessageReceiverError>;
async fn subscribe(&self) -> Result<(), SystemServiceError>;
async fn commit(&self) -> Result<(), SystemServiceError>;
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>>;
async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError>;
async fn unsubscribe(&self);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use async_trait::async_trait;
use log::{error, info, warn};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Interval};

use crate::{
core::{ServiceResult, System, SystemService},
Expand All @@ -19,6 +19,7 @@ pub struct MessageReceiverService {
pub message_channel_tx: mpsc::Sender<ChannelMessage>,
pub commit_offset: Arc<AtomicI64>,
pub system: System,
pub commit_interval: Interval,
}

impl MessageReceiverService {
Expand All @@ -33,6 +34,7 @@ impl MessageReceiverService {
message_channel_tx,
system,
commit_offset,
commit_interval: tokio::time::interval(Duration::from_millis(10_000)),
}
}

Expand All @@ -45,7 +47,6 @@ impl MessageReceiverService {
#[async_trait]
impl SystemService for MessageReceiverService {
async fn run(&mut self) -> ServiceResult {
let mut interval = tokio::time::interval(Duration::from_millis(10_000));
tokio::select! {
// ** Consume Messages from Kafka
res = self.receiver.consume_message() => {
Expand Down Expand Up @@ -81,10 +82,10 @@ impl SystemService for MessageReceiverService {
}
}
//** commit message
_ = interval.tick() => {
_ = self.commit_interval.tick() => {
let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed);
self.receiver.update_savepoint(offset).await?;
self.receiver.commit().await?;
self.receiver.commit_async();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::{atomic::AtomicI64, Arc};

use async_trait::async_trait;
use tokio::sync::{broadcast, mpsc};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};

use crate::{
core::{System, SystemService},
Expand Down Expand Up @@ -41,6 +44,9 @@ impl MessageReciever for MockReciever {
async fn commit(&self) -> Result<(), SystemServiceError> {
Ok(())
}
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
None
}
async fn update_savepoint(&mut self, _version: i64) -> Result<(), SystemServiceError> {
Ok(())
}
Expand Down
30 changes: 27 additions & 3 deletions packages/talos_certifier_adapters/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use log::debug;
Expand All @@ -19,6 +19,7 @@ use talos_certifier::{
};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use time::OffsetDateTime;
use tokio::task::JoinHandle;

use crate::{kafka::utils::get_message_headers, KafkaAdapterError};

Expand All @@ -27,7 +28,7 @@ use super::utils;
// Kafka Consumer Client
// #[derive(Debug, Clone)]
pub struct KafkaConsumer {
pub consumer: StreamConsumer<DefaultConsumerContext>,
pub consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
pub topic: String,
pub tpl: TopicPartitionList,
}
Expand All @@ -38,7 +39,7 @@ impl KafkaConsumer {

let topic = config.topic.clone();
Self {
consumer,
consumer: Arc::new(consumer),
topic,
tpl: TopicPartitionList::new(),
}
Expand Down Expand Up @@ -182,6 +183,29 @@ impl MessageReciever for KafkaConsumer {
}
Ok(())
}

fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
if self.tpl.count() > 0 {
let consumer_copy = Arc::clone(&self.consumer);
let tpl = self.tpl.clone();
let handle = tokio::task::spawn(async move {
consumer_copy.commit(&tpl, rdkafka::consumer::CommitMode::Async).map_err(|err| {
MessageReceiverError {
kind: MessageReceiverErrorKind::CommitError,
version: None,
reason: err.to_string(),
data: None,
}
.into()
})
});

Some(handle)
} else {
None
}
}

async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError> {
// let partition = self.tpl.;
let tpl = self.tpl.elements_for_topic(&self.topic);
Expand Down
16 changes: 12 additions & 4 deletions packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ where
pub receiver: M,
pub suffix: S,
pub last_installing: u64,
pub next_commit_offset: Option<u64>,
_phantom: PhantomData<T>,
}

Expand All @@ -95,6 +96,7 @@ where
receiver,
suffix,
last_installing: 0,
next_commit_offset: None,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -138,14 +140,20 @@ where
get_statemap_from_suffix_items(items.into_iter())
}

pub(crate) async fn commit_till_last_installed(&mut self) {
pub(crate) async fn prepare_offset_for_commit(&mut self) {
if self.last_installing > 0 {
if let Some(last_installed) = self.suffix.get_last_installed(Some(self.last_installing)) {
let version = last_installed.decision_ver.unwrap();
self.receiver.update_savepoint(version as i64).await.unwrap();

self.receiver.commit().await.unwrap();
self.next_commit_offset = Some(version);
}
}
}

pub(crate) async fn commit(&mut self) {
if let Some(version) = self.next_commit_offset {
self.receiver.update_savepoint(version as i64).await.unwrap();
self.receiver.commit_async();
self.next_commit_offset = None;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
\n ");
}

replicator.commit_till_last_installed().await;
replicator.commit().await;
}
// Receive feedback from installer.
res = replicator_rx.recv() => {
Expand All @@ -110,6 +110,7 @@ where

// Prune suffix and update suffix head.
if replicator.suffix.get_suffix_meta().prune_index >= replicator.suffix.get_suffix_meta().prune_start_threshold {
replicator.prepare_offset_for_commit().await;
replicator.suffix.prune_till_version(version).unwrap();
}
total_items_installed += 1;
Expand Down

0 comments on commit d6eb1fe

Please sign in to comment.