Skip to content

Commit

Permalink
refactor: move fetchs under CID
Browse files Browse the repository at this point in the history
  • Loading branch information
kayagokalp committed May 5, 2023
1 parent cc12abc commit fd35715
Showing 1 changed file with 50 additions and 32 deletions.
82 changes: 50 additions & 32 deletions forc-pkg/src/source/ipfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use anyhow::Result;
use futures::TryStreamExt;
use ipfs_api::{IpfsApi, TryFromUri};
use ipfs_api::IpfsApi;
use ipfs_api_backend_hyper as ipfs_api;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{
Expand All @@ -13,6 +13,7 @@ use std::{
str::FromStr,
};
use tar::Archive;
use tracing::{info, warn};

#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Cid(cid::Cid);
Expand All @@ -32,6 +33,8 @@ impl Pinned {
pub const PREFIX: &'static str = "ipfs";
}

const PUBLIC_GATEWAY: &str = "https://ipfs.io";

impl FromStr for Cid {
type Err = <cid::Cid as FromStr>::Err;

Expand Down Expand Up @@ -62,14 +65,28 @@ impl source::Fetch for Pinned {
{
let _guard = lock.write()?;
if !repo_path.exists() {
info!(
" {} {} {}",
ansi_term::Color::Green.bold().paint("Fetching"),
ansi_term::Style::new().bold().paint(ctx.name),
self
);
let cid = &self.0;
let ipfs_client = ipfs_client();
let dest = cache_dir();
let handle = tokio::runtime::Handle::current();
let _ = handle.enter();
futures::executor::block_on(async {
//fetch_package_with_cid(&ipfs_client, cid, &dest).await
fetch_package_with_cid_from_public_gateway(cid, &dest).await
if let Err(e) = cid.fetch_with_client(&ipfs_client, &dest).await {
warn!(
" {}",
ansi_term::Color::Yellow.bold().paint(format!("Couldn't fetch from local ipfs node, reason:\n{e:?}.\n Falling back to {PUBLIC_GATEWAY}")),
);

cid.fetch_with_public_gateway(&dest).await
} else {
Ok(())
}
})?;
}
}
Expand Down Expand Up @@ -107,6 +124,36 @@ impl fmt::Display for Pinned {
}
}

impl Cid {
/// Using local node, fetches a package with CID.
async fn fetch_with_client(&self, ipfs_client: &IpfsClient, dst: &Path) -> Result<()> {
let cid_path = format!("/ipfs/{}", self.0);
let bytes = ipfs_client
.get(&cid_path)
.map_ok(|chunk| chunk.to_vec())
.try_concat()
.await?;
let mut archive = Archive::new(bytes.as_slice());
archive.unpack(dst)?;
Ok(())
}

async fn fetch_with_public_gateway(&self, dst: &Path) -> Result<()> {
let client = reqwest::Client::new();
let fetch_url = format!(
"{}/ipfs/{}?download=true&format=tar&filename={}.tar",
PUBLIC_GATEWAY, self.0, self.0
);
let req = client.get(fetch_url);
let res = req.send().await?;
let bytes: Vec<_> = res.text().await?.bytes().collect();

let mut archive = Archive::new(bytes.as_slice());
archive.unpack(dst)?;
Ok(())
}
}

#[derive(Debug)]
pub enum PinnedParseError {
Prefix,
Expand Down Expand Up @@ -164,32 +211,3 @@ fn pkg_cache_dir(cid: &Cid) -> PathBuf {
fn ipfs_client() -> IpfsClient {
IpfsClient::default()
}

async fn fetch_package_with_cid(ipfs_client: &IpfsClient, cid: &Cid, dst: &Path) -> Result<()> {
let cid_path = format!("/ipfs/{}", cid.0);
let bytes = ipfs_client
.get(&cid_path)
.map_ok(|chunk| chunk.to_vec())
.try_concat()
.await?;
let mut archive = Archive::new(bytes.as_slice());
archive.unpack(dst)?;
Ok(())
}

async fn fetch_package_with_cid_from_public_gateway(cid: &Cid, dst: &Path) -> Result<()> {
const PUBLIC_GATEWAY: &str = "https://ipfs.io";

let client = reqwest::Client::new();
let fetch_url = format!(
"{}/ipfs/{}?download=true&format=tar&filename={}.tar",
PUBLIC_GATEWAY, cid.0, cid.0
);
let req = client.get(fetch_url);
let res = req.send().await?;
let bytes: Vec<_> = res.text().await?.bytes().collect();

let mut archive = Archive::new(bytes.as_slice());
archive.unpack(dst)?;
Ok(())
}

0 comments on commit fd35715

Please sign in to comment.