Skip to content

Commit

Permalink
feat(test): add notification worker test
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Nov 30, 2022
1 parent 3f14785 commit 87bfd6d
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 89 deletions.
184 changes: 100 additions & 84 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,52 +78,11 @@ func workerStartNotificationHandlerCommand() *cobra.Command {
return err
}

logger := initLogger(cfg.Log)

dbClient, err := db.New(cfg.DB)
if err != nil {
return err
}

pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}

encryptor, err := secret.New(cfg.Service.EncryptionKey)
if err != nil {
return fmt.Errorf("cannot initialize encryptor: %w", err)
}

_, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, nil)
if err != nil {
return err
}

cancelWorkerChan := make(chan struct{})

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(logger, cfg.DB)
if err != nil {
return err
}
default:
return fmt.Errorf(heredoc.Docf(`
unsupported kind of queue for worker: %s
supported queue kind are:
- postgres
`, cfg.Notification.Queue.Kind.String()))
if err := StartNotificationHandlerWorker(ctx, cfg, cancelWorkerChan); err != nil {
return err
}
workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.MessageHandler.PollDuration), worker.WithID("message-worker"))
notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, queue, notifierRegistry,
notification.HandlerWithIdentifier(workerTicker.GetID()))
go func() {
workerTicker.Run(ctx, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationHandler.Process(ctx, runningAt)
})
}()

<-ctx.Done()
close(cancelWorkerChan)
Expand Down Expand Up @@ -154,61 +113,118 @@ func workerStartNotificationDLQHandlerCommand() *cobra.Command {
return err
}

logger := initLogger(cfg.Log)
cancelWorkerChan := make(chan struct{})

dbClient, err := db.New(cfg.DB)
if err != nil {
if err := StartNotificationDLQHandlerWorker(ctx, cfg, cancelWorkerChan); err != nil {
return err
}

pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}
<-ctx.Done()
close(cancelWorkerChan)

encryptor, err := secret.New(cfg.Service.EncryptionKey)
if err != nil {
return fmt.Errorf("cannot initialize encryptor: %w", err)
}
return nil
},
}

_, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, nil)
if err != nil {
return err
}
c.Flags().StringVarP(&configFile, "config", "c", "config.yaml", "Config file path")
return c
}

cancelWorkerChan := make(chan struct{})
func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error {
logger := initLogger(cfg.Log)

dbClient, err := db.New(cfg.DB)
if err != nil {
return err
}

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(logger, cfg.DB, postgresq.WithStrategy(postgresq.StrategyDLQ))
if err != nil {
return err
}
default:
return fmt.Errorf(heredoc.Docf(`
pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}

encryptor, err := secret.New(cfg.Service.EncryptionKey)
if err != nil {
return fmt.Errorf("cannot initialize encryptor: %w", err)
}

_, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, nil)
if err != nil {
return err
}

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(logger, cfg.DB)
if err != nil {
return err
}
default:
return fmt.Errorf(heredoc.Docf(`
unsupported kind of queue for worker: %s
supported queue kind are:
- postgres
`, string(cfg.Notification.Queue.Kind)))
}
`, cfg.Notification.Queue.Kind.String()))
}
workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.MessageHandler.PollDuration), worker.WithID("message-worker"))
notificationHandler := notification.NewHandler(cfg.Notification.MessageHandler, logger, queue, notifierRegistry,
notification.HandlerWithIdentifier(workerTicker.GetID()))
go func() {
workerTicker.Run(ctx, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationHandler.Process(ctx, runningAt)
})
}()

return nil
}

workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.DLQHandler.PollDuration), worker.WithID("dlq-worker"))
notificationHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, queue, notifierRegistry,
notification.HandlerWithIdentifier("dlq-"+workerTicker.GetID()))
go func() {
workerTicker.Run(ctx, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationHandler.Process(ctx, runningAt)
})
}()
func StartNotificationDLQHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error {
logger := initLogger(cfg.Log)

<-ctx.Done()
close(cancelWorkerChan)
dbClient, err := db.New(cfg.DB)
if err != nil {
return err
}

return nil
},
pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}

