Skip to content

Commit

Permalink
Merge pull request #10 from obsidiansystems/ipfs-binary-cache-with-up…
Browse files Browse the repository at this point in the history
…load

Update ipfs binary cache with upload support
  • Loading branch information
Ericson2314 authored Jun 11, 2020
2 parents 4b24d04 + e899575 commit efd8790
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 111 deletions.
18 changes: 14 additions & 4 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,22 @@ struct curlFileTransfer : public FileTransfer

if (request.head)
curl_easy_setopt(req, CURLOPT_NOBODY, 1);
else if (request.post)
curl_easy_setopt(req, CURLOPT_POST, 1);

if (request.data) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
if (request.post) {
// based off of https://curl.haxx.se/libcurl/c/postit2.html
curl_mime *form = curl_mime_init(req);
curl_mimepart *field = curl_mime_addpart(form);
curl_mime_data(field, request.data->data(), request.data->length());
curl_easy_setopt(req, CURLOPT_MIMEPOST, form);
} else {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper);
curl_easy_setopt(req, CURLOPT_READDATA, this);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
}
}

if (request.verifyTLS) {
Expand Down
1 change: 1 addition & 0 deletions src/libstore/filetransfer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct FileTransferRequest
std::string expectedETag;
bool verifyTLS = true;
bool head = false;
bool post = false;
size_t tries = fileTransferSettings.tries;
unsigned int baseRetryTimeMs = 250;
ActivityId parentAct;
Expand Down
197 changes: 122 additions & 75 deletions src/libstore/ipfs-binary-cache-store.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <cstring>
#include <nlohmann/json.hpp>

#include "binary-cache-store.hh"
#include "filetransfer.hh"
#include "nar-info-disk-cache.hh"
#include "ipfs.hh"

namespace nix {

Expand All @@ -15,48 +15,66 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
private:

std::string cacheUri;
std::string daemonUri;

/* Host where a IPFS API can be reached (usually localhost) */
std::string ipfsAPIHost;
/* Port where a IPFS API can be reached (usually 5001) */
uint16_t ipfsAPIPort;
/* Whether to use a IPFS Gateway instead of the API */
bool useIpfsGateway;
/* Where to find a IPFS Gateway */
std::string ipfsGatewayURL;

std::string constructIPFSRequest(const std::string & path) {
std::string uri;
std::string ipfsPath = cacheUri + "/" + path;
if (useIpfsGateway == false) {
uri = ipfs::buildAPIURL(ipfsAPIHost, ipfsAPIPort) +
"/cat" +
ipfs::buildQuery({{"arg", ipfsPath}});
} else {
uri = ipfsGatewayURL + ipfsPath;
}
return uri;
std::string getIpfsPath() {
auto state(_state.lock());
return state->ipfsPath;
}
std::optional<string> optIpnsPath;

struct State
{
std::string ipfsPath;
};
Sync<State> _state;

public:

IPFSBinaryCacheStore(
const Params & params, const Path & _cacheUri)
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
, ipfsAPIHost(get(params, "host").value_or("127.0.0.1"))
, ipfsAPIPort(std::stoi(get(params, "port").value_or("5001")))
, useIpfsGateway(get(params, "use_gateway").value_or("0") == "1")
, ipfsGatewayURL(get(params, "gateway").value_or("https://ipfs.io"))
{
auto state(_state.lock());

if (cacheUri.back() == '/')
cacheUri.pop_back();
/*
* A cache is still useful since the IPFS API or
* gateway may have a higher latency when not running on
* localhost
*/
diskCache = getNarInfoDiskCache();

if (hasPrefix(cacheUri, "ipfs://"))
state->ipfsPath = "/ipfs/" + std::string(cacheUri, 7);
else if (hasPrefix(cacheUri, "ipns://"))
optIpnsPath = "/ipns/" + std::string(cacheUri, 7);
else
throw Error("unknown IPNS URI '%s'", cacheUri);

std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1"));
std::string ipfsAPIPort(get(params, "port").value_or("5001"));
daemonUri = "http://" + ipfsAPIHost + ":" + ipfsAPIPort;

// Check the IPFS daemon is running
FileTransferRequest request(daemonUri + "/api/v0/version");
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto versionInfo = nlohmann::json::parse(*res.data);
if (versionInfo.find("Version") == versionInfo.end())
throw Error("daemon for IPFS is not running properly");

// Resolve the IPNS name to an IPFS object
if (optIpnsPath) {
auto ipnsPath = *optIpnsPath;
debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath);
auto uri = daemonUri + "/api/v0/name/resolve?offline=true&arg=" + getFileTransfer()->urlEncode(ipnsPath);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);
if (json.find("Path") == json.end())
throw Error("daemon for IPFS is not running properly");
state->ipfsPath = json["Path"];
}
}

std::string getUri() override
Expand All @@ -66,61 +84,96 @@ class IPFSBinaryCacheStore : public BinaryCacheStore

void init() override
{
if (auto cacheInfo = diskCache->cacheExists(getUri())) {
wantMassQuery.setDefault(cacheInfo->wantMassQuery ? "true" : "false");
priority.setDefault(fmt("%d", cacheInfo->priority));
} else {
try {
BinaryCacheStore::init();
} catch (UploadToIPFS &) {
throw Error(format("‘%s’ does not appear to be a binary cache") % cacheUri);
}
diskCache->createCache(cacheUri, storeDir, wantMassQuery, priority);
}
std::string cacheInfoFile = "nix-cache-info";
if (!fileExists(cacheInfoFile))
upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info");
BinaryCacheStore::init();
}

protected:

bool fileExists(const std::string & path) override
{
/*
* TODO: Try a local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

/* TODO: perform ipfs ls instead instead of trying to fetch it */
auto uri = constructIPFSRequest(path);
FileTransferRequest request(uri);
request.post = true;
request.tries = 1;
try {
FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 5;
if (useIpfsGateway)
request.head = true;
getFileTransfer()->download(request);
return true;
auto res = getFileTransfer()->download(request);
auto json = nlohmann::json::parse(*res.data);

return json.find("Hash") != json.end();
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound)
return false;
throw;
// probably should verify this is a not found error but
// ipfs gives us a 500
return false;
}
}

