-
Notifications
You must be signed in to change notification settings - Fork 171
/
ls.go
280 lines (255 loc) · 8.5 KB
/
ls.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// Package api provides native Go-based API/SDK over HTTP(S).
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package api
import (
"context"
"errors"
"net/http"
"net/url"
"strconv"
"time"
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
)
const (
// maxListPageRetries is the maximum number of attempts lsoPage makes to fetch
// a single page before giving up and returning the last error.
maxListPageRetries = 3
// msgpBufSize is 16 * cos.KiB; not referenced in this part of the file.
// NOTE(review): presumably the size of the msgpack buffers handed out by allocMbuf - confirm.
msgpBufSize = 16 * cos.KiB
)
type (
// LsoCounter carries list-objects progress that is passed to the user-provided
// callback (see LsoCB); it is created by lso only when ListArgs.Callback is set.
LsoCounter struct {
startTime int64 // time operation started (mono.NanoTime)
callAfter int64 // callback after: startTime + ListArgs.CallAfter (in nanoseconds)
callback LsoCB // user callback (ListArgs.Callback)
count int // number of entries accumulated so far; starts at -1, updated right before each callback
done bool // set via finish() when the page with an empty continuation token is received
}
// LsoCB is the progress callback invoked with the current LsoCounter.
LsoCB func(*LsoCounter)
// additional and optional list-objects args (compare with: GetArgs, PutArgs)
ListArgs struct {
Callback LsoCB // optional progress callback; nil means no progress reporting
CallAfter time.Duration // delay (since the start of the operation) before the callback may be called; zero - immediately
Header http.Header // to optimize listing very large buckets, e.g.: Header.Set(apc.HdrInventory, "true")
Limit int64 // maximum number of objects to list; 0 (zero) - list the entire bucket
}
)
// ListBuckets returns the buckets that match the provided query.
//   - `fltPresence` is one of { apc.FltExists, apc.FltPresent, ... } - see api/apc/query.go;
//     it is passed along as the apc.QparamFltPresence query parameter.
//   - `qbck` is a `cmn.QueryBcks` - control structure that's practically identical to `cmn.Bck`,
//     except that some or all of its fields can be empty (to facilitate the corresponding query).
//
// On failure the returned slice is nil.
// See also: QueryBuckets, ListObjects
func ListBuckets(bp BaseParams, qbck cmn.QueryBcks, fltPresence int) (cmn.Bcks, error) {
	query := make(url.Values, 4)
	query.Set(apc.QparamFltPresence, strconv.Itoa(fltPresence))
	qbck.AddToQuery(query)

	bp.Method = http.MethodGet

	rp := AllocRp()
	rp.BaseParams = bp
	rp.Path = apc.URLPathBuckets.S
	// NOTE: the bucket name goes into the message body, for two reasons:
	// - qbck.IsBucket() to differentiate between list-objects and list-buckets (operations)
	// - list-buckets own correctness (see QueryBuckets below)
	rp.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActList, Name: qbck.Name})
	rp.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
	rp.Query = query

	bcks := cmn.Bcks{}
	_, err := rp.DoReqAny(&bcks)
	FreeRp(rp)

	if err == nil {
		return bcks, nil
	}
	return nil, err
}
// QueryBuckets is a small convenience wrapper around ListBuckets: it reports whether
// at least one bucket satisfies the (qbck) criteria.
//   - `fltPresence` - as per QparamFltPresence enum (see api/apc/query.go)
func QueryBuckets(bp BaseParams, qbck cmn.QueryBcks, fltPresence int) (bool, error) {
	matched, err := ListBuckets(bp, qbck, fltPresence)
	if err != nil {
		// ListBuckets returns a nil slice on error, hence `false`
		return false, err
	}
	return len(matched) != 0, nil
}
// ListObjects lists objects in the bucket `bck` and returns them as a single
// `cmn.LsoRes` whose entries are `cmn.LsoEnt` structures.
//
// `args.Limit` is the maximum number of objects to be returned
// (where 0 (zero) means returning all objects in the bucket).
//
// The call always starts a new listing: when `lsmsg` is provided, its `UUID` and
// `ContinuationToken` are reset; a nil `lsmsg` is replaced with an empty one.
//
// This API supports numerous options and flags. In particular, the `apc.LsoMsg`
// structure supports "opening" objects formatted as one of the supported
// archival types and including contents of archived directories in generated
// result sets.
//
// In addition, `lsmsg` (`apc.LsoMsg`) provides options (flags) to optimize
// the request's latency, to list anonymous public-access Cloud buckets, and more.
// Further details at `api/apc/lsmsg.go` source.
//
// AIS supports listing buckets that have millions of objects.
// For large and very large buckets, it is strongly recommended to use the
// `ListObjectsPage` API - effectively, an iterator returning the _next_
// listed page along with the associated _continuation token_.
//
// See also:
// - docs/cli/* for CLI usage examples
// - `apc.LsoMsg`
// - `api.ListObjectsPage`
func ListObjects(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (*cmn.LsoRes, error) {
	if lsmsg != nil {
		// new listing: forget any previous state
		lsmsg.UUID = ""
		lsmsg.ContinuationToken = ""
	} else {
		lsmsg = &apc.LsoMsg{}
	}

	rp := lsoReq(bp, bck, &args)
	result, err := lso(rp, lsmsg, args)

	// release the msgpack buffer and the request params
	freeMbuf(rp.buf)
	FreeRp(rp)

	return result, err
}
// lsoReq allocates (AllocRp) and fills in the request parameters shared by the
// list-objects calls: HTTP GET on the bucket's path, bucket query, headers,
// and a msgpack buffer (allocMbuf).
//
// The returned request uses its own copy of `args.Header` (or a fresh header when
// the latter is nil), so that setting Accept and Content-Type below never
// modifies - or concurrently writes to - the map owned by the caller.
//
// Callers in this file release the result via freeMbuf(reqParams.buf) and FreeRp(reqParams).
func lsoReq(bp BaseParams, bck cmn.Bck, args *ListArgs) *ReqParams {
	var hdr http.Header
	if args.Header == nil {
		hdr = make(http.Header, 2)
	} else {
		// private copy: do not mutate caller-provided (and possibly shared) header
		hdr = args.Header.Clone()
	}

	// NOTE:
	// unlike S3 API (that aistore also provides), native Go-based API always utilizes
	// message pack serialization (of the list-objects results), with performance
	// improvement that proved to be _significant_, esp. in large-scale benchmarks
	hdr.Set(cos.HdrAccept, cos.ContentMsgPack)
	hdr.Set(cos.HdrContentType, cos.ContentJSON)

	bp.Method = http.MethodGet
	reqParams := AllocRp()
	{
		reqParams.BaseParams = bp
		reqParams.Path = apc.URLPathBuckets.Join(bck.Name)
		reqParams.Header = hdr
		reqParams.Query = bck.NewQuery()
		reqParams.buf = allocMbuf() // msgpack
	}
	return reqParams
}
// lso lists objects page by page and accumulates the pages in a single result.
//
// `remaining` holds the number of objects that are still to be listed (that is, unless
// we are listing the entire bucket, which is the case when `args.Limit` is zero).
// Each iteration requests one page - with the page size capped at `remaining` - and
// reduces `remaining` by the number of entries received. Iterations stop when either
// the limit is reached or a page comes back with an empty continuation token.
//
// The first page becomes the result (and provides the listing UUID for subsequent
// requests); entries, continuation token, and flags of the following pages are merged into it.
//
// When `args.Callback` is set, it is invoked after a page only if the counter's
// mustCall() condition holds (immediately, or once `args.CallAfter` has elapsed).
//
// NOTE: with a negative `args.Limit` the loop never runs and the function returns (nil, nil).
func lso(reqParams *ReqParams, lsmsg *apc.LsoMsg, args ListArgs) (lst *cmn.LsoRes, _ error) {
	var (
		progress  *LsoCounter
		remaining = args.Limit
		unlimited = args.Limit == 0
	)
	if args.Callback != nil {
		now := mono.NanoTime()
		progress = &LsoCounter{
			startTime: now,
			callAfter: now + args.CallAfter.Nanoseconds(),
			callback:  args.Callback,
			count:     -1,
		}
	}

	for first := true; unlimited || remaining > 0; first = false {
		if !unlimited {
			lsmsg.PageSize = remaining
		}
		reqParams.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActList, Value: lsmsg})

		page, err := lsoPage(reqParams)
		if err != nil {
			return nil, err
		}
		last := page.ContinuationToken == "" // listed all pages

		if first {
			lst = page
			lsmsg.UUID = page.UUID
			debug.Assert(cos.IsValidUUID(lst.UUID), lst.UUID)
		} else {
			lst.Entries = append(lst.Entries, page.Entries...)
			lst.ContinuationToken = page.ContinuationToken
			lst.Flags |= page.Flags
			debug.Assert(lst.UUID == page.UUID, lst.UUID, page.UUID)
		}

		if progress != nil && progress.mustCall() {
			progress.count = len(lst.Entries)
			if last {
				progress.finish()
			}
			progress.callback(progress)
		}

		if last {
			break
		}
		remaining = max(remaining-int64(len(page.Entries)), 0)
		lsmsg.ContinuationToken = page.ContinuationToken
	}
	return lst, nil
}
// lsoPage fetches a single page, with limited retry and increasing timeout:
//   - at most maxListPageRetries attempts are made;
//   - only context.DeadlineExceeded is retried, any other error is returned right away;
//   - prior to each retry the HTTP client is copied and the copy's timeout is
//     increased by half; the copy then replaces the client in `reqParams`.
func lsoPage(reqParams *ReqParams) (_ *cmn.LsoRes, err error) {
	for attempt := 0; attempt < maxListPageRetries; attempt++ {
		page := &cmn.LsoRes{}
		_, err = reqParams.DoReqAny(page)
		if err == nil {
			return page, nil
		}
		if !errors.Is(err, context.DeadlineExceeded) {
			return nil, err
		}
		// timed out: retry with a 1.5x timeout (on a copy of the client)
		extended := *reqParams.BaseParams.Client
		extended.Timeout += extended.Timeout >> 1
		reqParams.BaseParams.Client = &extended
	}
	return nil, err
}
// ListObjectsPage returns a single page of bucket objects.
// The request is made with `lsmsg` as is (a nil `lsmsg` is replaced with an empty one);
// unlike `ListObjects`, the message's UUID and continuation token are not reset.
// On success the function updates `lsmsg.UUID` and `lsmsg.ContinuationToken`, which
// the client can then reuse to fetch the next page.
// See also:
// - docs/cli/* for CLI usage examples
// - `apc.LsoMsg`
// - `api.ListObjects`
func ListObjectsPage(bp BaseParams, bck cmn.Bck, lsmsg *apc.LsoMsg, args ListArgs) (*cmn.LsoRes, error) {
	if lsmsg == nil {
		lsmsg = &apc.LsoMsg{}
	}

	rp := lsoReq(bp, bck, &args)
	rp.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActList, Value: lsmsg})

	// no need to preallocate bucket entries slice (msgpack does it)
	page := &cmn.LsoRes{}
	_, err := rp.DoReqAny(page)

	freeMbuf(rp.buf)
	FreeRp(rp)

	if err == nil {
		lsmsg.UUID, lsmsg.ContinuationToken = page.UUID, page.ContinuationToken
		return page, nil
	}
	return nil, err
}
// ListObjectsInvalidateCache sends an apc.ActInvalListCache action (HTTP POST)
// for the specified bucket and returns the resulting error, if any.
//
// TODO: obsolete this function after introducing mechanism to detect remote bucket changes.
func ListObjectsInvalidateCache(bp BaseParams, bck cmn.Bck) error {
	bp.Method = http.MethodPost

	rp := AllocRp()
	rp.BaseParams = bp
	rp.Path = apc.URLPathBuckets.Join(bck.Name)
	rp.Query = bck.AddToQuery(url.Values{})
	rp.Header = http.Header{cos.HdrContentType: []string{cos.ContentJSON}}
	rp.Body = cos.MustMarshal(apc.ActMsg{Action: apc.ActInvalListCache})

	err := rp.DoRequest()
	FreeRp(rp)
	return err
}
////////////////
// LsoCounter //
////////////////

// IsFinished reports whether the listing has been marked as done.
func (c *LsoCounter) IsFinished() bool {
	return c.done
}

// Elapsed returns the time passed since the operation started.
func (c *LsoCounter) Elapsed() time.Duration {
	return mono.Since(c.startTime)
}

// Count returns the number of entries recorded by the most recent update.
func (c *LsoCounter) Count() int {
	return c.count
}

// private

// mustCall returns true when the callback is due: either there is no delay
// configured (callAfter equals startTime) or the delay has already elapsed.
func (c *LsoCounter) mustCall() bool {
	if c.callAfter == c.startTime {
		return true // immediate
	}
	return mono.NanoTime() >= c.callAfter
}

// finish marks the listing as done.
func (c *LsoCounter) finish() {
	c.done = true
}