-
-
Notifications
You must be signed in to change notification settings - Fork 3k
/
migrations.go
302 lines (265 loc) · 7.88 KB
/
migrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package migrations
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/url"
"os"
"os/exec"
"path"
"runtime"
"strings"
"sync"
config "github.com/ipfs/kubo/config"
)
const (
// Migrations subdirectory in distribution. Empty for root (no subdir).
distMigsRoot = ""
distFSRM = "fs-repo-migrations"
)
// RunMigration finds, downloads, and runs the individual migrations needed to
// migrate the repo from its current version to the target version.
func RunMigration(ctx context.Context, fetcher Fetcher, targetVer int, ipfsDir string, allowDowngrade bool) error {
ipfsDir, err := CheckIpfsDir(ipfsDir)
if err != nil {
return err
}
fromVer, err := RepoVersion(ipfsDir)
if err != nil {
return fmt.Errorf("could not get repo version: %w", err)
}
if fromVer == targetVer {
// repo already at target version number
return nil
}
if fromVer > targetVer && !allowDowngrade {
return fmt.Errorf("downgrade not allowed from %d to %d", fromVer, targetVer)
}
logger := log.New(os.Stdout, "", 0)
logger.Print("Looking for suitable migration binaries.")
migrations, binPaths, err := findMigrations(ctx, fromVer, targetVer)
if err != nil {
return err
}
// Download migrations that were not found
if len(binPaths) < len(migrations) {
missing := make([]string, 0, len(migrations)-len(binPaths))
for _, mig := range migrations {
if _, ok := binPaths[mig]; !ok {
missing = append(missing, mig)
}
}
logger.Println("Need", len(missing), "migrations, downloading.")
tmpDir, err := os.MkdirTemp("", "migrations")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
fetched, err := fetchMigrations(ctx, fetcher, missing, tmpDir, logger)
if err != nil {
logger.Print("Failed to download migrations.")
return err
}
for i := range missing {
binPaths[missing[i]] = fetched[i]
}
}
var revert bool
if fromVer > targetVer {
revert = true
}
for _, migration := range migrations {
logger.Println("Running migration", migration, "...")
err = runMigration(ctx, binPaths[migration], ipfsDir, revert, logger)
if err != nil {
return fmt.Errorf("migration %s failed: %w", migration, err)
}
}
logger.Printf("Success: fs-repo migrated to version %d.\n", targetVer)
return nil
}
func NeedMigration(target int) (bool, error) {
vnum, err := RepoVersion("")
if err != nil {
return false, fmt.Errorf("could not get repo version: %w", err)
}
return vnum != target, nil
}
func ExeName(name string) string {
if runtime.GOOS == "windows" {
return name + ".exe"
}
return name
}
// ReadMigrationConfig reads the Migration section of the IPFS config, avoiding
// reading anything other than the Migration section. That way, we're free to
// make arbitrary changes to all _other_ sections in migrations.
func ReadMigrationConfig(repoRoot string, userConfigFile string) (*config.Migration, error) {
var cfg struct {
Migration config.Migration
}
cfgPath, err := config.Filename(repoRoot, userConfigFile)
if err != nil {
return nil, err
}
cfgFile, err := os.Open(cfgPath)
if err != nil {
return nil, err
}
defer cfgFile.Close()
err = json.NewDecoder(cfgFile).Decode(&cfg)
if err != nil {
return nil, err
}
switch cfg.Migration.Keep {
case "":
cfg.Migration.Keep = config.DefaultMigrationKeep
case "discard", "cache", "keep":
default:
return nil, errors.New("unknown config value, Migrations.Keep must be 'cache', 'pin', or 'discard'")
}
if len(cfg.Migration.DownloadSources) == 0 {
cfg.Migration.DownloadSources = config.DefaultMigrationDownloadSources
}
return &cfg.Migration, nil
}
// GetMigrationFetcher creates one or more fetchers according to
// downloadSources,.
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
const httpUserAgent = "kubo/migration"
const numTriesPerHTTP = 3
var fetchers []Fetcher
for _, src := range downloadSources {
src := strings.TrimSpace(src)
switch src {
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP})
case "IPFS", "ipfs":
if newIpfsFetcher != nil {
fetchers = append(fetchers, newIpfsFetcher(distPath))
}
case "":
// Ignore empty string
default:
u, err := url.Parse(src)
if err != nil {
return nil, fmt.Errorf("bad gateway address: %w", err)
}
switch u.Scheme {
case "":
u.Scheme = "https"
case "https", "http":
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP})
}
}
switch len(fetchers) {
case 0:
return nil, errors.New("no sources specified")
case 1:
return fetchers[0], nil
}
// Wrap fetchers in a MultiFetcher to try them in order
return NewMultiFetcher(fetchers...), nil
}
func migrationName(from, to int) string {
return fmt.Sprintf("fs-repo-%d-to-%d", from, to)
}
// findMigrations returns a list of migrations, ordered from first to last
// migration to apply, and a map of locations of migration binaries of any
// migrations that were found.
func findMigrations(ctx context.Context, from, to int) ([]string, map[string]string, error) {
step := 1
count := to - from
if from > to {
step = -1
count = from - to
}
migrations := make([]string, 0, count)
binPaths := make(map[string]string, count)
for cur := from; cur != to; cur += step {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
var migName string
if step == -1 {
migName = migrationName(cur+step, cur)
} else {
migName = migrationName(cur, cur+step)
}
migrations = append(migrations, migName)
bin, err := exec.LookPath(migName)
if err != nil {
continue
}
binPaths[migName] = bin
}
return migrations, binPaths, nil
}
func runMigration(ctx context.Context, binPath, ipfsDir string, revert bool, logger *log.Logger) error {
pathArg := fmt.Sprintf("-path=%s", ipfsDir)
var cmd *exec.Cmd
if revert {
logger.Println(" => Running:", binPath, pathArg, "-verbose=true -revert")
cmd = exec.CommandContext(ctx, binPath, pathArg, "-verbose=true", "-revert")
} else {
logger.Println(" => Running:", binPath, pathArg, "-verbose=true")
cmd = exec.CommandContext(ctx, binPath, pathArg, "-verbose=true")
}
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
// fetchMigrations downloads the requested migrations, and returns a slice with
// the paths of each binary, in the same order specified by needed.
func fetchMigrations(ctx context.Context, fetcher Fetcher, needed []string, destDir string, logger *log.Logger) ([]string, error) {
osv, err := osWithVariant()
if err != nil {
return nil, err
}
if osv == "linux-musl" {
return nil, fmt.Errorf("linux-musl not supported, you must build the binary from source for your platform")
}
var wg sync.WaitGroup
wg.Add(len(needed))
bins := make([]string, len(needed))
// Download and unpack all requested migrations concurrently.
for i, name := range needed {
logger.Printf("Downloading migration: %s...", name)
go func(i int, name string) {
defer wg.Done()
dist := path.Join(distMigsRoot, name)
ver, err := LatestDistVersion(ctx, fetcher, dist, false)
if err != nil {
logger.Printf("could not get latest version of migration %s: %s", name, err)
return
}
loc, err := FetchBinary(ctx, fetcher, dist, ver, name, destDir)
if err != nil {
logger.Printf("could not download %s: %s", name, err)
return
}
logger.Printf("Downloaded and unpacked migration: %s (%s)", loc, ver)
bins[i] = loc
}(i, name)
}
wg.Wait()
var fails []string
for i := range bins {
if bins[i] == "" {
fails = append(fails, needed[i])
}
}
if len(fails) != 0 {
err = fmt.Errorf("failed to download migrations: %s", strings.Join(fails, " "))
if ctx.Err() != nil {
err = fmt.Errorf("%s, %w", ctx.Err(), err)
}
return nil, err
}
return bins, nil
}