Skip to content

Commit

Permalink
[#16] .Update avoid duplicate logging of errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasObenaus committed Feb 18, 2020
1 parent e39db87 commit a478534
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
55 changes: 51 additions & 4 deletions cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/spf13/cast"
"github.com/supplyon/gremcos/interfaces"
)

Expand Down Expand Up @@ -95,8 +97,12 @@ func New(host string, options ...Option) (*Cosmos, error) {
cosmos.wg.Add(1)
go func() {
defer cosmos.wg.Done()
for err := range cosmos.errorChannel {
cosmos.logger.Error().Err(err).Msg("Error from connection pool received")
for range cosmos.errorChannel {
// consume the errors from the channel
// at the moment it is not needed to post them to the log since they are
// anyway handed over to the caller
// For debugging the following line can be uncommented
// cosmos.logger.Error().Err(err).Msg("Error from connection pool received")
}
cosmos.logger.Debug().Msg("Error channel consumer closed")
}()
Expand All @@ -109,8 +115,16 @@ func (c *Cosmos) dial() (interfaces.QueryExecutor, error) {
return Dial(c.dialer, c.errorChannel, SetAuth(c.username, c.password), PingInterval(time.Second*30))
}

func (c *Cosmos) Execute(query string) (resp []interfaces.Response, err error) {
return c.pool.Execute(query)
func (c *Cosmos) Execute(query string) ([]interfaces.Response, error) {

resp, err := c.pool.Execute(query)

// try to investigate the responses and to find out if we can find more specific error information
if respErr := extractFirstError(resp); respErr != nil {
err = respErr
}

return resp, err
}

func (c *Cosmos) ExecuteAsync(query string, responseChannel chan interfaces.AsyncResponse) (err error) {
Expand Down Expand Up @@ -140,6 +154,39 @@ func (c *Cosmos) IsHealthy() error {
return c.pool.Ping()
}

// extractFirstError runs through the given responses and returns the first error it finds.
// All information (e.g.)
func extractFirstError(responses []interfaces.Response) error {

for _, response := range responses {
statusCode := response.Status.Code

// everything ok --> skip this response
if statusCode == interfaces.StatusSuccess || statusCode == interfaces.StatusNoContent || statusCode == interfaces.StatusPartialContent {
continue
}

// since all success codes are already skipped
// here we have an error

// Do specific a interpretation on the 500 errors if possible.
// Usually from CosmosDB we can use additional headers to extract more detail
if statusCode == interfaces.StatusServerError {
responseInfo, err := parseAttributeMap(response.Status.Attributes)
if err != nil {
// if we can't parse/ interpret the attribute map then we return the full/ unparsed error information
return fmt.Errorf("Failed parsing attributes of response: '%s'. Unparsed error: %d - %s", err.Error(), response.Status.Code, response.Status.Message)
}
return fmt.Errorf("%d (%d) - %s", responseInfo.statusCode, responseInfo.subStatusCode, responseInfo.statusDescription)
}

// for the remaining error status codes do the usual error detection mechanism based on the main status code
return extractError(response)
}

return nil
}

func parseAttributeMap(attributes map[string]interface{}) (responseInformation, error) {

responseInfo := responseInformation{}
Expand Down
6 changes: 3 additions & 3 deletions examples/cosmos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
gremcos.WithAuth(username, password),
gremcos.WithLogger(logger),
gremcos.NumMaxActiveConnections(10),
gremcos.ConnectionIdleTimeout(time.Second*1),
gremcos.ConnectionIdleTimeout(time.Second*30),
)

if err != nil {
Expand All @@ -63,8 +63,8 @@ func processLoop(cosmos *gremcos.Cosmos, logger zerolog.Logger, exitChannel chan
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// create tickers for doing health check and queries
queryTicker := time.NewTicker(time.Second * 5)
healthCheckTicker := time.NewTicker(time.Second * 1)
queryTicker := time.NewTicker(time.Second * 2)
healthCheckTicker := time.NewTicker(time.Second * 30)

// ensure to clean up as soon as the processLoop has been left
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions examples/local/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func processLoop(cosmos *gremcos.Cosmos, logger zerolog.Logger, exitChannel chan
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// create tickers for doing health check and queries
queryTicker := time.NewTicker(time.Second * 2)
healthCheckTicker := time.NewTicker(time.Second * 1)
queryTicker := time.NewTicker(time.Millisecond * 2000)
healthCheckTicker := time.NewTicker(time.Second * 20)

// ensure to clean up as soon as the processLoop has been left
defer func() {
Expand Down

0 comments on commit a478534

Please sign in to comment.