Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enh: First version of the notifications service #3217

Merged
merged 4 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/notifications-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Implement notifications service

Implemented the minimal version of the notifications service to be able to notify a user when they received a share.

https://github.com/owncloud/ocis/pull/3217
14 changes: 14 additions & 0 deletions notifications/cmd/notifications/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"os"

"github.com/owncloud/ocis/notifications/pkg/command"
"github.com/owncloud/ocis/notifications/pkg/config"
)

func main() {
if err := command.Execute(config.DefaultConfig()); err != nil {
os.Exit(1)
}
}
105 changes: 105 additions & 0 deletions notifications/pkg/channels/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Package channels provides different communication channels to notify users.
package channels

import (
"context"
"net/smtp"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
groups "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/log"
"github.com/pkg/errors"
)

// Channel defines the methods of a communication channel.
type Channel interface {
// SendMessage sends a message to users.
SendMessage(userIDs []string, msg string) error
// SendMessageToGroup sends a message to a group.
SendMessageToGroup(groupdID *groups.GroupId, msg string) error
}

// NewMailChannel instantiates a new mail communication channel.
func NewMailChannel(cfg config.Config, logger log.Logger) (Channel, error) {
gc, err := pool.GetGatewayServiceClient(cfg.Notifications.RevaGateway)
if err != nil {
logger.Error().Err(err).Msg("could not get gateway client")
return nil, err
}
return Mail{
gatewayClient: gc,
conf: cfg,
logger: logger,
}, nil
}

// Mail is the communcation channel for email.
type Mail struct {
gatewayClient gateway.GatewayAPIClient
conf config.Config
logger log.Logger
}

// SendMessage sends a message to all given users.
func (m Mail) SendMessage(userIDs []string, msg string) error {
to, err := m.getReceiverAddresses(userIDs)
if err != nil {
return err
}
body := []byte(msg)

smtpConf := m.conf.Notifications.SMTP
auth := smtp.PlainAuth("", smtpConf.Sender, smtpConf.Password, smtpConf.Host)
if err := smtp.SendMail(smtpConf.Host+":"+smtpConf.Port, auth, smtpConf.Sender, to, body); err != nil {
return errors.Wrap(err, "could not send mail")
}
return nil
}

// SendMessageToGroup sends a message to all members of the given group.
func (m Mail) SendMessageToGroup(groupID *groups.GroupId, msg string) error {
// TODO We need an authenticated context here...
res, err := m.gatewayClient.GetGroup(context.Background(), &groups.GetGroupRequest{GroupId: groupID})
if err != nil {
return err
}
if res.Status.Code != rpc.Code_CODE_OK {
return errors.New("could not get group")
}

members := make([]string, 0, len(res.Group.Members))
for _, id := range res.Group.Members {
members = append(members, id.OpaqueId)
}

return m.SendMessage(members, msg)
}

func (m Mail) getReceiverAddresses(receivers []string) ([]string, error) {
addresses := make([]string, 0, len(receivers))
for _, id := range receivers {
// Authenticate is too costly but at the moment our only option to get the user.
// We don't have an authenticated context so calling `GetUser` doesn't work.
res, err := m.gatewayClient.Authenticate(context.Background(), &gateway.AuthenticateRequest{
Type: "machine",
ClientId: "userid:" + id,
ClientSecret: m.conf.Notifications.MachineAuthSecret,
})
if err != nil {
return nil, err
}
if res.Status.Code != rpc.Code_CODE_OK {
m.logger.Error().
Interface("status", res.Status).
Str("receiver_id", id).
Msg("could not get user")
continue
}
addresses = append(addresses, res.User.Mail)
}

return addresses, nil
}
18 changes: 18 additions & 0 deletions notifications/pkg/command/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package command

import (
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/urfave/cli/v2"
)

// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}
64 changes: 64 additions & 0 deletions notifications/pkg/command/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package command

import (
"context"
"os"

"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/ocis-pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)

// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),

// interaction with this service

// infos about this service
Health(cfg),
Version(cfg),
}
}

// Execute is the entry point for the notifications command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "notifications",
Usage: "starts notifications service",
Commands: GetCommands(cfg),
})

cli.HelpFlag = &cli.BoolFlag{
Name: "help,h",
Usage: "Show the help",
}

return app.Run(os.Args)
}

// SutureService allows for the notifications command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}

// NewSutureService creates a new notifications.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Settings.Commons = cfg.Commons
return SutureService{
cfg: cfg.Notifications,
}
}

func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
if err := Execute(s.cfg); err != nil {
return err
}

return nil
}
50 changes: 50 additions & 0 deletions notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package command

