Skip to content

Commit

Permalink
feat: enable cohort to pass on_commit messages for messenger (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred authored Nov 14, 2023
1 parent 4e540d9 commit 127d304
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 15 deletions.
29 changes: 23 additions & 6 deletions cohort_banking_initiator_js/src/banking-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class BankingApp {
private pond: Pond,
private database: Pool,
private queue: BroadcastChannel,
private onFinishListener: (appRef: BankingApp) => any) {}
private onFinishListener: (appRef: BankingApp) => any) { }

async init() {
this.initiator = await Initiator.init(sdkConfig)
Expand Down Expand Up @@ -133,6 +133,21 @@ export class BankingApp {
writeset: [tx.from, tx.to],
readvers: state.items.map(i => i.version),
statemaps: [{ "TRANSFER": tx }],
onCommit: {
publish: {
kafka: [
{
topic: "test.transfer.feedback.js",
value: {
"from_account": tx.from,
"to_account": tx.to,
"amount": tx.amount
}
},
]
}
}

},
snapshot: state.snapshotVersion,
timeoutMs: 0,
Expand All @@ -143,14 +158,16 @@ export class BankingApp {
let cnn: PoolClient
try {
cnn = await this.database.connect()
const result = await cnn.query({ name: "get-state", text:
`SELECT
const result = await cnn.query({
name: "get-state", text:
`SELECT
ba."number" as "id", ba."version" as "version", cs."version" AS snapshot_version
FROM
bank_accounts ba, cohort_snapshot cs
WHERE
ba."number" = $1 OR ba."number" = $2`,
values: [tx.from, tx.to] }
values: [tx.from, tx.to]
}
)

if (result.rowCount != 2) {
Expand All @@ -163,7 +180,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.loadState(): %s", e)
throw new Error(`Unable to load state for tx: ${ JSON.stringify(tx) }. Reason: ${e.message}`, { cause: e })
throw new Error(`Unable to load state for tx: ${JSON.stringify(tx)}. Reason: ${e.message}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down Expand Up @@ -219,7 +236,7 @@ export class BankingApp {
} catch (e) {
// This print here is important, without it the original reason is lost when using NAPI 2.10.
logger.error("BankingApp.installOutOfOrder(): %s", e)
throw new Error(`Unable to complete out of order installation of tx: ${ JSON.stringify(tx) }`, { cause: e })
throw new Error(`Unable to complete out of order installation of tx: ${JSON.stringify(tx)}`, { cause: e })
} finally {
cnn?.release()
}
Expand Down
32 changes: 29 additions & 3 deletions cohort_sdk_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,31 @@ export interface JsCertificationCandidate {
writeset: Array<string>
readvers: Array<number>
statemaps?: Array<Record<string, any>>
onCommit?: JsCandidateOnCommitActions
}

export interface JsKafkaAction {
cluster?: string
/** Topic to publish the payload */
topic: string
/** Key encoding to be used. Defaults to `text/plain`. */
keyEncoding?: string
/** Key for the message to publish. */
key?: string
/** Optional if the message should be published to a specific partition. */
partition?: number
/** Optional headers while publishing. */
headers?: Record<string, string>
/** Key encoding to be used. Defaults to `application/json`. */
valueEncoding?: string
/** Payload to publish. */
value: any
}
export interface JsCandidateOnCommitPublishActions {
kafka: Array<JsKafkaAction>
}
export interface JsCandidateOnCommitActions {
publish?: JsCandidateOnCommitPublishActions
}
```

Expand All @@ -113,10 +138,11 @@ export interface JsCertificationCandidate {
Before SDK can issue a certification request to Talos Certifier it needs some details from you. You will have to query your local database to fetch the following:
1. Identifiers and version numbers of all objects involved in your transaction. These are known as `readset`, `writeset` and `readvers`.
2. The copy of your transaction as one serializable object. It makes sense to describe your transaction as JSON object and serialise it to string. This is known as `statemap`.
3. Any additional message to be published for candidate requests with committed decision outcome, can be added to `onCommit` field. Currently the SDK supports only publishing to **Kafka**.

Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap.
Above mentioned reads, writes and statemap fields together are known as certification candidate details. You may ask whether statemap is optional? Indeed, as you are passing the arrow function to `fnOooInstaller` callback you have the context of your request. From the perspective of Initiator app, the answer is "yes, it is optional". However, the statemap will also be received by your Replicator app. Replicator may be implemented as a separate process. Replicator will know what needs to be updated in the database by reading statemap.

Read about `statemap` in the end of this document. See section "About Statemap".
Read about `statemap` in the end of this document. See section [About Statemap](#about-statemap).

### About "JsCertificationRequestPayload"

Expand All @@ -134,7 +160,7 @@ Most likely you will want to retry your request. The SDK implements retry with i

## About "Out of Order Install Callback"

If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects.
If your business transaction requires a certification from Talos, it is expected that you will not do any changes to objects taking part in your transaction (you will not update database records) until the decision is received from Talos. Only after certification decision is received you will proceed with business transaction. Typically, this is going to be some database update, for example, you will update balances of relevant bank accounts, hence "transfer" money between them. This step is done inside "Out of Order Install Callback". SDK will invoke this callback only when Talos approved your transaction, in other words, when Talos checks that there are no conflicting requests to update your objects.

<em>What is the benefit of having out of order callback if its responsibility overlaps with "Statemap Installer Callback" found in Replicator?
You may wish not to implement this callback and rely on Replicator to do all your DB changes. Just keep in mind that Replicator will do it "later". How much later will depends on the overall load on the replicator and other dependent transactions which are still in-flight. If you did not implement out of order callback then it is possible to finish the call to `let response = await initiator.certify(...)`, have "go ahead" decision from Talos in the response variable, but your DB will not see this change. If, at that point, you returned response to user via HTTP and user went to query DB via another endpoint, it could be that user will not see the change yet (Replicator may still be processing the backlog of other transactions). On the other hand, with out of order callback in place, once the call to `let response = await initiator.certify(...)` finished, your DB is already updated and you may rely on that change in your following logic.
Expand Down
1 change: 1 addition & 0 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ impl Generator<CertificationRequest> for RequestGenerator {
snapshot: 5,
writeset: Vec::from(["3".to_string()]),
statemap: None,
on_commit: None,
};

CertificationRequest {
Expand Down
29 changes: 27 additions & 2 deletions packages/cohort_banking/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use banking_common::{
};
use cohort_sdk::{
cohort::Cohort,
model::{ClientErrorKind, Config},
model::{
callback::{CandidateOnCommitActions, CandidateOnCommitPublishActions, KafkaAction},
ClientErrorKind, Config,
},
};

use opentelemetry_api::{
global,
metrics::{Counter, Unit},
};
use serde_json::json;
use talos_agent::messaging::api::Decision;

use crate::{
Expand Down Expand Up @@ -64,19 +68,39 @@ impl BankingApp {
#[async_trait]
impl Handler<TransferRequest> for BankingApp {
async fn handle(&self, request: TransferRequest) -> Result<(), String> {
log::debug!("processig new banking transfer request: {:?}", request);
log::debug!("processing new banking transfer request: {:?}", request);

let statemap = vec![HashMap::from([(
BusinessActionType::TRANSFER.to_string(),
TransferRequest::new(request.from.clone(), request.to.clone(), request.amount).json(),
)])];

let on_commit_action = CandidateOnCommitActions {
publish: Some(CandidateOnCommitPublishActions {
kafka: vec![KafkaAction {
headers: None,
key: None,
partition: None,
topic: "test.transfer.feedback".to_string(),
value: json!({
"from_account": request.from,
"to_account": request.to,
"amount": request.amount
}),
cluster: Default::default(),
key_encoding: Default::default(),
value_encoding: Default::default(),
}],
}),
};

let certification_request = CertificationRequest {
timeout_ms: 0,
candidate: CandidateData {
readset: vec![request.from.clone(), request.to.clone()],
writeset: vec![request.from, request.to],
statemap: Some(statemap),
on_commit: Some(on_commit_action),
},
};

Expand All @@ -85,6 +109,7 @@ impl Handler<TransferRequest> for BankingApp {
database: Arc::clone(&self.database),
single_query_strategy,
};

let request_payload_callback = || state_provider.get_certification_candidate(certification_request.clone());

let oo_inst = OutOfOrderInstallerImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl CertificationCandidateProviderImpl {
writeset: request.candidate.writeset,
statemaps: request.candidate.statemap,
readvers: state.items.into_iter().map(|x| x.version).collect(),
on_commit: request.candidate.on_commit,
};

Ok(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl QueueProcessor {
threads: u64,
item_handler: Arc<H>,
) -> Vec<JoinHandle<()>> {
let item_handler = Arc::new(item_handler);
let mut tasks = Vec::<JoinHandle<()>>::new();

for thread_number in 1..=threads {
Expand Down
4 changes: 3 additions & 1 deletion packages/cohort_banking/src/model/requests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cohort_sdk::model::callback::CandidateOnCommitActions;
use serde_json::Value;
use std::collections::HashMap;

Expand All @@ -6,7 +7,8 @@ pub struct CandidateData {
pub readset: Vec<String>,
pub writeset: Vec<String>,
pub statemap: Option<Vec<HashMap<String, Value>>>,
// The "snapshot" is intentionally messing here. We will compute it ourselves before feeding this data to Talos
// The "snapshot" is intentionally missing here. We will compute it ourselves before feeding this data to Talos
pub on_commit: Option<CandidateOnCommitActions>,
}

#[derive(Clone)]
Expand Down
7 changes: 7 additions & 0 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry_api::{
global,
metrics::{Counter, Histogram, Unit},
};
use serde_json::Value;
use talos_agent::{
agent::{
core::{AgentServices, TalosAgentImpl},
Expand Down Expand Up @@ -448,6 +449,11 @@ impl Cohort {
let (snapshot, readvers) = Self::select_snapshot_and_readvers(request.snapshot, request.candidate.readvers);

let xid = uuid::Uuid::new_v4().to_string();
let on_commit: Option<Box<Value>> = match request.candidate.on_commit {
Some(value) => serde_json::to_value(value).ok().map(|x| x.into()),
None => None,
};

let agent_request = CertificationRequest {
message_key: xid.clone(),
candidate: CandidateData {
Expand All @@ -457,6 +463,7 @@ impl Cohort {
writeset: request.candidate.writeset,
readvers,
snapshot,
on_commit,
},
timeout: if request.timeout_ms > 0 {
Some(Duration::from_millis(request.timeout_ms))
Expand Down
43 changes: 43 additions & 0 deletions packages/cohort_sdk/src/model/callback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, PartialEq)]
Expand All @@ -16,12 +17,54 @@ pub struct CertificationRequestPayload {
pub timeout_ms: u64,
}

fn default_text_plain_encoding() -> String {
"text/plain".to_string()
}

fn default_application_json_encoding() -> String {
"application/json".to_string()
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaAction {
#[serde(default)]
pub cluster: String,
/// Topic to publish the payload
pub topic: String,
/// Key encoding to be used. Defaults to `text/plain`.
#[serde(default = "default_text_plain_encoding")]
pub key_encoding: String,
/// Key for the message to publish.
pub key: Option<String>,
/// Optional if the message should be published to a specific partition.
pub partition: Option<i32>,
/// Optional headers while publishing.
pub headers: Option<HashMap<String, String>>,
/// Key encoding to be used. Defaults to `application/json`.
#[serde(default = "default_application_json_encoding")]
pub value_encoding: String,
/// Payload to publish.
pub value: serde_json::Value,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CandidateOnCommitPublishActions {
pub kafka: Vec<KafkaAction>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct CandidateOnCommitActions {
pub publish: Option<CandidateOnCommitPublishActions>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CertificationCandidate {
pub readset: Vec<String>,
pub writeset: Vec<String>,
pub readvers: Vec<u64>,
pub statemaps: Option<Vec<HashMap<String, Value>>>,
pub on_commit: Option<CandidateOnCommitActions>,
}

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 127d304

Please sign in to comment.