diff --git a/.gitignore b/.gitignore index 66fd13c..21b2018 100644 --- a/.gitignore +++ b/.gitignore @@ -11,5 +11,5 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out -# Dependency directories (remove the comment below to include it) -# vendor/ +# Dependency directories +vendor/ diff --git a/README.md b/README.md index 19e8665..5b9925a 100644 --- a/README.md +++ b/README.md @@ -271,19 +271,21 @@ func (s *OddEvenRoutingStrategy) SelectRoute( ctx context.Context, req fiber.Request, routes map[string]fiber.Component, -) (fiber.Component, []fiber.Component, error) { +) (fiber.Component, []fiber.Component, fiber.Labels, error) { sessionIdStr := "" if sessionHeader, ok := req.Header()["X-Session-ID"]; ok { sessionIdStr = sessionHeader[0] } + // Metadata that can be propagated upstream for logging / debugging + labels := fiber.NewLabelsMap() if sessionID, err := strconv.Atoi(sessionIdStr); err != nil { return nil, nil, err } else { if sessionID % 2 != 0 { - return routes["route-a"], []fiber.Component{}, nil + return routes["route-a"], []fiber.Component{}, labels.WithLabel("Match-Type", "even"), nil } else { - return routes["route-b"], []fiber.Component{}, nil + return routes["route-b"], []fiber.Component{}, labels.WithLabel("Match-Type", "odd"), nil } } } diff --git a/eager_router.go b/eager_router.go index 977b5e2..656d8e8 100644 --- a/eager_router.go +++ b/eager_router.go @@ -63,7 +63,7 @@ func (fanIn *eagerRouterFanIn) Aggregate( ) Response { // use routing strategy to fetch primary route and fallbacks // publish the ordered routes into a channel - routesOrderCh, errCh := fanIn.strategy.getRoutesOrder(ctx, req, fanIn.router.GetRoutes()) + routesOrderCh := fanIn.strategy.getRoutesOrder(ctx, req, fanIn.router.GetRoutes()) out := make(chan Response, 1) go func() { @@ -77,6 +77,9 @@ func (fanIn *eagerRouterFanIn) Aggregate( // would be initialized from a routesOrderCh channel routes []Component + // response labels + labels Labels = NewLabelsMap() + // index of current primary route currentRouteIdx int @@ -93,18 +96,17 @@ func (fanIn *eagerRouterFanIn) Aggregate( } else { responseCh = nil } - case orderedRoutes, ok := <-routesOrderCh: + case routesOrderResponse, ok := <-routesOrderCh: if ok { - routes = orderedRoutes + labels = routesOrderResponse.Labels + if routesOrderResponse.Err != nil { + masterResponse = NewErrorResponse(errors.NewFiberError(req.Protocol(), routesOrderResponse.Err)) + } else { + routes = routesOrderResponse.Components + } } else { routesOrderCh = nil } - case err, ok := <-errCh: - if ok { - masterResponse = NewErrorResponse(errors.NewFiberError(req.Protocol(), err)) - } else { - errCh = nil - } case <-ctx.Done(): if routes == nil { // timeout exceeded, but no routes received. Sending error response @@ -139,7 +141,7 @@ func (fanIn *eagerRouterFanIn) Aggregate( } } } - out <- masterResponse + out <- masterResponse.WithLabels(labels) }() return <-out diff --git a/extras/random_routing_strategy.go b/extras/random_routing_strategy.go index 18c70b1..290e81e 100644 --- a/extras/random_routing_strategy.go +++ b/extras/random_routing_strategy.go @@ -3,6 +3,7 @@ package extras import ( "context" "math/rand" + "strconv" "github.com/gojek/fiber" ) @@ -19,8 +20,11 @@ func (s *RandomRoutingStrategy) SelectRoute( _ context.Context, _ fiber.Request, routes map[string]fiber.Component, -) (route fiber.Component, fallbacks []fiber.Component, err error) { +) (route fiber.Component, fallbacks []fiber.Component, labels fiber.Labels, err error) { idx := rand.Intn(len(routes)) + // Add idx to attribute map for logging / debugging upstream + labels = fiber.NewLabelsMap().WithLabel("idx", strconv.Itoa(idx)) + for _, child := range routes { if idx == 0 { route = child @@ -29,5 +33,5 @@ func (s *RandomRoutingStrategy) SelectRoute( } idx-- } - return route, fallbacks, nil + return route, fallbacks, labels, nil } diff --git a/go.mod b/go.mod index 7afa9ce..a05b843 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,32 @@ module github.com/gojek/fiber -go 1.14 +go 1.18 require ( github.com/ghodss/yaml v1.0.0 github.com/google/go-cmp v0.5.8 - github.com/kr/text v0.2.0 // indirect - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/opentracing-go v1.1.0 - github.com/pkg/errors v0.9.1 // indirect github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.17.0 + google.golang.org/grpc v1.48.0 + google.golang.org/protobuf v1.28.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.4.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect - google.golang.org/grpc v1.48.0 - google.golang.org/protobuf v1.28.0 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 60e9fd1..b12310e 100644 --- a/go.sum +++ b/go.sum @@ -113,12 +113,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/grpc/response.go b/grpc/response.go index c79cd71..267d5d9 100644 --- a/grpc/response.go +++ b/grpc/response.go @@ -27,10 +27,35 @@ func (r *Response) StatusCode() int { return int(r.Status.Code()) } +// Label returns all the values associated with the given key, in the response metadata. +// If the key does not exist, an empty slice will be returned. +func (r *Response) Label(key string) []string { + return r.Metadata.Get(key) +} + +// WithLabel appends the given value(s) to the key, in the response metadata. +// If the key does not already exist, a new key will be created. +// The modified response is returned. +func (r *Response) WithLabel(key string, values ...string) fiber.Response { + r.Metadata.Append(key, values...) + return r +} + +// WithLabels does the same thing as WithLabel but over a collection of key-values. +func (r *Response) WithLabels(labels fiber.Labels) fiber.Response { + for _, key := range labels.Keys() { + values := labels.Label(key) + r.Metadata.Append(key, values...) + } + return r +} + func (r *Response) BackendName() string { - return strings.Join(r.Metadata.Get("backend"), ",") + return strings.Join(r.Label("backend"), ",") } +// WithBackendName sets the given backend name in the response metadata. +// The modified response is returned. func (r *Response) WithBackendName(backendName string) fiber.Response { r.Metadata.Set("backend", backendName) return r diff --git a/grpc/response_test.go b/grpc/response_test.go index d562505..3088039 100644 --- a/grpc/response_test.go +++ b/grpc/response_test.go @@ -1,9 +1,11 @@ -package grpc +package grpc_test import ( "log" "testing" + "github.com/gojek/fiber" + "github.com/gojek/fiber/grpc" testproto "github.com/gojek/fiber/internal/testdata/gen/testdata/proto" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" @@ -15,16 +17,16 @@ import ( func TestResponse_Backend(t *testing.T) { tests := []struct { name string - res Response - want Response + res grpc.Response + want grpc.Response backendName string }{ { name: "ok", - res: Response{ + res: grpc.Response{ Metadata: map[string][]string{}, }, - want: Response{ + want: grpc.Response{ Metadata: metadata.New(map[string]string{"backend": "testing"}), }, backendName: "testing", @@ -42,13 +44,13 @@ func TestResponse_Backend(t *testing.T) { func TestResponse_Status(t *testing.T) { tests := []struct { name string - res Response + res grpc.Response expectedCode int expectedSuccess bool }{ { name: "ok", - res: Response{ + res: grpc.Response{ Status: *status.New(codes.OK, ""), }, expectedCode: 0, @@ -56,7 +58,7 @@ func TestResponse_Status(t *testing.T) { }, { name: "ok", - res: Response{ + res: grpc.Response{ Status: *status.New(codes.InvalidArgument, ""), }, expectedCode: 3, @@ -83,12 +85,12 @@ func TestResponse_Payload(t *testing.T) { responseByte, _ := proto.Marshal(response) tests := []struct { name string - req Response + req grpc.Response expected []byte }{ { name: "", - req: Response{ + req: grpc.Response{ Message: responseByte, }, expected: responseByte, @@ -100,3 +102,98 @@ func TestResponse_Payload(t *testing.T) { }) } } + +func TestResponse_Label(t *testing.T) { + tests := map[string]struct { + response fiber.Response + key string + expected []string + }{ + "empty labels": { + response: &grpc.Response{ + Metadata: map[string][]string{}, + }, + key: "dummy-key", + }, + "non-empty labels": { + response: &grpc.Response{ + Metadata: map[string][]string{"key": []string{"v1", "v2"}}, + }, + key: "key", + expected: []string{"v1", "v2"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + values := tt.response.Label(tt.key) + assert.Equal(t, tt.expected, values) + }) + } +} + +func TestResponse_WithLabel(t *testing.T) { + tests := map[string]struct { + response fiber.Response + key string + values []string + expected []string + }{ + "new labels": { + response: &grpc.Response{ + Metadata: map[string][]string{"key": []string{"v1", "v2"}}, + }, + key: "k1", + values: []string{"v1", "v2"}, + expected: []string{"v1", "v2"}, + }, + "append labels": { + response: &grpc.Response{ + Metadata: map[string][]string{"k1": []string{"v1", "v2"}}, + }, + key: "k1", + values: []string{"v3"}, + expected: []string{"v1", "v2", "v3"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + newLabels := tt.response.WithLabel(tt.key, tt.values...) + assert.Equal(t, tt.expected, newLabels.Label(tt.key)) + }) + } +} + +func TestHTTPResponse_WithLabels(t *testing.T) { + tests := map[string]struct { + response fiber.Response + labels fiber.Labels + key string + expected []string + }{ + "new labels": { + response: &grpc.Response{ + Metadata: map[string][]string{"key": []string{"v1", "v2"}}, + }, + labels: fiber.LabelsMap{"k1": []string{"v1", "v2"}}, + key: "k1", + expected: []string{"v1", "v2"}, + }, + "append labels": { + response: &grpc.Response{ + Metadata: map[string][]string{"k1": []string{"v1", "v2"}}, + }, + labels: fiber.LabelsMap{"k1": []string{"v3"}}, + key: "k1", + expected: []string{"v1", "v2", "v3"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + newLabels := tt.response.WithLabels(tt.labels) + assert.Equal(t, tt.expected, newLabels.Label(tt.key)) + }) + } +} diff --git a/http/response.go b/http/response.go index 43c43bb..d6bfefa 100644 --- a/http/response.go +++ b/http/response.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strings" "github.com/gojek/fiber" "github.com/gojek/fiber/errors" @@ -22,17 +23,43 @@ func (r *Response) IsSuccess() bool { return isSuccessStatus(r.StatusCode()) } -func (r *Response) WithBackendName(backEnd string) fiber.Response { - r.Header().Set(headerBackendName, backEnd) +// Label returns all the values associated with the given key, in the response header. +// If the key does not exist, an empty slice will be returned. +func (r *Response) Label(key string) []string { + return r.Header().Values(key) +} + +// WithLabel appends the given value(s) to the key, in the response header. +// If the key does not already exist, a new key will be created. +// The modified response is returned. +func (r *Response) WithLabel(key string, values ...string) fiber.Response { + for _, value := range values { + r.Header().Add(key, value) + } + return r +} + +// WithLabels does the same thing as WithLabel but over a collection of key-values. +func (r *Response) WithLabels(labels fiber.Labels) fiber.Response { + for _, key := range labels.Keys() { + values := labels.Label(key) + for _, value := range values { + r.Header().Add(key, value) + } + } return r } // BackendName returns the backend used to make the request func (r *Response) BackendName() string { - if r.Header() == nil { - r.response.Header = make(http.Header) - } - return r.Header().Get(headerBackendName) + return strings.Join(r.Label(headerBackendName), ",") +} + +// WithBackendName sets the given backend name in the response header. +// The modified response is returned. +func (r *Response) WithBackendName(backEnd string) fiber.Response { + r.Header().Set(headerBackendName, backEnd) + return r } // StatusCode returns the response status code diff --git a/http/response_test.go b/http/response_test.go index be9c21c..5e23b28 100644 --- a/http/response_test.go +++ b/http/response_test.go @@ -5,8 +5,10 @@ import ( "net/http" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/gojek/fiber" fiberHTTP "github.com/gojek/fiber/http" "github.com/gojek/fiber/internal/testutils" ) @@ -91,3 +93,107 @@ func TestNewHTTPResponse(t *testing.T) { }) } } + +func TestHTTPResponseLabel(t *testing.T) { + tests := map[string]struct { + response fiber.Response + key string + expected []string + }{ + "empty labels": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + key: "dummy-key", + }, + "case insensitive key": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Header: http.Header{"Key": []string{"v1", "v2"}}, + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + key: "Key", + expected: []string{"v1", "v2"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + values := tt.response.Label(tt.key) + assert.Equal(t, tt.expected, values) + }) + } +} + +func TestHTTPResponseWithLabel(t *testing.T) { + tests := map[string]struct { + response fiber.Response + key string + values []string + expected []string + }{ + "new labels": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + key: "k1", + values: []string{"v1", "v2"}, + expected: []string{"v1", "v2"}, + }, + "append labels": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Header: http.Header{"K1": []string{"v1", "v2"}}, + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + key: "k1", + values: []string{"v3"}, + expected: []string{"v1", "v2", "v3"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + newLabels := tt.response.WithLabel(tt.key, tt.values...) + assert.Equal(t, tt.expected, newLabels.Label(tt.key)) + }) + } +} + +func TestHTTPResponseWithLabels(t *testing.T) { + tests := map[string]struct { + response fiber.Response + labels fiber.Labels + key string + expected []string + }{ + "new labels": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + labels: fiber.LabelsMap{"k1": []string{"v1", "v2"}}, + key: "K1", + expected: []string{"v1", "v2"}, + }, + "append labels": { + response: fiberHTTP.NewHTTPResponse(&http.Response{ + Header: http.Header{"K1": []string{"v1", "v2"}}, + Body: makeBody([]byte("{}")), + StatusCode: http.StatusOK, + }), + labels: fiber.LabelsMap{"k1": []string{"v3"}}, + key: "k1", + expected: []string{"v1", "v2", "v3"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + newLabels := tt.response.WithLabels(tt.labels) + assert.Equal(t, tt.expected, newLabels.Label(tt.key)) + }) + } +} diff --git a/integration_test/integration_test.go b/integration_test/integration_test.go index 9b659b7..5070355 100644 --- a/integration_test/integration_test.go +++ b/integration_test/integration_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) @@ -78,16 +79,17 @@ var ( ) func TestMain(m *testing.M) { - // Set up three http and grpc server with fix response for test - runTestHttpServer(httpAddr1, httpResponse1, 0) - runTestHttpServer(httpAddr2, httpResponse2, 0) - runTestHttpServer(httpAddr3, httpResponse3, 10) + // Set up three grpc and http server with fix response for test. + // Third routes will be set to timeout intentionally. - // Third routes will be set to timeout intentionally runTestGrpcServer(grpcPort1, grpcResponse1, 0) runTestGrpcServer(grpcPort2, grpcResponse2, 0) runTestGrpcServer(grpcPort3, grpcResponse3, 10) + runTestHttpServer(httpAddr1, httpResponse1, 0) + runTestHttpServer(httpAddr2, httpResponse2, 0) + runTestHttpServer(httpAddr3, httpResponse3, 10) + os.Exit(m.Run()) } @@ -158,9 +160,10 @@ func TestE2EFromConfig(t *testing.T) { routesOrder: []string{route1, route2, route3}, request: grpcRequest, expectedMessageProto: grpcResponse1, - expectedResponse: &grpc.Response{ - Status: *status.New(codes.OK, "Success"), - }, + expectedResponse: (&grpc.Response{ + Status: *status.New(codes.OK, "Success"), + Metadata: metadata.New(map[string]string{}), + }).WithLabel("order", []string{route1, route2, route3}...), }, { name: "http route 1", @@ -172,7 +175,7 @@ func TestE2EFromConfig(t *testing.T) { StatusCode: http.StatusOK, Body: makeBody(httpResponse1), }, - ), + ).WithLabel("order", []string{route1, route2, route3}...), }, { name: "grpc route 2", @@ -180,9 +183,10 @@ func TestE2EFromConfig(t *testing.T) { routesOrder: []string{route2, route1, route3}, request: grpcRequest, expectedMessageProto: grpcResponse2, - expectedResponse: &grpc.Response{ - Status: *status.New(codes.OK, "Success"), - }, + expectedResponse: (&grpc.Response{ + Status: *status.New(codes.OK, "Success"), + Metadata: metadata.New(map[string]string{}), + }).WithLabel("order", []string{route2, route1, route3}...), }, { name: "http route 2", @@ -194,7 +198,7 @@ func TestE2EFromConfig(t *testing.T) { StatusCode: http.StatusOK, Body: makeBody(httpResponse2), }, - ), + ).WithLabel("order", []string{route2, route1, route3}...), }, { name: "grpc route3 timeout, route 1 fallback returned", @@ -202,9 +206,10 @@ func TestE2EFromConfig(t *testing.T) { routesOrder: []string{route3, route1, route2}, request: grpcRequest, expectedMessageProto: grpcResponse1, - expectedResponse: &grpc.Response{ - Status: *status.New(codes.OK, "Success"), - }, + expectedResponse: (&grpc.Response{ + Status: *status.New(codes.OK, "Success"), + Metadata: metadata.New(map[string]string{}), + }).WithLabel("order", []string{route3, route1, route2}...), }, { name: "http route3 timeout, route 1 fallback returned", @@ -216,7 +221,7 @@ func TestE2EFromConfig(t *testing.T) { StatusCode: http.StatusOK, Body: makeBody(httpResponse1), }, - ), + ).WithLabel("order", []string{route3, route1, route2}...), }, { name: "grpc route3 timeout, route 2 fallback returned", @@ -224,9 +229,10 @@ func TestE2EFromConfig(t *testing.T) { routesOrder: []string{route3, route2, route1}, request: grpcRequest, expectedMessageProto: grpcResponse2, - expectedResponse: &grpc.Response{ - Status: *status.New(codes.OK, "Success"), - }, + expectedResponse: (&grpc.Response{ + Status: *status.New(codes.OK, "Success"), + Metadata: metadata.New(map[string]string{}), + }).WithLabel("order", []string{route3, route2, route1}...), }, { name: "http route3 timeout, route 2 fallback returned", @@ -238,7 +244,7 @@ func TestE2EFromConfig(t *testing.T) { StatusCode: http.StatusOK, Body: makeBody(httpResponse2), }, - ), + ).WithLabel("order", []string{route3, route2, route1}...), }, { name: "grpc route3 timeout", @@ -248,14 +254,18 @@ func TestE2EFromConfig(t *testing.T) { expectedResponse: &grpc.Response{ Status: *status.New(codes.Unavailable, ""), }, - expectedFiberErr: fiber.NewErrorResponse(fiberError.ErrServiceUnavailable(protocol.GRPC)), + expectedFiberErr: fiber. + NewErrorResponse(fiberError.ErrServiceUnavailable(protocol.GRPC)). + WithLabel("order", []string{route3}...), }, { - name: "http route3 timeout", - configPath: "./fiberhttp.yaml", - routesOrder: []string{route3}, - request: httpRequest, - expectedFiberErr: fiber.NewErrorResponse(fiberError.ErrServiceUnavailable(protocol.HTTP)), + name: "http route3 timeout", + configPath: "./fiberhttp.yaml", + routesOrder: []string{route3}, + request: httpRequest, + expectedFiberErr: fiber. + NewErrorResponse(fiberError.ErrServiceUnavailable(protocol.HTTP)). + WithLabel("order", []string{route3}...), }, } @@ -281,9 +291,12 @@ func TestE2EFromConfig(t *testing.T) { resp, ok := <-router.Dispatch(context.Background(), tt.request).Iter() require.True(t, ok) + var finalResp fiber.Response if tt.expectedFiberErr != nil { + finalResp = tt.expectedFiberErr assert.EqualValues(t, tt.expectedFiberErr, resp) } else { + finalResp = tt.expectedResponse require.Equal(t, resp.StatusCode(), tt.expectedResponse.StatusCode()) if tt.request.Protocol() == protocol.GRPC { responseProto := &testproto.PredictValuesResponse{} @@ -295,6 +308,8 @@ func TestE2EFromConfig(t *testing.T) { assert.Equal(t, tt.expectedResponse.Payload(), resp.Payload()) } } + // Verify that the labels were correctly set, for both success and failure response + assert.Equal(t, finalResp.Label("order"), resp.Label("order")) }) } } diff --git a/interceptor.go b/interceptor.go index a300eea..fc9b30f 100644 --- a/interceptor.go +++ b/interceptor.go @@ -12,6 +12,8 @@ var ( CtxComponentIDKey CtxKey = "CTX_COMPONENT_ID" // CtxComponentKindKey is used to denote the component's kind in the request context CtxComponentKindKey CtxKey = "CTX_COMPONENT_KIND" + // CtxComponentLabelsKey is used to denote the component's labels in the request context + CtxComponentLabelsKey CtxKey = "CTX_COMPONENT_LABELS" ) // Interceptor is the interface for a structural interceptor diff --git a/internal/testutils/routing_strategy.go b/internal/testutils/routing_strategy.go index 0640e73..76d6533 100644 --- a/internal/testutils/routing_strategy.go +++ b/internal/testutils/routing_strategy.go @@ -4,8 +4,8 @@ import ( "context" "time" - "github.com/stretchr/testify/mock" "github.com/gojek/fiber" + "github.com/stretchr/testify/mock" ) type MockRoutingStrategy struct { @@ -17,14 +17,14 @@ func (s *MockRoutingStrategy) SelectRoute( _ context.Context, req fiber.Request, routes map[string]fiber.Component, -) (route fiber.Component, fallbacks []fiber.Component, err error) { +) (route fiber.Component, fallbacks []fiber.Component, labels fiber.Labels, err error) { args := s.Called(req, routes) if args.Get(0) == nil { - return (fiber.Component)(nil), args.Get(1).([]fiber.Component), args.Error(2) + return (fiber.Component)(nil), args.Get(1).([]fiber.Component), args.Get(2).(fiber.Labels), args.Error(3) } - return args.Get(0).(fiber.Component), args.Get(1).([]fiber.Component), args.Error(2) + return args.Get(0).(fiber.Component), args.Get(1).([]fiber.Component), args.Get(2).(fiber.Labels), args.Error(3) } func NewMockRoutingStrategy( @@ -39,9 +39,10 @@ func NewMockRoutingStrategy( time.Sleep(latency) }). Return( - func() (fiber.Component, []fiber.Component, error) { + func() (fiber.Component, []fiber.Component, fiber.Labels, error) { + labels := fiber.NewLabelsMap().WithLabel("order", order...) if len(order) == 0 { - return (fiber.Component)(nil), []fiber.Component{}, err + return (fiber.Component)(nil), []fiber.Component{}, labels, err } // Else fallbacks := make([]fiber.Component, 0) @@ -49,7 +50,7 @@ func NewMockRoutingStrategy( fallbacks = append(fallbacks, routes[order[i]]) } - return routes[order[0]], fallbacks, err + return routes[order[0]], fallbacks, labels, err }(), ) return strategy diff --git a/labels.go b/labels.go new file mode 100644 index 0000000..3102e35 --- /dev/null +++ b/labels.go @@ -0,0 +1,39 @@ +package fiber + +type Labels interface { + Keys() []string + Label(key string) []string + WithLabel(key string, values ...string) Labels +} + +// LabelsMap implements the Labels interface via a simple map +type LabelsMap map[string][]string + +func (a LabelsMap) Keys() []string { + keys := make([]string, len(a)) + + i := 0 + for k := range a { + keys[i] = k + i++ + } + return keys +} + +func (a LabelsMap) Label(key string) []string { + if values, ok := a[key]; ok { + return values + } else { + return []string{} + } +} + +func (a LabelsMap) WithLabel(key string, values ...string) Labels { + a[key] = values + return a +} + +func NewLabelsMap() Labels { + var newMap LabelsMap = map[string][]string{} + return newMap +} diff --git a/labels_test.go b/labels_test.go new file mode 100644 index 0000000..ead1f21 --- /dev/null +++ b/labels_test.go @@ -0,0 +1,91 @@ +package fiber_test + +import ( + "sort" + "testing" + + "github.com/gojek/fiber" + "github.com/stretchr/testify/assert" +) + +func TestLabelsMapKeys(t *testing.T) { + tests := map[string]struct { + data fiber.LabelsMap + expected []string + }{ + "empty map": { + data: fiber.LabelsMap{}, + expected: []string{}, + }, + "non-empty map": { + data: fiber.LabelsMap{"k1": []string{"v1", "v2"}, "k2": []string{"v3"}}, + expected: []string{"k1", "k2"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + keys := tt.data.Keys() + // Sort and compare + sort.Slice(keys, func(p, q int) bool { + return keys[p] < keys[q] + }) + assert.Equal(t, tt.expected, keys) + }) + } +} + +func TestLabelsMapWithLabel(t *testing.T) { + tests := map[string]struct { + data fiber.LabelsMap + key string + values []string + expected fiber.LabelsMap + }{ + "set key": { + data: fiber.LabelsMap{}, + key: "k1", + values: []string{"v1", "v2"}, + expected: fiber.LabelsMap{"k1": []string{"v1", "v2"}}, + }, + "overwrite key": { + data: fiber.LabelsMap{"k1": []string{"v1", "v2"}, "k2": []string{"v3"}}, + key: "k2", + values: []string{"new-val"}, + expected: fiber.LabelsMap{"k1": []string{"v1", "v2"}, "k2": []string{"new-val"}}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + newMap := tt.data.WithLabel(tt.key, tt.values...) + assert.Equal(t, tt.expected, newMap) + }) + } +} + +func TestLabelsMapLabel(t *testing.T) { + tests := map[string]struct { + data fiber.LabelsMap + key string + expected []string + }{ + "empty map": { + data: fiber.LabelsMap{}, + key: "k", + expected: []string{}, + }, + "non-empty map": { + data: fiber.LabelsMap{"k1": []string{"v1", "v2"}, "k2": []string{"v3"}}, + key: "k1", + expected: []string{"v1", "v2"}, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + values := tt.data.Label(tt.key) + assert.Equal(t, tt.expected, values) + }) + } +} diff --git a/lazy_router.go b/lazy_router.go index d2d23f1..50eb5be 100644 --- a/lazy_router.go +++ b/lazy_router.go @@ -50,25 +50,24 @@ func (r *LazyRouter) Dispatch(ctx context.Context, req Request) ResponseQueue { defer close(out) var routes []Component - routesOrderCh, errCh := r.strategy.getRoutesOrder(ctx, req, r.routes) - for routesOrderCh != nil || errCh != nil { - select { - case orderedRoutes, ok := <-routesOrderCh: - if ok { - routes = orderedRoutes - } else { - routesOrderCh = nil - } - case err, ok := <-errCh: - if ok { - out <- NewErrorResponse(errors.NewFiberError(req.Protocol(), err)) + var labels Labels = NewLabelsMap() + + routesOrderCh := r.strategy.getRoutesOrder(ctx, req, r.routes) + + select { + case routesOrderResponse, ok := <-routesOrderCh: + if ok { + labels = routesOrderResponse.Labels + if routesOrderResponse.Err != nil { + out <- NewErrorResponse(errors.NewFiberError(req.Protocol(), routesOrderResponse.Err)).WithLabels(labels) return + } else { + routes = routesOrderResponse.Components } - errCh = nil - case <-ctx.Done(): - out <- NewErrorResponse(errors.ErrRouterStrategyTimeoutExceeded(req.Protocol())) - return } + case <-ctx.Done(): + out <- NewErrorResponse(errors.ErrRouterStrategyTimeoutExceeded(req.Protocol())).WithLabels(labels) + return } if len(routes) > 0 { @@ -76,7 +75,7 @@ func (r *LazyRouter) Dispatch(ctx context.Context, req Request) ResponseQueue { for _, route := range routes { copyReq, _ := req.Clone() responses := make([]Response, 0) - responseCh := route.Dispatch(ctx, copyReq).Iter() + responseCh := route.Dispatch(context.WithValue(ctx, CtxComponentLabelsKey, labels), copyReq).Iter() ok := true for ok { select { @@ -89,19 +88,18 @@ func (r *LazyRouter) Dispatch(ctx context.Context, req Request) ResponseQueue { // all responseQueue from selected route are ok, sending them back to output // and breaking a cycle over other routes for _, resp := range responses { - out <- resp + out <- resp.WithLabels(labels) } return } case <-ctx.Done(): - out <- NewErrorResponse(errors.ErrRequestTimeout(req.Protocol())) + out <- NewErrorResponse(errors.ErrRequestTimeout(req.Protocol())).WithLabels(labels) return } } } - } else { - out <- NewErrorResponse(errors.ErrRouterStrategyReturnedEmptyRoutes(req.Protocol())) } + out <- NewErrorResponse(errors.ErrRouterStrategyReturnedEmptyRoutes(req.Protocol())).WithLabels(labels) }() return queue diff --git a/lazy_router_test.go b/lazy_router_test.go index cf1fc70..93ce6c3 100644 --- a/lazy_router_test.go +++ b/lazy_router_test.go @@ -63,6 +63,24 @@ func TestLazyRouter_Dispatch(t *testing.T) { }, timeout: 100 * time.Millisecond, }, + { + name: "error: no route succeeded", + routes: map[string]fiber.Component{ + "route-a": testutils.NewMockComponent( + "route-a", + testUtilsHttp.DelayedResponse{Response: testUtilsHttp.MockResp(500, "A-NOK", nil, fiberErrors.ErrServiceUnavailable(protocol.HTTP))}), + "route-b": testutils.NewMockComponent( + "route-b", + testUtilsHttp.DelayedResponse{Response: testUtilsHttp.MockResp(500, "B-NOK", nil, nil)}), + }, + strategy: []string{ + "route-a", "route-b", + }, + expected: []fiber.Response{ + testUtilsHttp.MockResp(501, "", nil, fiberErrors.ErrRouterStrategyReturnedEmptyRoutes(protocol.HTTP)), + }, + timeout: 100 * time.Millisecond, + }, { name: "error: routing strategy succeeded, but route timeout exceeded", routes: map[string]fiber.Component{ diff --git a/response.go b/response.go index a59ca78..1ee5bef 100644 --- a/response.go +++ b/response.go @@ -11,10 +11,14 @@ type Response interface { StatusCode() int BackendName() string WithBackendName(string) Response + Label(key string) []string + WithLabel(key string, values ...string) Response + WithLabels(Labels) Response } type ErrorResponse struct { *CachedPayload + labels Labels code int backend string } @@ -36,6 +40,22 @@ func (resp *ErrorResponse) StatusCode() int { return resp.code } +func (resp *ErrorResponse) Label(key string) []string { + return resp.labels.Label(key) +} + +func (resp *ErrorResponse) WithLabel(key string, values ...string) Response { + resp.labels = resp.labels.WithLabel(key, values...) + return resp +} + +func (resp *ErrorResponse) WithLabels(labels Labels) Response { + for _, key := range labels.Keys() { + resp.labels = resp.labels.WithLabel(key, labels.Label(key)...) + } + return resp +} + func NewErrorResponse(err error) Response { var fiberErr *errors.FiberError if castedError, ok := err.(*errors.FiberError); ok { @@ -47,5 +67,6 @@ func NewErrorResponse(err error) Response { return &ErrorResponse{ CachedPayload: NewCachedPayload(payload), code: fiberErr.Code, + labels: NewLabelsMap(), } } diff --git a/routing_strategy.go b/routing_strategy.go index cd50378..1df7457 100644 --- a/routing_strategy.go +++ b/routing_strategy.go @@ -11,7 +11,7 @@ type RoutingStrategy interface { SelectRoute(ctx context.Context, req Request, routes map[string]Component, - ) (route Component, fallbacks []Component, err error) + ) (route Component, fallbacks []Component, labels Labels, err error) } type baseRoutingStrategy struct { @@ -19,31 +19,34 @@ type baseRoutingStrategy struct { BaseFiberType } +type routesOrderResponse struct { + Components []Component + Labels Labels + Err error +} + func (s *baseRoutingStrategy) getRoutesOrder( ctx context.Context, req Request, routes map[string]Component, -) (<-chan []Component, <-chan error) { - out := make(chan []Component) - errCh := make(chan error, 1) +) <-chan routesOrderResponse { + out := make(chan routesOrderResponse) go func() { - route, fallbacks, err := s.SelectRoute(ctx, req, routes) - - if err != nil { - errCh <- err - } else { - // Append routes - routes := fallbacks - if route != nil { - routes = append([]Component{route}, routes...) - } - out <- routes + route, fallbacks, labels, err := s.SelectRoute(ctx, req, routes) + + // Combine preferred route with the fallbacks + routes := fallbacks + if route != nil { + routes = append([]Component{route}, routes...) + } + + out <- routesOrderResponse{ + Components: routes, + Err: err, + Labels: labels, } - // Close both channels - close(out) - close(errCh) }() - return out, errCh + return out }