Skip to content

Commit

Permalink
Merge pull request #6898 from mitake/auth-maintain
Browse files Browse the repository at this point in the history
RFC, WIP: etcdserver: let maintenance services require root role
  • Loading branch information
xiang90 committed Jan 14, 2017
2 parents 118fd18 + 783eaf9 commit 26d9926
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 67 deletions.
54 changes: 53 additions & 1 deletion auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/coreos/pkg/capnslog"
"golang.org/x/crypto/bcrypt"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)

var (
Expand Down Expand Up @@ -57,6 +59,7 @@ var (
ErrPermissionNotGranted = errors.New("auth: permission is not granted to the role")
ErrAuthNotEnabled = errors.New("auth: authentication is not enabled")
ErrAuthOldRevision = errors.New("auth: revision in header is old")
ErrInvalidAuthToken = errors.New("auth: invalid auth token")

// BcryptCost is the algorithm cost / strength for hashing auth passwords
BcryptCost = bcrypt.DefaultCost
Expand Down Expand Up @@ -153,6 +156,9 @@ type AuthStore interface {

// Close does cleanup of AuthStore
Close() error

// AuthInfoFromCtx gets AuthInfo from gRPC's context
AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error)
}

type authStore struct {
Expand All @@ -167,6 +173,8 @@ type authStore struct {
simpleTokenKeeper *simpleTokenTTLKeeper

revision uint64

indexWaiter func(uint64) <-chan struct{}
}

func (as *authStore) AuthEnable() error {
Expand Down Expand Up @@ -871,7 +879,7 @@ func (as *authStore) isAuthEnabled() bool {
return as.enabled
}

func NewAuthStore(be backend.Backend) *authStore {
func NewAuthStore(be backend.Backend, indexWaiter func(uint64) <-chan struct{}) *authStore {
tx := be.BatchTx()
tx.Lock()

Expand All @@ -883,6 +891,7 @@ func NewAuthStore(be backend.Backend) *authStore {
be: be,
simpleTokens: make(map[string]string),
revision: 0,
indexWaiter: indexWaiter,
}

as.commitRevision(tx)
Expand Down Expand Up @@ -921,3 +930,46 @@ func getRevision(tx backend.BatchTx) uint64 {
func (as *authStore) Revision() uint64 {
return as.revision
}

func (as *authStore) isValidSimpleToken(token string, ctx context.Context) bool {
splitted := strings.Split(token, ".")
if len(splitted) != 2 {
return false
}
index, err := strconv.Atoi(splitted[1])
if err != nil {
return false
}

select {
case <-as.indexWaiter(uint64(index)):
return true
case <-ctx.Done():
}

return false
}

func (as *authStore) AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
if !ok {
return nil, nil
}

ts, tok := md["token"]
if !tok {
return nil, nil
}

token := ts[0]
if !as.isValidSimpleToken(token, ctx) {
return nil, ErrInvalidAuthToken
}

authInfo, uok := as.AuthInfoFromToken(token)
if !uok {
plog.Warningf("invalid auth token: %s", token)
return nil, ErrInvalidAuthToken
}
return authInfo, nil
}
20 changes: 14 additions & 6 deletions auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,22 @@ import (

func init() { BcryptCost = bcrypt.MinCost }

func dummyIndexWaiter(index uint64) <-chan struct{} {
ch := make(chan struct{})
go func() {
ch <- struct{}{}
}()
return ch
}

func TestUserAdd(t *testing.T) {
b, tPath := backend.NewDefaultTmpBackend()
defer func() {
b.Close()
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
ua := &pb.AuthUserAddRequest{Name: "foo"}
_, err := as.UserAdd(ua) // add a non-existing user
if err != nil {
Expand Down Expand Up @@ -80,7 +88,7 @@ func TestCheckPassword(t *testing.T) {
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
Expand Down Expand Up @@ -125,7 +133,7 @@ func TestUserDelete(t *testing.T) {
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
Expand Down Expand Up @@ -162,7 +170,7 @@ func TestUserChangePassword(t *testing.T) {
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
Expand Down Expand Up @@ -208,7 +216,7 @@ func TestRoleAdd(t *testing.T) {
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
Expand All @@ -229,7 +237,7 @@ func TestUserGrant(t *testing.T) {
os.Remove(tPath)
}()

as := NewAuthStore(b)
as := NewAuthStore(b, dummyIndexWaiter)
defer as.Close()
err := enableAuthAndCreateRoot(as)
if err != nil {
Expand Down
32 changes: 30 additions & 2 deletions e2e/ctl_v3_defrag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ package e2e

import "testing"

func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) }

func defragTest(cx ctlCtx) {
func maintenanceInitKeys(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
}
}

func defragTest(cx ctlCtx) {
maintenanceInitKeys(cx)

if err := ctlV3Compact(cx, 4, cx.compactPhysical); err != nil {
cx.t.Fatal(err)
Expand All @@ -35,6 +40,29 @@ func defragTest(cx ctlCtx) {
}
}

func defragTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)

if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}

cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)

// ordinal user cannot defrag
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Defrag(cx); err == nil {
cx.t.Fatal("ordinal user should not be able to issue a defrag request")
}

// root can defrag
cx.user, cx.pass = "root", "root"
if err := ctlV3Defrag(cx); err != nil {
cx.t.Fatal(err)
}
}

func ctlV3Defrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "defrag")
lines := make([]string, cx.epc.cfg.clusterSize)
Expand Down
46 changes: 40 additions & 6 deletions e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ import (
func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }

func snapshotTest(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
for i := range kvs {
if err := ctlV3Put(cx, kvs[i].key, kvs[i].val, ""); err != nil {
cx.t.Fatal(err)
}
}
maintenanceInitKeys(cx)

fpath := "test.snapshot"
defer os.RemoveAll(fpath)
Expand Down Expand Up @@ -242,3 +237,42 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
}

func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) }

func snapshotTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)

if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}

cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)

fpath := "test.snapshot"
defer os.RemoveAll(fpath)

// ordinal user cannot save a snapshot
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3SnapshotSave(cx, fpath); err == nil {
cx.t.Fatal("ordinal user should not be able to save a snapshot")
}

// root can save a snapshot
cx.user, cx.pass = "root", "root"
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}

st, err := getSnapshotStatus(cx, fpath)
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
}
}
54 changes: 53 additions & 1 deletion etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/sha256"
"io"

