Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Use new blockstore that stores Multihashes and not CIDs. #5510

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion blocks/blockstoreutil/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []cid.Cid, opts RmBl
stillOkay := FilterPinned(pins, out, cids)

for _, c := range stillOkay {
err := blocks.DeleteBlock(c)
err := blocks.Delete(c.Hash())
if err != nil && opts.Force && (err == bs.ErrNotFound || err == ds.ErrNotFound) {
// ignore non-existent blocks
} else if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
filestore "github.com/ipfs/go-ipfs/filestore"
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cidv0v1 "github.com/ipfs/go-ipfs/thirdparty/cidv0v1"
"github.com/ipfs/go-ipfs/thirdparty/verifbs"
uio "gx/ipfs/QmPL8bYtbACcSFFiSr4s2du7Na382NxRADR8hC7D9FkEA2/go-unixfs/io"
resolver "gx/ipfs/QmX7uSbkNz76yNwBhuwYwRbhihLnJqM73VTCjS3UMJud9A/go-path/resolver"
Expand Down Expand Up @@ -215,8 +214,6 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

bs = bstore.NewIdStore(bs)

bs = cidv0v1.NewBlockstore(bs)

n.BaseBlocks = bs
n.GCLocker = bstore.NewGCLocker()
n.Blockstore = bstore.NewGCBlockstore(bs, n.GCLocker)
Expand Down
8 changes: 4 additions & 4 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ var findProvidersDhtCmd = &cmds.Command{
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
pchan := n.Routing.FindProvidersAsync(ctx, c.Hash(), numProviders)
go func() {
defer close(outChan)
for e := range events {
Expand Down Expand Up @@ -291,7 +291,7 @@ var provideRefDhtCmd = &cmds.Command{
return
}

has, err := n.Blockstore.Has(c)
has, err := n.Blockstore.Has(c.Hash())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -370,7 +370,7 @@ var provideRefDhtCmd = &cmds.Command{

func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
err := r.Provide(ctx, c.Hash(), true)
if err != nil {
return err
}
Expand All @@ -393,7 +393,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGSe
continue
}

err = r.Provide(ctx, k, true)
err = r.Provide(ctx, k.Hash(), true)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ var dupsFileStore = &oldCmds.Command{

go func() {
defer close(out)
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
for h := range ch {
have, err := fs.MainBlockstore().Has(h)
if err != nil {
select {
case out <- &RefWrapper{Err: err.Error()}:
Expand All @@ -213,8 +213,9 @@ var dupsFileStore = &oldCmds.Command{
return
}
if have {
c := cid.NewCidV1(cid.Raw, h)
select {
case out <- &RefWrapper{Ref: cid.String()}:
case out <- &RefWrapper{Ref: c.String()}:
case <-req.Context().Done():
return
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
provs := n.Routing.FindProvidersAsync(ctx, cid.Hash(), 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
Expand Down
14 changes: 7 additions & 7 deletions core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
cmds "gx/ipfs/QmPXR4tNdLbp8HsZiPMjpsgqphX9Vhw2J6Jh5MKH2ovW3D/go-ipfs-cmds"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
config "gx/ipfs/QmYVqYJTVjetcf1guieEgWpK1PZtHPytP624vKzTF1P3r2/go-ipfs-config"
bstore "gx/ipfs/QmegPGspn3RpTMQ23Fd3GVVMopo1zsEMurudbFMZ5UXBLH/go-ipfs-blockstore"
Expand Down Expand Up @@ -47,7 +47,7 @@ var RepoCmd = &cmds.Command{

// GcResult is the result returned by "repo gc" command.
type GcResult struct {
Key cid.Cid
Key string
Error string `json:",omitempty"`
}

Expand Down Expand Up @@ -81,15 +81,15 @@ order to reclaim hard disk space.
re.Emit(&GcResult{Error: res.Error.Error()})
errs = true
} else {
re.Emit(&GcResult{Key: res.KeyRemoved})
re.Emit(&GcResult{Key: res.KeyRemoved.String()})
}
}
if errs {
return errors.New("encountered errors during gc run")
}
} else {
err := corerepo.CollectResult(req.Context, gcOutChan, func(k cid.Cid) {
re.Emit(&GcResult{Key: k})
err := corerepo.CollectResult(req.Context, gcOutChan, func(k mh.Multihash) {
re.Emit(&GcResult{Key: k.String()})
})
if err != nil {
return err
Expand Down Expand Up @@ -289,11 +289,11 @@ var repoVerifyCmd = &oldcmds.Command{
var fails int
var i int
for k := range keys {
_, err := bs.Get(k)
_, err := bs.Get(bstore.CidFromMultihash(k))
if err != nil {
select {
case out <- &VerifyProgress{
Msg: fmt.Sprintf("block %s was corrupt (%s)", k, err),
Msg: fmt.Sprintf("block %s was corrupt (%s)", k.String(), err),
}:
case <-req.Context().Done():
return
Expand Down
12 changes: 7 additions & 5 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
cidutil "gx/ipfs/QmQJSeE3CX4zos9qeaG8EhecEK9zvrTEfTG84J8C5NVRwt/go-cidutil"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
Expand Down Expand Up @@ -46,7 +47,7 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...
return nil, fmt.Errorf("number of providers must be greater than 0")
}

pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid().Hash(), numProviders)
return pchan, nil
}

Expand All @@ -66,8 +67,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
}

c := rp.Cid()
h := c.Hash()

has, err := api.node.Blockstore.Has(c)
has, err := api.node.Blockstore.Has(h)
if err != nil {
return err
}
Expand All @@ -79,7 +81,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.node.Routing, []cid.Cid{c})
err = provideKeys(ctx, api.node.Routing, []mh.Multihash{h})
}
if err != nil {
return err
Expand All @@ -88,7 +90,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
return nil
}

func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []cid.Cid) error {
func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []mh.Multihash) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
Expand All @@ -115,7 +117,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Bl
for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
err := r.Provide(ctx, k.Hash(), true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestPinRecursive(t *testing.T) {
t.Errorf("unexpected verify result count: %d", n)
}

err = nd.Blockstore.DeleteBlock(p0.Cid())
err = nd.Blockstore.Delete(p0.Cid().Hash())
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
mfs "gx/ipfs/QmRkrpnhZqDxTxwGCsDbuZMr7uCFZHH6SGfrcjgEQwxF3t/go-mfs"
)
Expand Down Expand Up @@ -94,7 +95,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
// CollectResult collects the output of a garbage collection run and calls the
// given callback for each object removed. It also collects all errors into a
// MultiError which is returned after the gc is completed.
func CollectResult(ctx context.Context, gcOut <-chan gc.Result, cb func(cid.Cid)) error {
func CollectResult(ctx context.Context, gcOut <-chan gc.Result, cb func(mh.Multihash)) error {
var errors []error
loop:
for {
Expand All @@ -105,7 +106,7 @@ loop:
}
if res.Error != nil {
errors = append(errors, res.Error)
} else if res.KeyRemoved.Defined() && cb != nil {
} else if res.KeyRemoved != nil && cb != nil {
cb(res.KeyRemoved)
}
case <-ctx.Done():
Expand Down
16 changes: 10 additions & 6 deletions exchange/reprovide/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,40 @@ import (

pin "github.com/ipfs/go-ipfs/pin"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
cidutil "gx/ipfs/QmQJSeE3CX4zos9qeaG8EhecEK9zvrTEfTG84J8C5NVRwt/go-cidutil"
merkledag "gx/ipfs/QmXv5mwmQ74r4aiHcNeQ4GAmfB3aWJuqaE4WyDfDfvkgLM/go-merkledag"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
blocks "gx/ipfs/QmegPGspn3RpTMQ23Fd3GVVMopo1zsEMurudbFMZ5UXBLH/go-ipfs-blockstore"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
)

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
return bstore.AllKeysChan(ctx)
return func(ctx context.Context) (<-chan mh.Multihash, error) {
ch, err := bstore.AllKeysChan(ctx)
if err != nil {
return nil, err
}
return ch, nil
}
}

// NewPinnedProvider returns provider supplying pinned keys
func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
return func(ctx context.Context) (<-chan mh.Multihash, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}

outCh := make(chan cid.Cid)
outCh := make(chan mh.Multihash)
go func() {
defer close(outCh)
for c := range set.New {
select {
case <-ctx.Done():
return
case outCh <- c:
case outCh <- c.Hash():
}
}

Expand Down
14 changes: 7 additions & 7 deletions exchange/reprovide/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import (
"time"

backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
"gx/ipfs/QmVkMRSkXrpjqrroEXWuYBvDBnXCdMMY6gsKicBGVGUqKT/go-verifcid"
//"gx/ipfs/QmVkMRSkXrpjqrroEXWuYBvDBnXCdMMY6gsKicBGVGUqKT/go-verifcid"
routing "gx/ipfs/QmdKS5YtmuSWKuLLgbHG176mS3VX3AKiyVmaaiAfvgcuch/go-libp2p-routing"
)

var log = logging.Logger("reprovider")

//KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type KeyChanFunc func(context.Context) (<-chan mh.Multihash, error)
type doneFunc func(error)

type Reprovider struct {
Expand Down Expand Up @@ -85,10 +85,10 @@ func (rp *Reprovider) Reprovide() error {
}
for c := range keychan {
// hash security
if err := verifcid.ValidateCid(c); err != nil {
log.Errorf("insecure hash in reprovider, %s (%s)", c, err)
continue
}
//if err := verifcid.ValidateCid(c); err != nil {
// log.Errorf("insecure hash in reprovider, %s (%s)", c, err)
// continue
//}
op := func() error {
err := rp.rsys.Provide(rp.ctx, c, true)
if err != nil {
Expand Down
19 changes: 10 additions & 9 deletions filestore/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

posinfo "gx/ipfs/QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H/go-ipfs-posinfo"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
dsq "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore/query"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewFilestore(bs blockstore.Blockstore, fm *FileManager) *Filestore {

// AllKeysChan returns a channel from which to read the keys stored in
// the blockstore. If the given context is cancelled the channel will be closed.
func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan mh.Multihash, error) {
ctx, cancel := context.WithCancel(ctx)

a, err := f.bs.AllKeysChan(ctx)
Expand All @@ -58,7 +59,7 @@ func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}

out := make(chan cid.Cid, dsq.KeysOnlyBufSize)
out := make(chan mh.Multihash, dsq.KeysOnlyBufSize)
go func() {
defer cancel()
defer close(out)
Expand Down Expand Up @@ -115,13 +116,13 @@ func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// blockstore. As expected, in the case of FileManager blocks, only the
// reference is deleted, not its contents. It may return
// ErrNotFound when the block is not stored.
func (f *Filestore) DeleteBlock(c cid.Cid) error {
err1 := f.bs.DeleteBlock(c)
func (f *Filestore) Delete(c mh.Multihash) error {
err1 := f.bs.Delete(c)
if err1 != nil && err1 != blockstore.ErrNotFound {
return err1
}

err2 := f.fm.DeleteBlock(c)
err2 := f.fm.Delete(c)
// if we successfully removed something from the blockstore, but the
// filestore didnt have it, return success

Expand Down Expand Up @@ -154,7 +155,7 @@ func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) {

// GetSize returns the size of the requested block. It may return ErrNotFound
// when the block is not stored.
func (f *Filestore) GetSize(c cid.Cid) (int, error) {
func (f *Filestore) GetSize(c mh.Multihash) (int, error) {
size, err := f.bs.GetSize(c)
switch err {
case nil:
Expand All @@ -168,7 +169,7 @@ func (f *Filestore) GetSize(c cid.Cid) (int, error) {

// Has returns true if the block with the given Cid is
// stored in the Filestore.
func (f *Filestore) Has(c cid.Cid) (bool, error) {
func (f *Filestore) Has(c mh.Multihash) (bool, error) {
has, err := f.bs.Has(c)
if err != nil {
return false, err
Expand All @@ -186,7 +187,7 @@ func (f *Filestore) Has(c cid.Cid) (bool, error) {
// delegated to the FileManager, while the rest of blocks
// are handled by the regular blockstore.
func (f *Filestore) Put(b blocks.Block) error {
has, err := f.Has(b.Cid())
has, err := f.Has(b.Cid().Hash())
if err != nil {
return err
}
Expand All @@ -210,7 +211,7 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
var fstores []*posinfo.FilestoreNode

for _, b := range bs {
has, err := f.Has(b.Cid())
has, err := f.Has(b.Cid().Hash())
if err != nil {
return err
}
Expand Down
Loading