Skip to content

Commit

Permalink
etcdserver: define error string and link grpc error code.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoyinZyc committed Nov 25, 2019
1 parent d7b0c14 commit 2caeb11
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 96 deletions.
2 changes: 1 addition & 1 deletion etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.version = clusterVersionFromStore(c.lg, c.v2store)
}

c.downgradeInfo = downgradeFromBackend(c.lg, c.be)
c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be)
var d *DowngradeInfo
if c.downgradeInfo == nil {
d = &DowngradeInfo{Enabled: false}
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/api/membership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
tx.UnsafePut(clusterBucketName, dkey, dvalue)
}

func downgradeFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
dkey := backendDowngradeKey()
if be != nil {
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
_, vs := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)
Expand All @@ -114,7 +114,7 @@ func downgradeFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
func clusterVersionFromBackend(be backend.Backend) *semver.Version {
ckey := backendClusterVersionKey()
if be != nil {
tx := be.BatchTx()
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRe
func (ms *maintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
resp, err := ms.d.Downgrade(ctx, r)
if err != nil {
return nil, err
return nil, togRPCError(err)
}

resp.Header = &pb.ResponseHeader{}
Expand Down
21 changes: 21 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ var (
ErrGPRCNotSupportedForLearner = status.New(codes.Unavailable, "etcdserver: rpc not supported for learner").Err()
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()

ErrGRPCClusterVersionUnavailable = status.New(codes.Unavailable, "etcdserver: cluster version is unavailable").Err()
ErrGRPCWrongVersionFormat = status.New(codes.InvalidArgument, "etcdserver: wrong version format").Err()
ErrGRPCInvalidDowngradeTargetVersion = status.New(codes.InvalidArgument, "etcdserver: invalid target version").Err()
ErrGRPCIsDowngrading = status.New(codes.FailedPrecondition, "etcdserver: cluster has an ongoing downgrade job").Err()
ErrGRPCIsNotDowngrading = status.New(codes.FailedPrecondition, "etcdserver: cluster is not downgrading").Err()
ErrGRPCUnknownDowngradeAction = status.New(codes.InvalidArgument, "etcdserver: unknown downgrade action").Err()

errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
Expand Down Expand Up @@ -132,6 +139,13 @@ var (
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
ErrorDesc(ErrGPRCNotSupportedForLearner): ErrGPRCNotSupportedForLearner,
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,

ErrorDesc(ErrGRPCClusterVersionUnavailable): ErrGRPCClusterVersionUnavailable,
ErrorDesc(ErrGRPCWrongVersionFormat): ErrGRPCWrongVersionFormat,
ErrorDesc(ErrGRPCInvalidDowngradeTargetVersion): ErrGRPCInvalidDowngradeTargetVersion,
ErrorDesc(ErrGRPCIsDowngrading): ErrGRPCIsDowngrading,
ErrorDesc(ErrGRPCIsNotDowngrading): ErrGRPCIsNotDowngrading,
ErrorDesc(ErrGRPCUnknownDowngradeAction): ErrGRPCUnknownDowngradeAction,
}
)

Expand Down Expand Up @@ -190,6 +204,13 @@ var (
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)

ErrClusterVersionUnavailable = Error(ErrGRPCClusterVersionUnavailable)
ErrWrongVersionFormat = Error(ErrGRPCWrongVersionFormat)
ErrInvalidDowngradeTargetVersion = Error(ErrGRPCInvalidDowngradeTargetVersion)
ErrIsDowngrading = Error(ErrGRPCIsDowngrading)
ErrIsNotDowngrading = Error(ErrGRPCIsNotDowngrading)
ErrUnknownDowngradeAction = Error(ErrGRPCUnknownDowngradeAction)
)

// EtcdError defines gRPC server errors.
Expand Down
28 changes: 17 additions & 11 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,23 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrNoSpace: rpctypes.ErrGRPCNoSpace,
etcdserver.ErrTooManyRequests: rpctypes.ErrTooManyRequests,

etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged,
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
etcdserver.ErrClusterVersionUnavailable: rpctypes.ErrGRPCClusterVersionUnavailable,
etcdserver.ErrWrongVersionFormat: rpctypes.ErrGRPCWrongVersionFormat,
etcdserver.ErrInvalidDowngradeTargetVersion: rpctypes.ErrGRPCInvalidDowngradeTargetVersion,
etcdserver.ErrIsDowngrading: rpctypes.ErrGRPCIsDowngrading,
etcdserver.ErrIsNotDowngrading: rpctypes.ErrGRPCIsNotDowngrading,
etcdserver.ErrUnknownDowngradeAction: rpctypes.ErrGRPCUnknownDowngradeAction,

lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
Expand Down
5 changes: 5 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,12 +691,17 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)

func (a *applierV3backend) Downgrade(dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
var d membership.DowngradeInfo
lg := a.s.getLogger()
switch dr.Action {
case pb.DowngradeRequest_ENABLE:
v := dr.Version
d = membership.DowngradeInfo{Enabled: true, TargetVersion: semver.Must(semver.NewVersion(v))}
case pb.DowngradeRequest_CANCEL:
d = membership.DowngradeInfo{Enabled: false}
default:
if lg != nil {
lg.Panic("unknown DowngradeRequest action type", zap.String("type", dr.Action.String()))
}
}
a.s.cluster.SetDowngradeInfo(&d)
resp := &pb.DowngradeResponse{Version: a.s.ClusterVersion().String()}
Expand Down
10 changes: 5 additions & 5 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
return vers
}

// decideAllowedVersionRange decides the available version range of the cluster that local server can join in;
// allowedVersionRange decides the available version range of the cluster that local server can join in;
// if the downgrade enabled status is true, the version window is [localVersion, oneMinorHigher]
// if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
func decideAllowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *semver.Version) {
func allowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *semver.Version) {
minV = semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV = semver.Must(semver.NewVersion(version.Version))
maxV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
Expand All @@ -192,8 +192,8 @@ func decideAllowedVersionRange(downgradeEnabled bool) (minV *semver.Version, max
return minV, maxV
}

// decideDowngradeEnabled will decide the downgrade enabled status of the cluster.
func decideDowngradeEnabled(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
// getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
members := cl.Members()

for _, m := range members {
Expand Down Expand Up @@ -351,7 +351,7 @@ func isDowngradeFinished(lg *zap.Logger, targetVersion *semver.Version, vers map
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(lg, cl, local, rt)
minV, maxV := decideAllowedVersionRange(decideDowngradeEnabled(lg, cl, local, rt))
minV, maxV := allowedVersionRange(getDowngradeEnabledFromRemotePeers(lg, cl, local, rt))
return isCompatibleWithVers(lg, vers, local, minV, maxV)
}

Expand Down
2 changes: 1 addition & 1 deletion etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestDecideAllowedVersionRange(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
minV, maxV := decideAllowedVersionRange(tt.downgradeEnabled)
minV, maxV := allowedVersionRange(tt.downgradeEnabled)
if !minV.Equal(*tt.expectedMinV) {
t.Errorf("Expected minV is %v; Got %v", tt.expectedMinV.String(), minV.String())
}
Expand Down
44 changes: 25 additions & 19 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@ import (
)

var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")
ErrNoLeader = errors.New("etcdserver: no leader")
ErrNotLeader = errors.New("etcdserver: not leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrClusterVersionUnavailable = errors.New("etcdserver: cluster version is unavailable")
ErrWrongVersionFormat = errors.New("etcdserver: wrong version format")
ErrInvalidDowngradeTargetVersion = errors.New("etcdserver: invalid target version")
ErrIsDowngrading = errors.New("etcdserver: cluster has an ongoing downgrade job")
ErrIsNotDowngrading = errors.New("etcdserver: cluster is not downgrading")
ErrUnknownDowngradeAction = errors.New("etcdserver: unknown downgrade action")
)

type DiscoveryError struct {
Expand Down
12 changes: 3 additions & 9 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ const (

// Todo: need to be decided
monitorDowngradeInterval = time.Second

downgradeHTTPTimeout = 5 * time.Second
)

var (
Expand Down Expand Up @@ -881,8 +879,6 @@ type ServerPeerHTTP interface {
}

type ServerDowngradeHTTP interface {
// DowngradeInfo is the downgrade information of the cluster
DowngradeInfo() *membership.DowngradeInfo
DowngradeEnabledHandler() http.Handler
}

Expand Down Expand Up @@ -916,12 +912,13 @@ func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
return
}

ctx, cancel := context.WithTimeout(context.Background(), downgradeHTTPTimeout)
ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
defer cancel()

// serve with linearized downgrade info
if err := h.server.linearizableReadNotify(ctx); err != nil {
http.Error(w, "failed linearized read", http.StatusBadRequest)
http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
http.StatusInternalServerError)
return
}
enabled := h.server.DowngradeInfo().Enabled
Expand Down Expand Up @@ -2561,9 +2558,6 @@ func (s *EtcdServer) monitorVersions() {
continue
}

// Original etcd v3.1.26 only update cluster version if the decided version is
// greater than the current cluster version, in this patched etcd, we relax the rule
// and allow +1 or -1 minor version cluster version change
if v != nil && membership.IsVersionChangable(s.cluster.Version(), v) {
s.goAttach(func() { s.updateClusterVersion(v.String()) })
}
Expand Down
56 changes: 28 additions & 28 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -814,19 +812,18 @@ func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb
return s.downgradeEnable(ctx, r)
case pb.DowngradeRequest_CANCEL:
return s.downgradeCancel(ctx)
default:
return nil, ErrUnknownDowngradeAction
}
return nil, nil
}

func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
resp := &pb.DowngradeResponse{}
var err error

targetVersion, err := semver.NewVersion(v)
targetVersion, err := changeToTargetVersion(v)
if err != nil {
return nil, fmt.Errorf("wrong version format: %v", err)
return nil, err
}
targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}

// do linearized read to avoid using stale downgrade information
err = s.linearizableReadNotify(ctx)
Expand All @@ -835,30 +832,22 @@ func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.Downg
}