"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
Expand Down Expand Up @@ -45,6 +46,10 @@ type RaftStatusGetter interface {
Leader() types.ID
}

type AuthGetter interface {
AuthStore() auth.AuthStore
}

type maintenanceServer struct {
rg RaftStatusGetter
kg KVGetter
Expand All @@ -54,7 +59,8 @@ type maintenanceServer struct {
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
return &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
return &authMaintenanceServer{srv, s}
}

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
Expand Down Expand Up @@ -139,3 +145,49 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
ms.hdr.fill(resp.Header)
return resp, nil
}

type authMaintenanceServer struct {
*maintenanceServer
ag AuthGetter
}

func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
authInfo, err := ams.ag.AuthStore().AuthInfoFromCtx(ctx)
if err != nil {
return err
}

return ams.ag.AuthStore().IsAdminPermitted(authInfo)
}

func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}

return ams.maintenanceServer.Defragment(ctx, sr)
}

func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
if err := ams.isAuthenticated(srv.Context()); err != nil {
return err
}

return ams.maintenanceServer.Snapshot(sr, srv)
}

func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}

return ams.maintenanceServer.Hash(ctx, r)
}

func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}

return ams.maintenanceServer.Status(ctx, ar)
}
2 changes: 1 addition & 1 deletion etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func togRPCError(err error) error {
return rpctypes.ErrGRPCPermissionNotGranted
case auth.ErrAuthNotEnabled:
return rpctypes.ErrGRPCAuthNotEnabled
case etcdserver.ErrInvalidAuthToken:
case auth.ErrInvalidAuthToken:
return rpctypes.ErrGRPCInvalidAuthToken
default:
return grpc.Errorf(codes.Unknown, err.Error())
Expand Down
1 change: 0 additions & 1 deletion etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ var (
ErrNoLeader = errors.New("etcdserver: no leader")
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
ErrNoSpace = errors.New("etcdserver: no space")
ErrInvalidAuthToken = errors.New("etcdserver: invalid auth token")
ErrTooManyRequests = errors.New("etcdserver: too many requests")
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
Expand Down
Loading

0 comments on commit 26d9926

Please sign in to comment.