// IPNS publish can be slow, we try to do it rarely.
void sync() override
{
if (!optIpnsPath)
return;
auto ipnsPath = *optIpnsPath;

auto state(_state.lock());

debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, ipnsPath);

auto uri = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&key=" + std::string(ipnsPath, 6);

auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
getFileTransfer()->download(req);
}

void addLink(std::string name, std::string ipfsObject)
{
auto state(_state.lock());

auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true";
uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath);
uri += "&arg=" + getFileTransfer()->urlEncode(name);
uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject);

auto req = FileTransferRequest(uri);
req.post = true;
req.tries = 1;
auto res = getFileTransfer()->download(req);
auto json = nlohmann::json::parse(*res.data);

state->ipfsPath = "/ipfs/" + (std::string) json["Hash"];
}

void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override
{
throw UploadToIPFS("uploading to an IPFS binary cache is not supported");
// TODO: use callbacks

auto req = FileTransferRequest(daemonUri + "/api/v0/add");
req.data = std::make_shared<string>(data);
req.post = true;
req.tries = 1;
try {
auto res = getFileTransfer()->upload(req);
auto json = nlohmann::json::parse(*res.data);
addLink(path, "/ipfs/" + (std::string) json["Hash"]);
} catch (FileTransferError & e) {
throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg());
}
}

void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) noexcept override
{
/*
* TODO: Try local mount first, best to share code with
* LocalBinaryCacheStore
*/
auto uri = constructIPFSRequest(path);
auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path);

FileTransferRequest request(uri);
//request.showProgress = FileTransferRequest::no;
request.tries = 8;
request.post = true;
request.tries = 1;

auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));

Expand All @@ -129,9 +182,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore
try {
(*callbackPtr)(result.get().data);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
return (*callbackPtr)(std::shared_ptr<std::string>());
callbackPtr->rethrow();
return (*callbackPtr)(std::shared_ptr<std::string>());
} catch (...) {
callbackPtr->rethrow();
}
Expand All @@ -145,12 +196,8 @@ static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
{
/*
* TODO: maybe use ipfs:/ fs:/ipfs/
* https://github.com/ipfs/go-ipfs/issues/1678#issuecomment-157478515
*/
if (uri.substr(0, strlen("/ipfs/")) != "/ipfs/" &&
uri.substr(0, strlen("/ipns/")) != "/ipns/")
if (uri.substr(0, strlen("ipfs://")) != "ipfs://" &&
uri.substr(0, strlen("ipns://")) != "ipns://")
return 0;
auto store = std::make_shared<IPFSBinaryCacheStore>(params, uri);
store->init();
Expand Down
32 changes: 0 additions & 32 deletions src/libstore/ipfs.hh

This file was deleted.

2 changes: 2 additions & 0 deletions src/libstore/store-api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,8 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
nrDone++;
showProgress();
});

dstStore->sync();
}


Expand Down
4 changes: 4 additions & 0 deletions src/libstore/store-api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ public:
virtual void createUser(const std::string & userName, uid_t userId)
{ }

/* Sync writes to commits written data, usually a no-op. */
virtual void sync()
{ };

protected:

Stats stats;
Expand Down
5 changes: 5 additions & 0 deletions src/nix-store/nix-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ static void opAdd(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i)));

store->sync();
}


Expand All @@ -188,6 +190,8 @@ static void opAddFixed(Strings opFlags, Strings opArgs)

for (auto & i : opArgs)
cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i, recursive, hashAlgo)));

store->sync();
}


Expand Down Expand Up @@ -952,6 +956,7 @@ static void opServe(Strings opFlags, Strings opArgs)
SizedSource sizedSource(in, info.narSize);

store->addToStore(info, sizedSource, NoRepair, NoCheckSigs);
store->sync();

// consume all the data that has been sent before continuing.
sizedSource.drainAll();
Expand Down
1 change: 1 addition & 0 deletions src/nix/add-to-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct CmdAddToStore : MixDryRun, StoreCommand
if (!dryRun) {
auto source = StringSource { *sink.s };
store->addToStore(info, source);
store->sync();
}

logger->stdout("%s", store->printStorePath(info.path));
Expand Down
2 changes: 2 additions & 0 deletions src/nix/make-content-addressable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct CmdMakeContentAddressable : StorePathsCommand, MixJSON

remappings.insert_or_assign(std::move(path), std::move(info.path));
}

store->sync();
}
};

Expand Down

0 comments on commit efd8790

Please sign in to comment.