cv := s.ClusterVersion()
resp.Version = cv.String()
if cv.LessThan(*targetVersion) {
err = errors.New("target version too high")
return nil, err
if cv == nil {
return nil, ErrClusterVersionUnavailable
}
resp.Version = cv.String()

if cv.Equal(*targetVersion) {
err = errors.New("target version is current cluster version")
return nil, err
}
if !membership.IsVersionChangable(cv, targetVersion) {
err = fmt.Errorf(
"target version too small. "+
"the cluster can only be downgraded to %s",
semver.Version{Major: cv.Major, Minor: cv.Minor - 1}.String())
allowedTargetVersion := semver.Version{Major: cv.Major, Minor: cv.Minor - 1}
if !targetVersion.Equal(allowedTargetVersion) {
err = ErrInvalidDowngradeTargetVersion
return nil, err
}

downgradeInfo := s.cluster.DowngradeInfo()

if downgradeInfo.Enabled {
// Todo: return the downgrade status along with the error msg
err = errors.New("the cluster has an ongoing downgrade job")
return resp, err
return resp, ErrIsDowngrading
}
return resp, nil
}
Expand All @@ -869,13 +858,10 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest
if resp, err := s.downgradeValidate(ctx, v); err != nil {
return resp, err
}

targetVersion, err := semver.NewVersion(v)
targetVersion, err := changeToTargetVersion(v)
if err != nil {
return nil, fmt.Errorf("wrong version format: %v", err)
return nil, err
}
// cluster version only keeps major.minor, remove patch version
targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}
r.Version = targetVersion.String()

resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Downgrade: r})
Expand All @@ -893,7 +879,7 @@ func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse

downgradeInfo := s.cluster.DowngradeInfo()
if !downgradeInfo.Enabled {
return nil, errors.New("the cluster is not downgrading")
return nil, ErrIsNotDowngrading
}

resp, err := s.raftRequest(ctx,
Expand All @@ -903,3 +889,17 @@ func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse
}
return resp.(*pb.DowngradeResponse), nil
}

func changeToTargetVersion(v string) (*semver.Version, error) {
targetVersion, err := semver.NewVersion(v)
if err != nil {
// allow input version format Major.Minor
targetVersion, err = semver.NewVersion(v + ".0")
if err != nil {
return nil, ErrWrongVersionFormat
}
}
// cluster version only keeps major.minor, remove patch version
targetVersion = &semver.Version{Major: targetVersion.Major, Minor: targetVersion.Minor}
return targetVersion, nil
}
Loading

0 comments on commit 2caeb11

Please sign in to comment.