import (
"fmt"

"github.com/asim/go-micro/plugins/events/nats/v4"
"github.com/cs3org/reva/pkg/events"
"github.com/cs3org/reva/pkg/events/server"
"github.com/owncloud/ocis/notifications/pkg/channels"
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/owncloud/ocis/notifications/pkg/config/parser"
"github.com/owncloud/ocis/notifications/pkg/logging"
"github.com/owncloud/ocis/notifications/pkg/service"
"github.com/urfave/cli/v2"
)

// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start %s extension without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
return parser.ParseConfig(cfg)
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)

evs := []events.Unmarshaller{
events.ShareCreated{},
}

evtsCfg := cfg.Notifications.Events
client, err := server.NewNatsStream(nats.Address(evtsCfg.Endpoint), nats.ClusterID(evtsCfg.Cluster))
if err != nil {
return err
}
evts, err := events.Consume(client, evtsCfg.ConsumerGroup, evs...)
if err != nil {
return err
}
channel, err := channels.NewMailChannel(*cfg, logger)
if err != nil {
return err
}
svc := service.NewEventsNotifier(evts, channel, logger)
return svc.Run()
},
}
}
19 changes: 19 additions & 0 deletions notifications/pkg/command/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package command

import (
"github.com/owncloud/ocis/notifications/pkg/config"
"github.com/urfave/cli/v2"
)

// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running extension instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}
44 changes: 44 additions & 0 deletions notifications/pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package config

import (
"context"

"github.com/owncloud/ocis/ocis-pkg/shared"
)

// Config combines all available configuration parts.
type Config struct {
*shared.Commons

Service Service

Log *Log `ocisConfig:"log"`
Debug Debug `ocisConfig:"debug"`

Notifications Notifications `ocisConfig:"notifications"`

Context context.Context
}

// Notifications definces the config options for the notifications service.
type Notifications struct {
SMTP SMTP `ocisConfig:"SMTP"`
Events Events `ocisConfig:"events"`
RevaGateway string `ocisConfig:"reva_gateway" env:"REVA_GATEWAY;NOTIFICATIONS_REVA_GATEWAY"`
MachineAuthSecret string `ocisConfig:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;NOTIFICATIONS_MACHINE_AUTH_API_KEY"`
}

// SMTP combines the smtp configuration options.
type SMTP struct {
Host string `ocisConfig:"smtp_host" env:"NOTIFICATIONS_SMTP_HOST"`
Port string `ocisConfig:"smtp_port" env:"NOTIFICATIONS_SMTP_PORT"`
Sender string `ocisConfig:"smtp_sender" env:"NOTIFICATIONS_SMTP_SENDER"`
Password string `ocisConfig:"smtp_password" env:"NOTIFICATIONS_SMTP_PASSWORD"`
}

// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `ocisConfig:"events_endpoint" env:"NOTIFICATIONS_EVENTS_ENDPOINT"`
Cluster string `ocisConfig:"events_cluster" env:"NOTIFICATIONS_EVENTS_CLUSTER"`
ConsumerGroup string `ocisConfig:"events_group" env:"NOTIFICATIONS_EVENTS_GROUP"`
}
9 changes: 9 additions & 0 deletions notifications/pkg/config/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

// Debug defines the available debug configuration.
type Debug struct {
Addr string `ocisConfig:"addr" env:"NOTIFICATIONS_DEBUG_ADDR"`
Token string `ocisConfig:"token" env:"NOTIFICATIONS_DEBUG_TOKEN"`
Pprof bool `ocisConfig:"pprof" env:"NOTIFICATIONS_DEBUG_PPROF"`
Zpages bool `ocisConfig:"zpages" env:"NOTIFICATIONS_DEBUG_ZPAGES"`
}
27 changes: 27 additions & 0 deletions notifications/pkg/config/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package config

// NOTE: Most of this configuration is not needed to keep it as simple as possible
// TODO: Clean up unneeded configuration

func DefaultConfig() *Config {
return &Config{
Service: Service{
Name: "notifications",
},
Notifications: Notifications{
SMTP: SMTP{
Host: "127.0.0.1",
Port: "1025",
Sender: "god@example.com",
Password: "godisdead",
},
Events: Events{
Endpoint: "127.0.0.1:4222",
Cluster: "test-cluster",
ConsumerGroup: "notifications",
},
RevaGateway: "127.0.0.1:9142",
MachineAuthSecret: "change-me-please",
},
}
}
9 changes: 9 additions & 0 deletions notifications/pkg/config/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package config

// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;NOTIFICATIONS_LOG_LEVEL"`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;NOTIFICATIONS_LOG_PRETTY"`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;NOTIFICATIONS_LOG_COLOR"`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;NOTIFICATIONS_LOG_FILE"`
}
Loading