feat: allow upload/retrieve of assets of arbitrary size from asset canister #1482

Merged 52 commits on Mar 12, 2021

Commits
e0150aa
wip
ericswanson-dfinity Feb 16, 2021
42abc48
checkpoint stable memory ffs
ericswanson-dfinity Feb 17, 2021
38fedba
batch expiry map
ericswanson-dfinity Feb 17, 2021
0362051
create blobs compiles
ericswanson-dfinity Feb 17, 2021
4d270f0
create blobs
ericswanson-dfinity Feb 18, 2021
2d669d0
first pass writeBlob
ericswanson-dfinity Feb 18, 2021
8e6f956
motoko name conventions
ericswanson-dfinity Feb 18, 2021
c30fe2d
ugh
ericswanson-dfinity Feb 18, 2021
77de230
Word8 -> Nat8
ericswanson-dfinity Feb 18, 2021
5c6f39e
stable hash map stuff; start on first test
ericswanson-dfinity Feb 19, 2021
c4a7d87
SHM.get
ericswanson-dfinity Feb 19, 2021
8817568
wip
ericswanson-dfinity Feb 19, 2021
75d7561
wip
ericswanson-dfinity Feb 19, 2021
cf20411
first pass chunked retrieval
ericswanson-dfinity Feb 20, 2021
007260f
check write access; move set_asset_contents to non-async function
ericswanson-dfinity Feb 22, 2021
da9bc17
StableHashMap
ericswanson-dfinity Feb 23, 2021
0a1be8f
split up for commit_batch;
ericswanson-dfinity Feb 23, 2021
3efc721
adjust spacing for Motoko style guide, delete dead code, implement de…
ericswanson-dfinity Feb 23, 2021
5d4d117
wip
ericswanson-dfinity Feb 24, 2021
f8cd7fc
StableHashMap: fix divide-by-zero in remove
ericswanson-dfinity Feb 25, 2021
a32e0e5
CANISTER: fix variant names, commit_batch parameter
ericswanson-dfinity Feb 25, 2021
a964256
CANISTER: debug prints
ericswanson-dfinity Feb 25, 2021
c509998
DFX work: create batch and chunks
ericswanson-dfinity Feb 24, 2021
1fea9bf
DFX: commit batch
ericswanson-dfinity Feb 25, 2021
c02b697
DFX
ericswanson-dfinity Feb 25, 2021
c7c763e
DFX smaller async fns
ericswanson-dfinity Feb 25, 2021
2adca2b
DFX clippy
ericswanson-dfinity Feb 25, 2021
ec8c50a
DFX
ericswanson-dfinity Feb 27, 2021
e802ca0
DFX: slow, imperative upload that works
ericswanson-dfinity Mar 1, 2021
8291900
CANISTER: separate Types.mo
ericswanson-dfinity Mar 2, 2021
43d3e5b
Remove StableHashMap:
ericswanson-dfinity Mar 2, 2021
7de6afa
expire batches
ericswanson-dfinity Mar 3, 2021
28c3948
e2e test - arbitrarily large files
ericswanson-dfinity Mar 4, 2021
2f6ea3f
dfx: comment out imports
ericswanson-dfinity Mar 4, 2021
ecba19f
canister: Batch class
ericswanson-dfinity Mar 4, 2021
4b04f3f
Move assetstorage.mo to assetstore/Main.mo
ericswanson-dfinity Mar 5, 2021
408330c
Asset.mo
ericswanson-dfinity Mar 5, 2021
c24f2a1
wip
ericswanson-dfinity Mar 8, 2021
83630e6
cleanup
ericswanson-dfinity Mar 8, 2021
3eaaf12
Update src/dfx/src/lib/installers/assets.rs
ericswanson-dfinity Mar 9, 2021
fb14c09
Use candid::Nat rather than u128
ericswanson-dfinity Mar 9, 2021
a432d2e
cargo fmt
ericswanson-dfinity Mar 9, 2021
377abbe
Add http_request method
ericswanson-dfinity Mar 9, 2021
592abae
Merge remote-tracking branch 'origin/master' into ericswanson/112-ass…
ericswanson-dfinity Mar 10, 2021
ea26fc0
Add keys() method for upgrade path; add to changelog
ericswanson-dfinity Mar 11, 2021
ab502fb
removed an extra word
ericswanson-dfinity Mar 11, 2021
6bbcb8d
Merge remote-tracking branch 'origin/master' into ericswanson/112-ass…
ericswanson-dfinity Mar 11, 2021
486eb3f
Merge remote-tracking branch 'origin/master' into ericswanson/112-ass…
ericswanson-dfinity Mar 11, 2021
d895d0b
ok maybe 25 MB was always going to take too long on CI
ericswanson-dfinity Mar 11, 2021
bd5a75b
disable large asset test on ic-ref due to wasm interpreter efficiency
ericswanson-dfinity Mar 11, 2021
d4f4ed1
serde_bytes for GetResponse
ericswanson-dfinity Mar 11, 2021
52c7d8e
Merge remote-tracking branch 'origin/master' into ericswanson/112-ass…
ericswanson-dfinity Mar 11, 2021
8 changes: 4 additions & 4 deletions distributed-canisters.nix
@@ -12,19 +12,19 @@ pkgs.runCommandNoCCLocal "distributed-canisters" {
} ''
mkdir -p $out

for canister_mo in ${distributed}/*.mo; do
canister_name=$(basename -s .mo $canister_mo)
for canister_dir in $(find ${distributed} -mindepth 1 -maxdepth 1 -type d); do
canister_name=$(basename $canister_dir)

build_dir=$out/$canister_name
mkdir -p $build_dir

$moc/bin/moc \
$canister_mo \
$canister_dir/Main.mo \
-o $build_dir/$canister_name.did \
--idl \
--package base $base
$moc/bin/moc \
$canister_mo \
$canister_dir/Main.mo \
-o $build_dir/$canister_name.wasm \
-c --release \
--package base $base
23 changes: 23 additions & 0 deletions e2e/tests-dfx/assetscanister.bash
@@ -45,3 +45,26 @@ teardown() {

HOME=. assert_command_fail dfx canister call --update e2e_project_assets store '("index.js", vec { 1; 2; 3; })'
}

@test 'can store arbitrarily large files' {
install_asset assetscanister

dfx_start
dfx canister create --all
dfx build
dfx canister install e2e_project_assets

dd if=/dev/urandom of=src/e2e_project_assets/assets/large-asset.bin bs=1000000 count=25

dfx deploy

assert_command dfx canister call --query e2e_project_assets get '(record{key="large-asset.bin";accept_encodings=vec{"identity"}})'
assert_match 'total_length = 25_000_000'
assert_match 'content_type = "application/octet-stream"'
assert_match 'content_encoding = "identity"'

assert_command dfx canister call --query e2e_project_assets get_chunk '(record{key="large-asset.bin";content_encoding="identity";index=4})'

assert_command dfx canister call --query e2e_project_assets get_chunk '(record{key="large-asset.bin";content_encoding="identity";index=13})'
assert_command_fail dfx canister call --query e2e_project_assets get_chunk '(record{key="large-asset.bin";content_encoding="identity";index=14})'
}
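The chunk indices this test probes follow directly from the upload chunk size: the installer splits content into slices of at most `MAX_CHUNK_SIZE` (1_900_000 bytes), so a 25 MB asset yields 14 chunks, indexed 0 through 13 — which is why `index=13` succeeds and `index=14` fails. A quick illustrative sketch of that arithmetic (not part of the PR itself):

```rust
fn main() {
    // Values taken from the e2e test and the dfx installer constant.
    let asset_len: usize = 25_000_000; // 25 MB test asset
    let max_chunk_size: usize = 1_900_000; // MAX_CHUNK_SIZE in assets.rs

    // Ceiling division: a final partial chunk still counts as one chunk.
    let num_chunks = (asset_len + max_chunk_size - 1) / max_chunk_size;

    // Chunk indices are zero-based, so the last valid index is num_chunks - 1.
    println!("chunks: {}", num_chunks); // chunks: 14
    println!("last valid index: {}", num_chunks - 1); // last valid index: 13
}
```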
326 changes: 300 additions & 26 deletions src/dfx/src/lib/installers/assets.rs
@@ -1,14 +1,276 @@
use crate::lib::canister_info::assets::AssetsCanisterInfo;
use crate::lib::canister_info::CanisterInfo;
use crate::lib::error::DfxResult;
use crate::lib::error::{DfxError, DfxResult};
use crate::lib::waiter::waiter_with_timeout;
use candid::Encode;
use candid::{CandidType, Decode, Encode, Nat};

use delay::{Delay, Waiter};
use ic_agent::Agent;
use std::path::Path;
use ic_types::Principal;
use serde::Deserialize;
use std::path::PathBuf;
use std::time::Duration;
use walkdir::WalkDir;

const CREATE_BATCH: &str = "create_batch";
const CREATE_CHUNK: &str = "create_chunk";
const COMMIT_BATCH: &str = "commit_batch";
const MAX_CHUNK_SIZE: usize = 1_900_000;

#[derive(CandidType, Debug)]
struct CreateBatchRequest {}

#[derive(CandidType, Debug, Deserialize)]
struct CreateBatchResponse {
batch_id: Nat,
}

#[derive(CandidType, Debug, Deserialize)]
struct CreateChunkRequest<'a> {
batch_id: Nat,
#[serde(with = "serde_bytes")]
content: &'a [u8],
}

#[derive(CandidType, Debug, Deserialize)]
struct CreateChunkResponse {
chunk_id: Nat,
}

#[derive(CandidType, Debug)]
struct GetRequest {
key: String,
accept_encodings: Vec<String>,
}

#[derive(CandidType, Debug)]
struct GetResponse {
contents: Vec<u8>,
content_type: String,
content_encoding: String,
}

#[derive(CandidType, Debug)]
struct CreateAssetArguments {
key: String,
content_type: String,
}
#[derive(CandidType, Debug)]
struct SetAssetContentArguments {
key: String,
content_encoding: String,
chunk_ids: Vec<Nat>,
}
#[derive(CandidType, Debug)]
struct UnsetAssetContentArguments {
key: String,
content_encoding: String,
}
#[derive(CandidType, Debug)]
struct DeleteAssetArguments {
key: String,
}
#[derive(CandidType, Debug)]
struct ClearArguments {}

#[derive(CandidType, Debug)]
enum BatchOperationKind {
CreateAsset(CreateAssetArguments),

SetAssetContent(SetAssetContentArguments),

_UnsetAssetContent(UnsetAssetContentArguments),

DeleteAsset(DeleteAssetArguments),

_Clear(ClearArguments),
}

#[derive(CandidType, Debug)]
struct CommitBatchArguments<'a> {
batch_id: &'a Nat,
operations: Vec<BatchOperationKind>,
}

#[derive(Clone, Debug)]
struct AssetLocation {
source: PathBuf,
relative: PathBuf,
}

struct ChunkedAsset {
asset_location: AssetLocation,
chunk_ids: Vec<Nat>,
}

async fn create_chunk(
agent: &Agent,
canister_id: &Principal,
timeout: Duration,
batch_id: &Nat,
content: &[u8],
) -> DfxResult<Nat> {
let batch_id = batch_id.clone();
let args = CreateChunkRequest { batch_id, content };
let args = candid::Encode!(&args)?;

let mut waiter = Delay::builder()
.timeout(std::time::Duration::from_secs(30))
.throttle(std::time::Duration::from_secs(1))
.build();
waiter.start();

loop {
match agent
.update(&canister_id, CREATE_CHUNK)
.with_arg(&args)
.expire_after(timeout)
.call_and_wait(waiter_with_timeout(timeout))
.await
.map_err(DfxError::from)
.and_then(|response| {
candid::Decode!(&response, CreateChunkResponse)
.map_err(DfxError::from)
.map(|x| x.chunk_id)
}) {
Ok(chunk_id) => {
break Ok(chunk_id);
}
Err(agent_err) => match waiter.wait() {
Ok(()) => {}
Err(_) => break Err(agent_err),
},
}
}
}

async fn make_chunked_asset(
agent: &Agent,
canister_id: &Principal,
timeout: Duration,
batch_id: &Nat,
asset_location: AssetLocation,
) -> DfxResult<ChunkedAsset> {
let content = &std::fs::read(&asset_location.source)?;

// ?? doesn't work: rust lifetimes + task::spawn = tears
// how to deal with lifetimes for agent and canister_id here
// this function won't exit until after the task is joined...
// let chunks_future_tasks: Vec<_> = content
// .chunks(MAX_CHUNK_SIZE)
// .map(|content| task::spawn(create_chunk(agent, canister_id, timeout, batch_id, content)))
// .collect();
// println!("await chunk creation");
// let but_lifetimes = try_join_all(chunks_future_tasks)
// .await?
// .into_iter()
// .collect::<DfxResult<Vec<u128>>>()
// .map(|chunk_ids| ChunkedAsset {
// asset_location,
// chunk_ids,
// });
// ?? doesn't work

// works (sometimes), does more work concurrently, but often doesn't work against bootstrap.
// (connection stuck in odd idle state: all agent requests return "channel closed" error.)
// let chunks_futures: Vec<_> = content
// .chunks(MAX_CHUNK_SIZE)
// .map(|content| create_chunk(agent, canister_id, timeout, batch_id, content))
// .collect();
// println!("await chunk creation");
//
// try_join_all(chunks_futures)
// .await
// .map(|chunk_ids| ChunkedAsset {
// asset_location,
// chunk_ids,
// })
// works (sometimes)

let mut chunk_ids: Vec<Nat> = vec![];
let chunks = content.chunks(MAX_CHUNK_SIZE);
let (num_chunks, _) = chunks.size_hint();
for (i, data_chunk) in chunks.enumerate() {
println!(
" {} {}/{} ({} bytes)",
&asset_location.relative.to_string_lossy(),
i + 1,
num_chunks,
data_chunk.len()
);
chunk_ids.push(create_chunk(agent, canister_id, timeout, batch_id, data_chunk).await?);
}
Ok(ChunkedAsset {
asset_location,
chunk_ids,
})
}

async fn make_chunked_assets(
agent: &Agent,
canister_id: &Principal,
timeout: Duration,
batch_id: &Nat,
locs: Vec<AssetLocation>,
) -> DfxResult<Vec<ChunkedAsset>> {
// this neat futures version works faster in parallel when it works,
// but does not work often when connecting through the bootstrap.
// let futs: Vec<_> = locs
// .into_iter()
// .map(|loc| make_chunked_asset(agent, canister_id, timeout, batch_id, loc))
// .collect();
// try_join_all(futs).await
let mut chunked_assets = vec![];
for loc in locs {
chunked_assets.push(make_chunked_asset(agent, canister_id, timeout, batch_id, loc).await?);
}
Ok(chunked_assets)
}

async fn commit_batch(
agent: &Agent,
canister_id: &Principal,
timeout: Duration,
batch_id: &Nat,
chunked_assets: Vec<ChunkedAsset>,
) -> DfxResult {
let operations: Vec<_> = chunked_assets
.into_iter()
.map(|chunked_asset| {
let key = chunked_asset
.asset_location
.relative
.to_string_lossy()
.to_string();
vec![
BatchOperationKind::DeleteAsset(DeleteAssetArguments { key: key.clone() }),
BatchOperationKind::CreateAsset(CreateAssetArguments {
key: key.clone(),
content_type: "application/octet-stream".to_string(),
}),
BatchOperationKind::SetAssetContent(SetAssetContentArguments {
key,
content_encoding: "identity".to_string(),
chunk_ids: chunked_asset.chunk_ids,
}),
]
})
.flatten()
.collect();
let arg = CommitBatchArguments {
batch_id,
operations,
};
let arg = candid::Encode!(&arg)?;
agent
.update(&canister_id, COMMIT_BATCH)
.with_arg(arg)
.expire_after(timeout)
.call_and_wait(waiter_with_timeout(timeout))
.await?;
Ok(())
}

pub async fn post_install_store_assets(
info: &CanisterInfo,
agent: &Agent,
@@ -17,28 +279,40 @@ pub async fn post_install_store_assets(
let assets_canister_info = info.as_info::<AssetsCanisterInfo>()?;
let output_assets_path = assets_canister_info.get_output_assets_path();

let walker = WalkDir::new(output_assets_path).into_iter();
for entry in walker {
let entry = entry?;
if entry.file_type().is_file() {
let source = entry.path();
let relative: &Path = source
.strip_prefix(output_assets_path)
.expect("cannot strip prefix");
let content = &std::fs::read(&source)?;
let path = relative.to_string_lossy().to_string();
let blob = candid::Encode!(&path, &content)?;

let canister_id = info.get_canister_id().expect("Could not find canister ID.");
let method_name = String::from("store");

agent
.update(&canister_id, &method_name)
.with_arg(&blob)
.expire_after(timeout)
.call_and_wait(waiter_with_timeout(timeout))
.await?;
}
}
let asset_locations: Vec<AssetLocation> = WalkDir::new(output_assets_path)
.into_iter()
.filter_map(|r| {
r.ok().filter(|entry| entry.file_type().is_file()).map(|e| {
let source = e.path().to_path_buf();
let relative = source
.strip_prefix(output_assets_path)
.expect("cannot strip prefix")
.to_path_buf();
AssetLocation { source, relative }
})
})
.collect();

let canister_id = info.get_canister_id().expect("Could not find canister ID.");

let batch_id = create_batch(agent, &canister_id, timeout).await?;

let chunked_assets =
make_chunked_assets(agent, &canister_id, timeout, &batch_id, asset_locations).await?;

commit_batch(agent, &canister_id, timeout, &batch_id, chunked_assets).await?;

Ok(())
}

async fn create_batch(agent: &Agent, canister_id: &Principal, timeout: Duration) -> DfxResult<Nat> {
let create_batch_args = CreateBatchRequest {};
let response = agent
.update(&canister_id, CREATE_BATCH)
.with_arg(candid::Encode!(&create_batch_args)?)
.expire_after(timeout)
.call_and_wait(waiter_with_timeout(timeout))
.await?;
let create_batch_response = candid::Decode!(&response, CreateBatchResponse)?;
Ok(create_batch_response.batch_id)
}
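Taken together, the new upload path in `post_install_store_assets` is: `create_batch`, then `create_chunk` once per slice of each asset, then a single `commit_batch` that atomically applies the changes. Each chunked asset expands into three operations — delete any previous asset under that key, recreate it, then point it at the uploaded chunk ids. A simplified sketch of that expansion (assumed standalone types, not the real dfx/candid definitions):

```rust
// Simplified stand-ins for the CandidType structs in assets.rs.
#[derive(Debug, PartialEq)]
enum Op {
    DeleteAsset { key: String },
    CreateAsset { key: String, content_type: String },
    SetAssetContent { key: String, content_encoding: String, chunk_ids: Vec<u64> },
}

// Mirrors the mapping inside commit_batch: one asset -> three operations.
fn operations_for(key: &str, chunk_ids: Vec<u64>) -> Vec<Op> {
    vec![
        Op::DeleteAsset { key: key.to_string() },
        Op::CreateAsset {
            key: key.to_string(),
            content_type: "application/octet-stream".to_string(),
        },
        Op::SetAssetContent {
            key: key.to_string(),
            content_encoding: "identity".to_string(),
            chunk_ids,
        },
    ]
}

fn main() {
    let ops = operations_for("large-asset.bin", vec![1, 2, 3]);
    // Delete-then-create makes the commit idempotent for re-deploys.
    assert_eq!(ops.len(), 3);
    println!("{:?}", ops);
}
```

Bundling the operations into one `commit_batch` update call means a partially failed upload leaves no half-written asset visible: chunks accumulate under a batch id and only become an asset when the batch commits.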