Skip to content

Commit

Permalink
feat(network): use snappy to compress data in ckb protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr authored and doitian committed Nov 19, 2018
1 parent 83824d5 commit 52441df
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 12 deletions.
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ log = "0.4.5"
bytes = "0.4.9"
tokio = "0.1.8"
futures = { version = "0.1.19", features = ["use_std"] }
snap = "0.2"
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
50 changes: 40 additions & 10 deletions network/src/ckb_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]

use super::{Error, ProtocolId};
use bytes::BufMut;
use bytes::{Buf, IntoBuf};
use bytes::{Bytes, BytesMut};
use futures::sync::mpsc;
use futures::{future, stream, Future, Sink, Stream};
use libp2p::core::{ConnectionUpgrade, Endpoint, Multiaddr};
use snap;
use std::io;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::string::ToString;
use std::vec::IntoIter as VecIntoIter;
Expand Down Expand Up @@ -170,23 +174,49 @@ impl<T> CKBProtocol<T> {

Some(stream.into_future().map_err(|(err, _)| err).and_then(
move |(message, stream)| match message {
Some(Message::Recv(data)) => {
if data.is_empty() {
Some(Message::Recv(compressed_data)) => {
if compressed_data.is_empty() {
debug!("receive a empty message, ignoring");
let f = future::ok((None, (sink, stream, false)));
return future::Either::A(f);
}

let out = Some(data.freeze());
let f = future::ok((out, (sink, stream, false)));
future::Either::A(f)
// decompress data
let mut decompresser = snap::Reader::new(compressed_data.freeze().into_buf().reader());
let mut data = vec![].writer();
match io::copy(&mut decompresser, &mut data) {
Ok(_) => {
let out = Some(data.into_inner().into());
let f = future::ok((out, (sink, stream, false)));
future::Either::A(f)
},
Err(e) => {
future::Either::A(future::err(e))
}
}
}

Some(Message::SendData(data)) => {
let fut = sink
.send(data)
.map(move |sink| (None, (sink, stream, false)));
future::Either::B(fut)
let mut compressed_data = vec![].writer();
let mut compresser = snap::Writer::new(compressed_data);
let mut data_buf = data.into_buf();
match io::copy(&mut data_buf.reader(), &mut compresser) {
Ok(_) => {
match compresser.into_inner() {
Ok(compressed_data) => {
let compressed_data : Bytes = compressed_data.into_inner().into();
let fut = sink
.send(compressed_data)
.map(move |sink| (None, (sink, stream, false)));
future::Either::B(fut)
},
Err(e) => {
future::Either::A(future::err(IoError::new(IoErrorKind::Other, format!("compressed data error {}", e.to_string()))))
}
}}
Err(e) => {
future::Either::A(future::err(e))
}
}
}

Some(Message::Finished) | None => {
Expand Down
1 change: 1 addition & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate bytes;
extern crate futures;
extern crate libp2p;
extern crate rand;
extern crate snap;
extern crate tokio;
extern crate unsigned_varint;
#[macro_use]
Expand Down

0 comments on commit 52441df

Please sign in to comment.