diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 926337d3..0dfa7b1f 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - "github.com/soheilhy/cmux" "go.etcd.io/etcd/server/v3/embed" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -88,45 +87,15 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { } b.Register(grpcServer) - // set up HTTP server with basic mux - httpServer := httpServer() - // Create raw listener and wrap in cmux for protocol switching listener, err := createListener(config) if err != nil { return ETCDConfig{}, errors.Wrap(err, "creating listener") } - m := cmux.New(listener) - - if config.ServerTLSConfig.CertFile != "" && config.ServerTLSConfig.KeyFile != "" { - // If using TLS, wrap handler in GRPC/HTTP switching handler and serve TLS - httpServer.Handler = grpcHandlerFunc(grpcServer, httpServer.Handler) - anyl := m.Match(cmux.Any()) - go func() { - if err := httpServer.ServeTLS(anyl, config.ServerTLSConfig.CertFile, config.ServerTLSConfig.KeyFile); err != nil { - logrus.Errorf("Kine TLS server shutdown: %v", err) - } - }() - } else { - // If using plaintext, use cmux matching for GRPC/HTTP switching - grpcl := m.Match(cmux.HTTP2()) - go func() { - if err := grpcServer.Serve(grpcl); err != nil { - logrus.Errorf("Kine GRPC server shutdown: %v", err) - } - }() - httpl := m.Match(cmux.HTTP1()) - go func() { - if err := httpServer.Serve(httpl); err != nil { - logrus.Errorf("Kine HTTP server shutdown: %v", err) - } - }() - } go func() { - if err := m.Serve(); err != nil { - logrus.Errorf("Kine listener shutdown: %v", err) - grpcServer.Stop() + if err := grpcServer.Serve(listener); err != nil { + logrus.Errorf("Kine GPRC server exited: %v", err) } }() diff --git a/pkg/endpoint/http.go b/pkg/endpoint/http.go deleted file mode 100644 index f66058da..00000000 --- a/pkg/endpoint/http.go +++ /dev/null @@ -1,66 +0,0 @@ -package endpoint - -import ( - "log" - "net/http" - "strings" - - "github.com/sirupsen/logrus" - "google.golang.org/grpc" -) - -var ( - etcdVersion = []byte(`{"etcdserver":"3.5.0","etcdcluster":"3.5.0"}`) - versionPath = "/version" -) - -// httpServer returns a HTTP server with the basic mux handler. -func httpServer() *http.Server { - // Set up root HTTP mux with basic response handlers - mux := http.NewServeMux() - handleBasic(mux) - - return &http.Server{ - Handler: mux, - ErrorLog: log.New(logrus.StandardLogger().Writer(), "kinehttp ", log.LstdFlags), - } -} - -// handleBasic binds basic HTTP response handlers to a mux. -func handleBasic(mux *http.ServeMux) { - mux.HandleFunc(versionPath, serveVersion) -} - -// serveVersion responds with a canned JSON version response. -func serveVersion(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r, http.MethodGet) { - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(etcdVersion) -} - -// allowMethod returns true if a method is allowed, or false (after sending a -// MethodNotAllowed error to the client) if it is not. -func allowMethod(w http.ResponseWriter, r *http.Request, m string) bool { - if m == r.Method { - return true - } - w.Header().Set("Allow", m) - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) - return false -} - -// grpcHandlerFunc takes a GRPC server and HTTP handler, and returns a handler -// function that will route GRPC requests to the GRPC server, and everything -// else to the HTTP handler. This is based on sample code provided in the GRPC -// ServeHTTP documentation for sharing a port between GRPC and HTTP handlers. -func grpcHandlerFunc(grpcServer *grpc.Server, httpHandler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - httpHandler.ServeHTTP(w, r) - } - }) -} diff --git a/pkg/server/watch.go b/pkg/server/watch.go index cc38c83c..51b8894f 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" ) var watchID int64 @@ -138,7 +139,7 @@ func (w *watcher) Cancel(watchID int64, err error) { CancelReason: "watch closed", WatchId: watchID, }) - if serr != nil && err != nil { + if serr != nil && err != nil && !clientv3.IsConnCanceled(serr) { logrus.Errorf("WATCH Failed to send cancel response for watchID %d: %v", watchID, serr) } }