Skip to content

Commit

Permalink
Add gRPC error interceptors to API client (#30578)
Browse files Browse the repository at this point in the history
* Move gRPC error intercetpors to api/utils/grpc/interceptors.

* Use error interceptors in api client and mock server.

* Apply suggestions from CR.

* Unwrap FromGRPC errors in middleware.

* Use gRPC auth service in tests instead of external example service.

* It's gRPC!!!

* Fix unit test.

* Add error interceptor to proxy client.

* Fix merge conflict.
  • Loading branch information
Joerger authored Aug 24, 2023
1 parent d110282 commit cf6473f
Show file tree
Hide file tree
Showing 55 changed files with 666 additions and 623 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ buf/installed:
exit 1; \
fi

# grpc generates GRPC stubs from service definitions.
# grpc generates gRPC stubs from service definitions.
# This target runs in the buildbox container.
.PHONY: grpc
grpc:
Expand All @@ -1228,13 +1228,13 @@ else
$(MAKE) grpc/host
endif

# grpc/host generates GRPC stubs.
# grpc/host generates gRPC stubs.
# Unlike grpc, this target runs locally.
.PHONY: grpc/host
grpc/host: protos/all
@build.assets/genproto.sh

# protos-up-to-date checks if the generated GRPC stubs are up to date.
# protos-up-to-date checks if the generated gRPC stubs are up to date.
# This target runs in the buildbox container.
.PHONY: protos-up-to-date
protos-up-to-date:
Expand All @@ -1244,7 +1244,7 @@ else
$(MAKE) protos-up-to-date/host
endif

# protos-up-to-date/host checks if the generated GRPC stubs are up to date.
# protos-up-to-date/host checks if the generated gRPC stubs are up to date.
# Unlike protos-up-to-date, this target runs locally.
.PHONY: protos-up-to-date/host
protos-up-to-date/host: must-start-clean/host grpc/host
Expand Down
2 changes: 1 addition & 1 deletion api/breaker/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NonNilErrorIsSuccess(_ interface{}, err error) bool {
}

// IsResponseSuccessful determines whether the error provided should be ignored by the circuit breaker. This checks
// for http status codes < 500 and a few unsuccessful grpc status codes.
// for http status codes < 500 and a few unsuccessful gRPC status codes.
func IsResponseSuccessful(v interface{}, err error) bool {
switch t := v.(type) {
case nil:
Expand Down
40 changes: 20 additions & 20 deletions api/client/accesslist/accesslist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package accesslist
import (
"context"

"github.com/gravitational/trace/trail"
"github.com/gravitational/trace"

accesslistv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accesslist/v1"
"github.com/gravitational/teleport/api/types/accesslist"
Expand All @@ -41,15 +41,15 @@ func NewClient(grpcClient accesslistv1.AccessListServiceClient) *Client {
func (c *Client) GetAccessLists(ctx context.Context) ([]*accesslist.AccessList, error) {
resp, err := c.grpcClient.GetAccessLists(ctx, &accesslistv1.GetAccessListsRequest{})
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}

accessLists := make([]*accesslist.AccessList, len(resp.AccessLists))
for i, accessList := range resp.AccessLists {
var err error
accessLists[i], err = conv.FromProto(accessList)
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}
}

Expand All @@ -63,15 +63,15 @@ func (c *Client) ListAccessLists(ctx context.Context, pageSize int, nextToken st
NextToken: nextToken,
})
if err != nil {
return nil, "", trail.FromGRPC(err)
return nil, "", trace.Wrap(err)
}

accessLists := make([]*accesslist.AccessList, len(resp.AccessLists))
for i, accessList := range resp.AccessLists {
var err error
accessLists[i], err = conv.FromProto(accessList)
if err != nil {
return nil, "", trail.FromGRPC(err)
return nil, "", trace.Wrap(err)
}
}

Expand All @@ -84,11 +84,11 @@ func (c *Client) GetAccessList(ctx context.Context, name string) (*accesslist.Ac
Name: name,
})
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}

accessList, err := conv.FromProto(resp)
return accessList, trail.FromGRPC(err)
return accessList, trace.Wrap(err)
}

// UpsertAccessList creates or updates an access list resource.
Expand All @@ -97,24 +97,24 @@ func (c *Client) UpsertAccessList(ctx context.Context, accessList *accesslist.Ac
AccessList: conv.ToProto(accessList),
})
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}
responseAccessList, err := conv.FromProto(resp)
return responseAccessList, trail.FromGRPC(err)
return responseAccessList, trace.Wrap(err)
}

// DeleteAccessList removes the specified access list resource.
func (c *Client) DeleteAccessList(ctx context.Context, name string) error {
_, err := c.grpcClient.DeleteAccessList(ctx, &accesslistv1.DeleteAccessListRequest{
Name: name,
})
return trail.FromGRPC(err)
return trace.Wrap(err)
}

// DeleteAllAccessLists removes all access lists.
func (c *Client) DeleteAllAccessLists(ctx context.Context) error {
_, err := c.grpcClient.DeleteAllAccessLists(ctx, &accesslistv1.DeleteAllAccessListsRequest{})
return trail.FromGRPC(err)
return trace.Wrap(err)
}

