Skip to content

Commit

Permalink
global rebalance: warn cluster map changed
Browse files Browse the repository at this point in the history
* up versions: aistore 3.19, CLI 1.6, aisloader 1.7
* add err-destination-missing type

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
  • Loading branch information
alex-aizman committed Aug 29, 2023
1 parent 63eb828 commit 7dcc067
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
6 changes: 3 additions & 3 deletions cmn/ver_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import "github.com/NVIDIA/aistore/cmn/jsp"
// `jsp` formats its *signature* and other implementation details.

const (
VersionAIStore = "3.18"
VersionCLI = "1.5"
VersionLoader = "1.6"
VersionAIStore = "3.19"
VersionCLI = "1.6"
VersionLoader = "1.7"
VersionAuthN = "1.0"
)

Expand Down
28 changes: 21 additions & 7 deletions reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,14 @@ func (reb *Reb) initRenew(rargs *rebArgs, notif *xact.NotifXact, logHdr string,
reb.setXact(xreb)
reb.rebID.Store(rargs.id)

// check Smap _prior_ to opening streams
smap := reb.t.Sowner().Get()
if smap.Version != rargs.smap.Version {
debug.Assert(smap.Version > rargs.smap.Version)
nlog.Errorf("Warning %s: %s post-init version change %s => %s", reb.t, xreb, rargs.smap, smap)
// TODO: handle an unlikely corner case keeping in mind that not every change warrants a different rebalance
}

// 3. init streams and data structures
if haveStreams {
reb.beginStreams(rargs.config)
Expand Down Expand Up @@ -731,24 +739,28 @@ func (rj *rebJogger) walkBck(bck *meta.Bck) bool {
if rj.xreb.IsAborted() {
nlog.Infof("aborting traversal")
} else {
nlog.Errorf("%s: failed to traverse, err: %v", rj.m.t, err)
nlog.Errorf("%s: %s failed to traverse: %v", rj.m.t, rj.xreb.Name(), err)
}
return true
}

// send completion
func (rj *rebJogger) objSentCallback(hdr transport.ObjHdr, _ io.ReadCloser, arg any, err error) {
rj.m.inQueue.Dec()
if err != nil {
if cmn.FastV(4, cos.SmoduleReb) || !cos.IsRetriableConnErr(err) {
if err == nil {
rj.xreb.OutObjsAdd(1, hdr.ObjAttrs.Size) // NOTE: double-counts retransmissions
return
}
// log err
if cmn.FastV(4, cos.SmoduleReb) || !cos.IsRetriableConnErr(err) {
if bundle.IsErrDestinationMissing(err) {
nlog.Errorf("%s: %v, %s", rj.xreb.Name(), err, rj.smap)
} else {
lom, ok := arg.(*cluster.LOM)
debug.Assert(ok)
nlog.Errorf("%s: failed to send %s: %v", rj.m.t.Snode(), lom, err)
nlog.Errorf("%s: %s failed to send %s: %v", rj.m.t, rj.xreb.Name(), lom, err)
}
return
}
xreb := rj.xreb
xreb.OutObjsAdd(1, hdr.ObjAttrs.Size) // NOTE: double-counts retransmissions
}

func (rj *rebJogger) visitObj(fqn string, de fs.DirEntry) (err error) {
Expand Down Expand Up @@ -800,12 +812,14 @@ func (rj *rebJogger) _lwalk(lom *cluster.LOM, fqn string) error {
if err != nil {
return err
}

// transmit (unlock via transport completion => roc.Close)
rj.m.addLomAck(lom)
if err := rj.doSend(lom, tsi, roc); err != nil {
rj.m.delLomAck(lom, 0, false /*free LOM*/)
return err
}

return nil
}

Expand Down
23 changes: 21 additions & 2 deletions transport/bundle/stream_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type (
Multiplier int // so-many TCP connections per Rx endpoint, with round-robin
ManualResync bool // auto-resync by default
}

ErrDestinationMissing struct {
streamStr string
tname string
smapStr string
}
)

// interface guard
Expand Down Expand Up @@ -182,7 +188,7 @@ func (sb *Streams) Send(obj *transport.Obj, roc cos.ReadOpenCloser, nodes ...*me
if _, ok := streams[di.ID()]; ok {
continue
}
err = cos.NewErrNotFound("destination mismatch: stream (%s) => %s", sb, di)
err = &ErrDestinationMissing{sb.String(), di.StringEx(), sb.smap.String()}
_doCmpl(obj, roc, err) // ditto
return
}
Expand Down Expand Up @@ -359,7 +365,7 @@ func (sb *Streams) Resync() {
}
// not connecting to the peer that's in maintenance and already rebalanced-out
if si.InMaintPostReb() {
nlog.Infof("%s => %s[-/%#b] - skipping", sb, si.StringEx(), si.Flags)
nlog.Infof("%s => %s[-/%#b] per %s - skipping", sb, si.StringEx(), si.Flags, smap)
continue
}

Expand Down Expand Up @@ -414,3 +420,16 @@ func mdiff(oldMaps, newMaps []meta.NodeMap) (added, removed meta.NodeMap) {
}
return
}

///////////////////////////
// ErrDestinationMissing //
///////////////////////////

// Error implements the error interface. The message names the stream bundle,
// the destination that was not found among its streams, and the cluster map
// string captured when the error was created.
func (e *ErrDestinationMissing) Error() string {
	const format = "destination missing: stream (%s) => %s, %s"
	stream, target, smap := e.streamStr, e.tname, e.smapStr
	return fmt.Sprintf(format, stream, target, smap)
}

// IsErrDestinationMissing reports whether e's dynamic type is exactly
// *ErrDestinationMissing. Wrapped errors are not unwrapped; a nil error
// yields false.
func IsErrDestinationMissing(e error) bool {
	switch e.(type) {
	case *ErrDestinationMissing:
		return true
	default:
		return false
	}
}

0 comments on commit 7dcc067

Please sign in to comment.