Skip to content

Commit

Permalink
Merge pull request etcd-io#635 from Elbehery/add_move_subBucket
Browse files Browse the repository at this point in the history
add MoveBucket to support moving a sub-bucket from one bucket to anot…
  • Loading branch information
ahrtr committed Jan 3, 2024
2 parents 1d7fd9a + 886eccb commit 19be273
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 6 deletions.
80 changes: 75 additions & 5 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,21 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
return errors.ErrTxNotWritable
}

newKey := cloneBytes(key)

// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
k, _, flags := c.seek(newKey)

// Return an error if bucket doesn't exist or is not a bucket.
if !bytes.Equal(key, k) {
if !bytes.Equal(newKey, k) {
return errors.ErrBucketNotFound
} else if (flags & common.BucketLeafFlag) == 0 {
return errors.ErrIncompatibleValue
}

// Recursively delete all child buckets.
child := b.Bucket(key)
child := b.Bucket(newKey)
err = child.ForEachBucket(func(k []byte) error {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
Expand All @@ -309,15 +311,83 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
}

// Remove cached copy.
delete(b.buckets, string(key))
delete(b.buckets, string(newKey))

// Release all bucket pages to freelist.
child.nodes = nil
child.rootNode = nil
child.free()

// Delete the node if we have a matching key.
c.node().del(key)
c.node().del(newKey)

return nil
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
// 1. the sub-bucket cannot be found in the source bucket;
// 2. or the key already exists in the destination bucket;
// 3. or the key represents a non-bucket value;
// 4. the source and destination buckets are the same.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
lg := b.tx.db.Logger()
lg.Debugf("Moving bucket %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Moving bucket %q failed: %v", string(key), err)
} else {
lg.Debugf("Moving bucket %q successfully", string(key))
}
}()

if b.tx.db == nil || dstBucket.tx.db == nil {
return errors.ErrTxClosed
} else if !dstBucket.Writable() {
return errors.ErrTxNotWritable
}

newKey := cloneBytes(key)

// Move cursor to correct position.
c := b.Cursor()
k, v, flags := c.seek(newKey)

// Return an error if bucket doesn't exist or is not a bucket.
if !bytes.Equal(newKey, k) {
return errors.ErrBucketNotFound
} else if (flags & common.BucketLeafFlag) == 0 {
lg.Errorf("An incompatible key %s exists in the source bucket", string(newKey))
return errors.ErrIncompatibleValue
}

// Do nothing (return true directly) if the source bucket and the
// destination bucket are actually the same bucket.
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
lg.Errorf("The source bucket (%s) and the target bucket (%s) are the same bucket", b.String(), dstBucket.String())
return errors.ErrSameBuckets
}

// check whether the key already exists in the destination bucket
curDst := dstBucket.Cursor()
k, _, flags = curDst.seek(newKey)

// Return an error if there is an existing key in the destination bucket.
if bytes.Equal(newKey, k) {
if (flags & common.BucketLeafFlag) != 0 {
return errors.ErrBucketExists
}
lg.Errorf("An incompatible key %s exists in the target bucket", string(newKey))
return errors.ErrIncompatibleValue
}

// remove the sub-bucket from the source bucket
delete(b.buckets, string(newKey))
c.node().del(newKey)

// add te sub-bucket to the destination bucket
newValue := cloneBytes(v)
curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)

return nil
}
Expand Down
6 changes: 5 additions & 1 deletion errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ var (
// ErrValueTooLarge is returned when inserting a value that is larger than MaxValueSize.
ErrValueTooLarge = errors.New("value too large")

// ErrIncompatibleValue is returned when trying create or delete a bucket
// ErrIncompatibleValue is returned when trying to create or delete a bucket
// on an existing non-bucket key or when trying to create or delete a
// non-bucket key on an existing bucket key.
ErrIncompatibleValue = errors.New("incompatible value")

// ErrSameBuckets is returned when trying to move a sub-bucket between
// source and target buckets, while source and target buckets are the same.
ErrSameBuckets = errors.New("the source and target are the same bucket")
)
Loading

0 comments on commit 19be273

Please sign in to comment.