Skip to content

Commit

Permalink
Add migrations from temp IPFS node after migration
Browse files Browse the repository at this point in the history
- When migrations are downloaded via IPFS, add the migration archives to IPFS by connecting to the temporary migration node and getching the migrations.
- When migrations are downloaded via HTTP, add the migration archive files directly to IPFS.
  • Loading branch information
gammazero committed Apr 14, 2021
1 parent d07d0c9 commit 3be7d38
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 98 deletions.
54 changes: 34 additions & 20 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
_ "expvar"
"fmt"
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -274,8 +275,8 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
}
}

var migrationDL migrations.Downloads
var keepMigrations, pinMigrations bool
var fetcher migrations.Fetcher

// acquire the repo lock _before_ constructing a node. we need to make
// sure we are permitted to access the resources (datastore, etc.)
Expand All @@ -298,10 +299,11 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
}

switch keep, _ := req.Options[migrateKeepKwd].(string); keep {
case "", "keep":
keepMigrations = true
case "pin":
pinMigrations = true
fallthrough
case "", "keep":
keepMigrations = true
case "discard":
default:
return errors.New("unrecognized value for %s, must be 'keep', 'pin', or 'discard'")
Expand All @@ -312,30 +314,43 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment

// Get migration fetcher(s) according to download policy
policy, _ := req.Options[migrateDownloadKwd].(string)
fetcher, err := getMigrationFetcher(policy, peers)
fetcher, err = getMigrationFetcher(policy, peers)
if err != nil {
return err
}

if keepMigrations || pinMigrations {
migrationDL, err = migrations.RunMigrationDL(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
defer migrationDL.Cleanup()
} else {
err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
if keepMigrations {
// Create temp directory to store downloaded migration archives
migrations.DownloadDirectory, err = ioutil.TempDir("", "migrations")
if err != nil {
return err
}
defer func() {
if migrations.DownloadDirectory != "" {
os.RemoveAll(migrations.DownloadDirectory)
}
}()
}
closeErr := fetcher.Close()

err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
if err != nil {
fmt.Println("The migrations of fs-repo failed:")
fmt.Printf(" %s\n", err)
fmt.Println("If you think this is a bug, please file an issue and include this whole log output.")
fmt.Println(" https://github.com/ipfs/fs-repo-migrations")
fetcher.Close()
return err
}

// If there was an error closing the IpfsFetcher, then write error, but
// do not fail because of it.
if closeErr != nil {
log.Errorf("error closing IPFS fetcher: %s", closeErr)
if keepMigrations {
defer fetcher.Close()
} else {
// If there is an error closing the IpfsFetcher, then print error, but
// do not fail because of it.
err = fetcher.Close()
if err != nil {
log.Errorf("error closing IPFS fetcher: %s", err)
}
}

repo, err = fsrepo.Open(cctx.ConfigRoot)
Expand Down Expand Up @@ -460,12 +475,11 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
}

// Add any files downloaded by migration.
if len(migrationDL.Files) != 0 {
err = addMigrationBins(cctx.Context(), node, migrationDL.Files, pinMigrations)
if err != nil {
return err
}
migrationDL.Cleanup()
if keepMigrations {
addMigrations(cctx.Context(), node, fetcher, pinMigrations)
fetcher.Close()
os.RemoveAll(migrations.DownloadDirectory)
migrations.DownloadDirectory = ""
}

// construct http gateway
Expand Down
105 changes: 100 additions & 5 deletions cmd/ipfs/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsfetcher"
"github.com/ipfs/interface-go-ipfs-core/options"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -89,23 +91,65 @@ func getMigrationFetcher(downloadPolicy string, peers string) (migrations.Fetche
return migrations.NewMultiFetcher(fetchers...), nil
}

// addMigrationBins adds the files at binPaths to IPFS, pinning them also if
// pin is true.
func addMigrationBins(ctx context.Context, node *core.IpfsNode, binPaths []string, pin bool) error {
func addMigrations(ctx context.Context, node *core.IpfsNode, fetcher migrations.Fetcher, pin bool) error {
if mf, ok := fetcher.(*migrations.MultiFetcher); ok {
fetcher = mf.LastUsed()
}

switch f := fetcher.(type) {
case *ipfsfetcher.IpfsFetcher:
// Add migrations by connecting to temp node and getting from IPFS
err := addMigrationPaths(ctx, node, f.AddrInfo(), f.FetchedPaths(), pin)
if err != nil {
return err
}
case *migrations.HttpFetcher:
// Add the downloaded migration files directly
if migrations.DownloadDirectory != "" {
var paths []string
err := filepath.Walk(migrations.DownloadDirectory, func(filePath string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
paths = append(paths, filePath)
return nil
})
if err != nil {
return err
}
err = addMigrationFiles(ctx, node, paths, pin)
if err != nil {
return err
}
}
default:
return errors.New("Cannot get migrations from unknown fetcher type")
}

return nil
}

// addMigrationFiles adds the files at paths to IPFS, optionally pinning them
func addMigrationFiles(ctx context.Context, node *core.IpfsNode, paths []string, pin bool) error {
ifaceCore, err := coreapi.NewCoreAPI(node)
if err != nil {
return err
}
ufs := ifaceCore.Unixfs()

// Add migration files
for _, filePath := range binPaths {
for _, filePath := range paths {
f, err := os.Open(filePath)
if err != nil {
return err
}

ipfsPath, err := ufs.Add(ctx, files.NewReaderFile(f), options.Unixfs.Pin(pin))
fi, err := f.Stat()
if err != nil {
return err
}

ipfsPath, err := ufs.Add(ctx, files.NewReaderStatFile(f, fi), options.Unixfs.Pin(pin))
if err != nil {
return err
}
Expand All @@ -115,6 +159,57 @@ func addMigrationBins(ctx context.Context, node *core.IpfsNode, binPaths []strin
return nil
}

// addMigrationPaths adds the files at paths to IPFS, optionally pinning
// them. This is done after connecting to the peer.
func addMigrationPaths(ctx context.Context, node *core.IpfsNode, peerInfo peer.AddrInfo, paths []ipath.Path, pin bool) error {
if len(paths) == 0 {
return errors.New("nothing downloaded by ipfs fetcher")
}

ipfs, err := coreapi.NewCoreAPI(node)
if err != nil {
return err
}

// Connect to temp node
if err := ipfs.Swarm().Connect(ctx, peerInfo); err != nil {
return fmt.Errorf("cound not connec to migration peer %q: %s", peerInfo.ID, err)
}
fmt.Printf("conneced to migration peer %q\n", peerInfo)

if pin {
pinApi := ipfs.Pin()
for _, ipfsPath := range paths {
err := pinApi.Add(ctx, ipfsPath)
if err != nil {
return err
}
fmt.Printf("Added and pinned migration file: %q\n", ipfsPath)
}
return nil
}

ufs := ipfs.Unixfs()

// Add migration files
for _, ipfsPath := range paths {
nd, err := ufs.Get(ctx, ipfsPath)
if err != nil {
return err
}

fnd, ok := nd.(files.File)
if !ok {
return fmt.Errorf("not a file node: %q", ipfsPath)
}
io.Copy(io.Discard, fnd)
nd.Close()
fmt.Printf("Added migration file: %q\n", ipfsPath)
}

return nil
}

func parsePeers(migrationPeers string) ([]peer.AddrInfo, error) {
var peers []string
for _, p := range strings.Split(migrationPeers, ",") {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const peersStr = "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWGC6TvWhfajpgX6wvJHMYvKpDMX
func TestGetMigrationFetcher(t *testing.T) {
var f migrations.Fetcher
var err error
f, err = getMigrationFetcher("ftp://bad.gateway.io", "")
_, err = getMigrationFetcher("ftp://bad.gateway.io", "")
if err == nil {
t.Fatal("Expected bad URL scheme error")
}
Expand Down
31 changes: 26 additions & 5 deletions repo/fsrepo/migrations/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import (
"strings"
)

// DownloadDirectory can be set as the location for FetchBinary to save the
// downloaded archive file in. If not set, then FetchBinary saves the archive
// in a temporary directory that is removed after the contents of the archive
// is extracted.
var DownloadDirectory string

// FetchBinary downloads an archive from the distribution site and unpacks it.
//
// The base name of the binary inside the archive may differ from the base
Expand Down Expand Up @@ -68,12 +74,27 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
}

// Create temp directory to store download
tmpDir, err := ioutil.TempDir("", arcName)
if err != nil {
return "", err
tmpDir := DownloadDirectory
if tmpDir != "" {
fi, err = os.Stat(tmpDir)
if err != nil {
return "", err
}
if !fi.IsDir() {
return "", &os.PathError{
Op: "FetchBinary",
Path: tmpDir,
Err: os.ErrExist,
}
}
} else {
// Create temp directory to store download
tmpDir, err = ioutil.TempDir("", arcName)
if err != nil {
return "", err
}
defer os.RemoveAll(tmpDir)
}
defer os.RemoveAll(tmpDir)

atype := "tar.gz"
if runtime.GOOS == "windows" {
Expand Down
4 changes: 4 additions & 0 deletions repo/fsrepo/migrations/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (f *MultiFetcher) Close() error {
return errs
}

func (f *MultiFetcher) LastUsed() Fetcher {
return f.fetchers[0]
}

func (f *MultiFetcher) Len() int {
return len(f.fetchers)
}
Expand Down
Loading

0 comments on commit 3be7d38

Please sign in to comment.