// ListAccessListMembers returns a paginated list of all access list members for an access list.
Expand All @@ -125,15 +125,15 @@ func (c *Client) ListAccessListMembers(ctx context.Context, accessList string, p
AccessList: accessList,
})
if err != nil {
return nil, "", trail.FromGRPC(err)
return nil, "", trace.Wrap(err)
}

members = make([]*accesslist.AccessListMember, len(resp.Members))
for i, accessList := range resp.Members {
var err error
members[i], err = conv.FromMemberProto(accessList)
if err != nil {
return nil, "", trail.FromGRPC(err)
return nil, "", trace.Wrap(err)
}
}

Expand All @@ -147,11 +147,11 @@ func (c *Client) GetAccessListMember(ctx context.Context, accessList string, mem
MemberName: memberName,
})
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}

member, err := conv.FromMemberProto(resp)
return member, trail.FromGRPC(err)
return member, trace.Wrap(err)
}

// UpsertAccessListMember creates or updates an access list member resource.
Expand All @@ -160,10 +160,10 @@ func (c *Client) UpsertAccessListMember(ctx context.Context, member *accesslist.
Member: conv.ToMemberProto(member),
})
if err != nil {
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}
responseMember, err := conv.FromMemberProto(resp)
return responseMember, trail.FromGRPC(err)
return responseMember, trace.Wrap(err)
}

// DeleteAccessListMember hard deletes the specified access list member resource.
Expand All @@ -172,19 +172,19 @@ func (c *Client) DeleteAccessListMember(ctx context.Context, accessList string,
AccessList: accessList,
MemberName: memberName,
})
return trail.FromGRPC(err)
return trace.Wrap(err)
}

// DeleteAllAccessListMembers hard deletes all access list members for an access list.
func (c *Client) DeleteAllAccessListMembersForAccessList(ctx context.Context, accessList string) error {
_, err := c.grpcClient.DeleteAllAccessListMembersForAccessList(ctx, &accesslistv1.DeleteAllAccessListMembersForAccessListRequest{
AccessList: accessList,
})
return trail.FromGRPC(err)
return trace.Wrap(err)
}

// DeleteAllAccessListMembers hard deletes all access list members.
func (c *Client) DeleteAllAccessListMembers(ctx context.Context) error {
_, err := c.grpcClient.DeleteAllAccessListMembers(ctx, &accesslistv1.DeleteAllAccessListMembersRequest{})
return trail.FromGRPC(err)
return trace.Wrap(err)
}
13 changes: 6 additions & 7 deletions api/client/auditstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/gravitational/trace"
"github.com/gravitational/trace/trail"
"google.golang.org/grpc"
ggzip "google.golang.org/grpc/encoding/gzip"

Expand All @@ -35,7 +34,7 @@ func (c *Client) createOrResumeAuditStream(ctx context.Context, request proto.Au
stream, err := c.grpc.CreateAuditStream(closeCtx, grpc.UseCompressor(ggzip.Name))
if err != nil {
cancel()
return nil, trail.FromGRPC(err)
return nil, trace.Wrap(err)
}
s := &auditStreamer{
stream: stream,
Expand All @@ -46,7 +45,7 @@ func (c *Client) createOrResumeAuditStream(ctx context.Context, request proto.Au
go s.recv()
err = s.stream.Send(&request)
if err != nil {
return nil, trace.NewAggregate(s.Close(ctx), trail.FromGRPC(err))
return nil, trace.NewAggregate(s.Close(ctx), trace.Wrap(err))
}
return s, nil
}
Expand Down Expand Up @@ -85,7 +84,7 @@ type auditStreamer struct {
// the stream completed and closes the stream instance.
func (s *auditStreamer) Close(ctx context.Context) error {
defer s.closeWithError(nil)
return trail.FromGRPC(s.stream.Send(&proto.AuditStreamRequest{
return trace.Wrap(s.stream.Send(&proto.AuditStreamRequest{
Request: &proto.AuditStreamRequest_FlushAndCloseStream{
FlushAndCloseStream: &proto.FlushAndCloseStream{},
},
Expand All @@ -94,7 +93,7 @@ func (s *auditStreamer) Close(ctx context.Context) error {

// Complete completes stream.
func (s *auditStreamer) Complete(ctx context.Context) error {
return trail.FromGRPC(s.stream.Send(&proto.AuditStreamRequest{
return trace.Wrap(s.stream.Send(&proto.AuditStreamRequest{
Request: &proto.AuditStreamRequest_CompleteStream{
CompleteStream: &proto.CompleteStream{},
},
Expand All @@ -113,7 +112,7 @@ func (s *auditStreamer) RecordEvent(ctx context.Context, event events.PreparedSe
if err != nil {
return trace.Wrap(err)
}
err = trail.FromGRPC(s.stream.Send(&proto.AuditStreamRequest{
err = trace.Wrap(s.stream.Send(&proto.AuditStreamRequest{
Request: &proto.AuditStreamRequest_Event{Event: oneof},
}))
if err != nil {
Expand Down Expand Up @@ -141,7 +140,7 @@ func (s *auditStreamer) recv() {
for {
status, err := s.stream.Recv()
if err != nil {
s.closeWithError(trail.FromGRPC(err))
s.closeWithError(trace.Wrap(err))
return
}
select {
Expand Down
Loading

0 comments on commit cf6473f

Please sign in to comment.