Skip to content

Commit

Permalink
sov-cli can submit transactions to sov-sequencer and can trigger …
Browse files Browse the repository at this point in the history
…publishing batch (#446)
  • Loading branch information
citizen-stig authored Jul 10, 2023
1 parent 698269d commit e542d7e
Show file tree
Hide file tree
Showing 20 changed files with 541 additions and 67 deletions.
2 changes: 2 additions & 0 deletions adapters/celestia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = { workspace = true }

[dependencies]
borsh = { workspace = true, features = ["bytes"] }
bech32 = { workspace = true }
prost = "0.11"
prost-types = "0.11"
tendermint = "0.32"
Expand All @@ -33,6 +34,7 @@ nmt-rs = { git = "https://github.com/Sovereign-Labs/nmt-rs.git", rev = "dd375884
[dev-dependencies]
postcard = { version = "1", features = ["use-std"] }
proptest = { version = "1.2" }
wiremock = "0.5"

[build-dependencies]
prost-build = { version = "0.11" }
Expand Down
301 changes: 281 additions & 20 deletions adapters/celestia/src/da_service.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;

use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::ArrayParams;
use jsonrpsee::http_client::{HeaderMap, HttpClient};
use nmt_rs::NamespaceId;
use sov_rollup_interface::da::CountedBufReader;
use sov_rollup_interface::services::da::DaService;
use tracing::{debug, info, span, Level};

// 0x736f762d74657374 = b"sov-test"
// For testing, use this NamespaceId (b"sov-test"):
// pub const ROLLUP_NAMESPACE: NamespaceId = NamespaceId([115, 111, 118, 45, 116, 101, 115, 116]);
use crate::share_commit::recreate_commitment;
use crate::shares::{Blob, NamespaceGroup, Share};
use crate::types::{ExtendedDataSquare, FilteredCelestiaBlock, Row, RpcNamespacedSharesResponse};
use crate::utils::BoxError;
use crate::verifier::address::CelestiaAddress;
use crate::verifier::proofs::{CompletenessProof, CorrectnessProof};
use crate::verifier::{CelestiaSpec, RollupParams, PFB_NAMESPACE};
use crate::{
parse_pfb_namespace,
share_commit::recreate_commitment,
shares::{Blob, NamespaceGroup, Share},
types::{ExtendedDataSquare, FilteredCelestiaBlock, Row, RpcNamespacedSharesResponse},
utils::BoxError,
verifier::{
address::CelestiaAddress,
proofs::{CompletenessProof, CorrectnessProof},
CelestiaSpec, RollupParams, PFB_NAMESPACE,
},
BlobWithSender, CelestiaHeader, CelestiaHeaderResponse, DataAvailabilityHeader,
parse_pfb_namespace, BlobWithSender, CelestiaHeader, CelestiaHeaderResponse,
DataAvailabilityHeader,
};

// Approximate value, just to make it work.
const GAS_PER_BYTE: usize = 120;

#[derive(Debug, Clone)]
pub struct CelestiaService {
client: HttpClient,
Expand Down Expand Up @@ -106,6 +107,10 @@ fn default_max_response_size() -> u32 {
1024 * 1024 * 100 // 100 MB
}

fn default_request_timeout_ms() -> u64 {
30_000
}

impl DaService for CelestiaService {
type RuntimeConfig = DaServiceConfig;

Expand All @@ -126,9 +131,12 @@ impl DaService for CelestiaService {
.parse()
.unwrap(),
);

jsonrpsee::http_client::HttpClientBuilder::default()
.set_headers(headers)
.max_request_body_size(config.max_celestia_response_body_size) // 100 MB
// TODO: Should be replaced with config option https://github.com/Sovereign-Labs/sovereign-sdk/issues/478
.request_timeout(Duration::from_millis(default_request_timeout_ms()))
.max_request_body_size(config.max_celestia_response_body_size)
.build(&config.celestia_rpc_address)
}
.expect("Client initialization is valid");
Expand Down Expand Up @@ -177,7 +185,7 @@ impl DaService for CelestiaService {
)
.await?;

debug!("Decoding pfb protofbufs...");
debug!("Decoding pfb protobufs...");
// Parse out the pfds and store them for later retrieval
let pfds = parse_pfb_namespace(tx_data)?;
let mut pfd_map = HashMap::new();
Expand Down Expand Up @@ -226,7 +234,7 @@ impl DaService for CelestiaService {

let blob_tx = BlobWithSender {
blob: CountedBufReader::new(blob.into_iter()),
sender: CelestiaAddress(sender.as_bytes().to_vec()),
sender: CelestiaAddress::from_str(&sender).expect("Incorrect sender address"),
hash: commitment,
};

Expand Down Expand Up @@ -254,17 +262,57 @@ impl DaService for CelestiaService {
// https://node-rpc-docs.celestia.org/
let client = self.client.clone();
info!("Sending {} bytes of raw data to Celestia.", blob.len());
// Take ownership of the blob so that the future is 'static.
let fee: u64 = 2000;
let namespace = self.rollup_namespace.0.to_vec();
let blob = blob.to_vec();
// We factor extra share to be occupied for namespace, which is pessimistic
let gas_limit = get_gas_limit_for_bytes(blob.len());

Box::pin(async move {
let _response = client
.request::<serde_json::Value, _>("state.SubmitTx", vec![blob])
let mut params = ArrayParams::new();
params.insert(namespace)?;
params.insert(blob)?;
params.insert(fee.to_string())?;
params.insert(gas_limit)?;
// Note, we only deserialize what we can use, other fields might be left over
let response = client
.request::<CelestiaBasicResponse, _>("state.SubmitPayForBlob", params)
.await?;
if !response.is_success() {
anyhow::bail!("Error returned from Celestia node: {:?}", response);
}
debug!("Response after submitting blob: {:?}", response);
info!(
"Blob has been submitted to Celestia. tx-hash={}",
response.tx_hash,
);
Ok::<(), BoxError>(())
})
}
}

fn get_gas_limit_for_bytes(n: usize) -> usize {
(n + 512) * GAS_PER_BYTE + 1060
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct CelestiaBasicResponse {
raw_log: String,
#[serde(rename = "code")]
error_code: Option<u64>,
#[serde(rename = "txhash")]
tx_hash: String,
gas_wanted: u64,
gas_used: u64,
}

impl CelestiaBasicResponse {
/// We assume that absence of `code` indicates that request was successfull
pub fn is_success(&self) -> bool {
self.error_code.is_none()
}
}

async fn get_rows_containing_namespace(
nid: NamespaceId,
dah: &DataAvailabilityHeader,
Expand All @@ -285,8 +333,18 @@ async fn get_rows_containing_namespace(

#[cfg(test)]
mod tests {
use std::time::Duration;

use nmt_rs::NamespaceId;
use serde_json::json;
use sov_rollup_interface::services::da::DaService;
use wiremock::matchers::{bearer_token, body_json, method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};

use crate::da_service::{default_request_timeout_ms, CelestiaService, DaServiceConfig};
use crate::parse_pfb_namespace;
use crate::shares::{NamespaceGroup, Share};
use crate::verifier::RollupParams;

const SERIALIZED_PFB_SHARES: &str = r#"["AAAAAAAAAAQBAAABRQAAABHDAgq3AgqKAQqHAQogL2NlbGVzdGlhLmJsb2IudjEuTXNnUGF5Rm9yQmxvYnMSYwovY2VsZXN0aWExemZ2cnJmYXE5dWQ2Zzl0NGt6bXNscGYyNHlzYXhxZm56ZWU1dzkSCHNvdi10ZXN0GgEoIiCB8FoaUuOPrX2wFBbl4MnWY3qE72tns7sSY8xyHnQtr0IBABJmClAKRgofL2Nvc21vcy5jcnlwdG8uc2VjcDI1NmsxLlB1YktleRIjCiEDmXaTf6RVIgUVdG0XZ6bqecEn8jWeAi+LjzTis5QZdd4SBAoCCAEYARISCgwKBHV0aWESBDIwMDAQgPEEGkAhq2CzD1DqxsVXIriANXYyLAmJlnnt8YTNXiwHgMQQGUbl65QUe37UhnbNVrOzDVYK/nQV9TgI+5NetB2JbIz6EgEBGgRJTkRYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="]"#;
const SERIALIZED_ROLLUP_DATA_SHARES: &str = r#"["c292LXRlc3QBAAAAKHsia2V5IjogInRlc3RrZXkiLCAidmFsdWUiOiAidGVzdHZhbHVlIn0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="]"#;
Expand Down Expand Up @@ -323,4 +381,207 @@ mod tests {

assert!(blobs.next().is_none());
}

// Last return value is namespace
async fn setup_service() -> (MockServer, DaServiceConfig, CelestiaService, [u8; 8]) {
// Start a background HTTP server on a random local port
let mock_server = MockServer::start().await;

let config = DaServiceConfig {
celestia_rpc_auth_token: "RPC_TOKEN".to_string(),
celestia_rpc_address: mock_server.uri(),
max_celestia_response_body_size: 120_000,
};
let namespace = [9u8; 8];
let da_service = CelestiaService::new(
config.clone(),
RollupParams {
namespace: NamespaceId(namespace),
},
);

(mock_server, config, da_service, namespace)
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct BasicJsonRpcResponse {
jsonrpc: String,
id: u64,
method: String,
params: serde_json::Value,
}

#[tokio::test]
async fn test_submit_blob_correct() -> anyhow::Result<()> {
let (mock_server, config, da_service, namespace) = setup_service().await;

let blob: Vec<u8> = vec![1, 2, 3, 4, 5, 11, 12, 13, 14, 15];

// TODO: Fee is hardcoded for now
let expected_body = json!({
"id": 0,
"jsonrpc": "2.0",
"method": "state.SubmitPayForBlob",
"params": [
namespace,
blob,
"2000",
63700
]
});

Mock::given(method("POST"))
.and(path("/"))
.and(bearer_token(config.celestia_rpc_auth_token))
.and(body_json(&expected_body))
.respond_with(|req: &Request| {
let request: BasicJsonRpcResponse = serde_json::from_slice(&req.body).unwrap();
let response_json = json!({
"jsonrpc": "2.0",
"id": request.id,
"result": {
"data": "122A0A282F365",
"events": ["some event"],
"gas_used": 70522,
"gas_wanted": 133540,
"height": 26,
"logs": [
"some log"
],
"raw_log": "some raw logs",
"txhash": "C9FEFD6D35FCC73F9E7D5C74E1D33F0B7666936876F2AD75E5D0FB2944BFADF2"
}
});

ResponseTemplate::new(200)
.append_header("Content-Type", "application/json")
.set_body_json(response_json)
})
.up_to_n_times(1)
.mount(&mock_server)
.await;

da_service.send_transaction(&blob).await?;

Ok(())
}

#[tokio::test]
async fn test_submit_blob_application_level_error() -> anyhow::Result<()> {
// Our calculation of gas is off and gas limit exceeded, for example
let (mock_server, _config, da_service, _namespace) = setup_service().await;

let blob: Vec<u8> = vec![1, 2, 3, 4, 5, 11, 12, 13, 14, 15];

// Do not check API token or expected body here.
// Only interested in behaviour on response
Mock::given(method("POST"))
.and(path("/"))
.respond_with(|req: &Request| {
let request: BasicJsonRpcResponse = serde_json::from_slice(&req.body).unwrap();
let response_json = json!({
"jsonrpc": "2.0",
"id": request.id,
"result": {
"code": 11,
"codespace": "sdk",
"gas_used": 10_000,
"gas_wanted": 12_000,
"raw_log": "out of gas in location: ReadFlat; gasWanted: 10, gasUsed: 1000: out of gas",
"txhash": "C9FEFD6D35FCC73F9E7D5C74E1D33F0B7666936876F2AD75E5D0FB2944BFADF2"
}
});
ResponseTemplate::new(200)
.append_header("Content-Type", "application/json")
.set_body_json(response_json)
})
.up_to_n_times(1)
.mount(&mock_server)
.await;

let result = da_service.send_transaction(&blob).await;

assert!(result.is_err());
let error_string = result.err().unwrap().to_string();
assert!(error_string.contains("Error returned from Celestia node:"));
assert!(error_string.contains(
"out of gas in location: ReadFlat; gasWanted: 10, gasUsed: 1000: out of gas"
));
Ok(())
}

#[tokio::test]
async fn test_submit_blob_internal_server_error() -> anyhow::Result<()> {
let (mock_server, _config, da_service, _namespace) = setup_service().await;

let error_response = ResponseTemplate::new(500).set_body_bytes("Internal Error".as_bytes());

let blob: Vec<u8> = vec![1, 2, 3, 4, 5, 11, 12, 13, 14, 15];

// Do not check API token or expected body here.
// Only interested in behaviour on response
Mock::given(method("POST"))
.and(path("/"))
.respond_with(error_response)
.up_to_n_times(1)
.mount(&mock_server)
.await;

let result = da_service.send_transaction(&blob).await;

assert!(result.is_err());
let error_string = result.err().unwrap().to_string();
assert!(error_string.contains(
"Networking or low-level protocol error: Server returned an error status code: 500"
));
Ok(())
}

#[tokio::test]
// This test is slow now, but it can be fixed when
// https://github.com/Sovereign-Labs/sovereign-sdk/issues/478 is implemented
// Slower request timeout can be set
async fn test_submit_blob_response_timeout() -> anyhow::Result<()> {
let (mock_server, _config, da_service, _namespace) = setup_service().await;

let response_json = json!({
"jsonrpc": "2.0",
"id": 0,
"result": {
"data": "122A0A282F365",
"events": ["some event"],
"gas_used": 70522,
"gas_wanted": 133540,
"height": 26,
"logs": [
"some log"
],
"raw_log": "some raw logs",
"txhash": "C9FEFD6D35FCC73F9E7D5C74E1D33F0B7666936876F2AD75E5D0FB2944BFADF2"
}
});

let error_response = ResponseTemplate::new(200)
.append_header("Content-Type", "application/json")
.set_delay(Duration::from_millis(default_request_timeout_ms() + 100))
.set_body_json(response_json);

let blob: Vec<u8> = vec![1, 2, 3, 4, 5, 11, 12, 13, 14, 15];

// Do not check API token or expected body here.
// Only interested in behaviour on response
Mock::given(method("POST"))
.and(path("/"))
.respond_with(error_response)
.up_to_n_times(1)
.mount(&mock_server)
.await;

let result = da_service.send_transaction(&blob).await;

assert!(result.is_err());
let error_string = result.err().unwrap().to_string();
assert!(error_string.contains("Request timeout"));
Ok(())
}
}
Loading

0 comments on commit e542d7e

Please sign in to comment.