c.Flags().StringVarP(&configFile, "config", "c", "config.yaml", "Config file path")
return c
encryptor, err := secret.New(cfg.Service.EncryptionKey)
if err != nil {
return fmt.Errorf("cannot initialize encryptor: %w", err)
}

_, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, nil)
if err != nil {
return err
}

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(logger, cfg.DB, postgresq.WithStrategy(postgresq.StrategyDLQ))
if err != nil {
return err
}
default:
return fmt.Errorf(heredoc.Docf(`
unsupported kind of queue for worker: %s
supported queue kind are:
- postgres
`, string(cfg.Notification.Queue.Kind)))
}

workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(cfg.Notification.DLQHandler.PollDuration), worker.WithID("dlq-worker"))
notificationHandler := notification.NewHandler(cfg.Notification.DLQHandler, logger, queue, notifierRegistry,
notification.HandlerWithIdentifier("dlq-"+workerTicker.GetID()))
go func() {
workerTicker.Run(ctx, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationHandler.Process(ctx, runningAt)
})
}()

return nil
}
2 changes: 1 addition & 1 deletion pkg/pgc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *Client) QueryxContext(ctx context.Context, op string, tableName string,
if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: sqlxRow.Err().Error(),
Message: err.Error(),
})
}
return sqlxRow, err
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_test/cortex_alerting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *CortexAlertingTestSuite) SetupTest() {
s.appConfig.Notification.MessageHandler.Enabled = true
s.appConfig.Notification.DLQHandler.Enabled = true

StartSiren(*s.appConfig)
StartSirenServer(*s.appConfig)

ctx := context.Background()
s.client, s.cancelClient, err = CreateClient(ctx, fmt.Sprintf("localhost:%d", apiPort))
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_test/cortex_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *CortexNamespaceTestSuite) SetupTest() {

// TODO host.docker.internal only works for docker-desktop to call a service in host (siren)
s.appConfig.Providers.Cortex.WebhookBaseAPI = "http://host.docker.internal:8080/v1beta1/alerts/cortex"
StartSiren(*s.appConfig)
StartSirenServer(*s.appConfig)

ctx := context.Background()
s.client, s.cancelClient, err = CreateClient(ctx, fmt.Sprintf("localhost:%d", apiPort))
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_test/cortex_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *CortexRuleTestSuite) SetupTest() {
// TODO host.docker.internal only works for docker-desktop to call a service in host (siren)
s.appConfig.Providers.Cortex.WebhookBaseAPI = fmt.Sprintf("http://host.docker.internal:%d/v1beta1/alerts/cortex", apiPort)
s.appConfig.Providers.Cortex.GroupWaitDuration = "1s"
StartSiren(*s.appConfig)
StartSirenServer(*s.appConfig)

ctx := context.Background()
s.client, s.cancelClient, err = CreateClient(ctx, fmt.Sprintf("localhost:%d", apiPort))
Expand Down
25 changes: 24 additions & 1 deletion test/e2e_test/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func getFreePort() (int, error) {
return l.Addr().(*net.TCPAddr).Port, nil
}

func StartSiren(cfg config.Config) {
func StartSirenServer(cfg config.Config) {
logger := log.NewZap()
logger.Info("starting up siren...")
go func() {
Expand All @@ -239,3 +239,26 @@ func StartSiren(cfg config.Config) {
logger.Info("siren is up")

}

func StartSirenMessageWorker(cfg config.Config, closeChannel chan struct{}) error {
logger := log.NewZap()
logger.Info("starting up siren notification message worker...")

if err := cli.StartNotificationHandlerWorker(context.Background(), cfg, closeChannel); err != nil {
return err
}
logger.Info("siren notification message is running")
return nil
}

func StartSirenDLQWorker(cfg config.Config, closeChannel chan struct{}) error {
logger := log.NewZap()
logger.Info("starting up siren notification dlq worker...")

if err := cli.StartNotificationDLQHandlerWorker(context.Background(), cfg, closeChannel); err != nil {
return err
}

logger.Info("siren notification dlq is running")
return nil
}
Loading

0 comments on commit 87bfd6d

Please sign in to comment.