Skip to content

Commit

Permalink
feat: add streaming updates core functionality and project streaming (#…
Browse files Browse the repository at this point in the history
…8669)

---------

Co-authored-by: Ryan <rb@hpe.com> & Eric L. <eric.liu@hpe.com>
  • Loading branch information
corban-beaird and eecsliu authored Feb 9, 2024
1 parent ed61121 commit 6206bde
Show file tree
Hide file tree
Showing 24 changed files with 2,822 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jinzhu/copier v0.3.5
github.com/lib/pq v1.10.2
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/uptrace/bun v1.1.14
github.com/uptrace/bun/dialect/pgdialect v1.1.14
Expand Down
6 changes: 6 additions & 0 deletions master/internal/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/determined-ai/determined/master/internal/prom"
"github.com/determined-ai/determined/master/internal/proxy"
"github.com/determined-ai/determined/master/internal/rm"
"github.com/determined-ai/determined/master/internal/stream"
"github.com/determined-ai/determined/master/internal/task"
"github.com/determined-ai/determined/master/internal/task/tasklogger"
"github.com/determined-ai/determined/master/internal/task/taskmodel"
Expand Down Expand Up @@ -1245,6 +1246,11 @@ func (m *Master) Run(ctx context.Context, gRPCLogInitDone chan struct{}) error {

webhooks.Init()
defer webhooks.Deinit()
ssup := stream.NewSupervisor(m.db.URL)
go func() {
_ = ssup.Run(ctx)
}()
m.echo.GET("/stream", api.WebSocketRoute(ssup.Websocket))

return m.startServers(ctx, cert, gRPCLogInitDone)
}
2 changes: 1 addition & 1 deletion master/internal/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func ensureMigrationUpgrade(tx *pg.Tx) error {
func (db *PgDB) Migrate(migrationURL string, actions []string) error {
// go-pg/migrations uses go-pg/pg connection API, which is not compatible
// with pgx, so we use a one-off go-pg/pg connection.
pgOpts, err := makeGoPgOpts(db.url)
pgOpts, err := makeGoPgOpts(db.URL)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions master/internal/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func PaginateBunUnsafe(
type PgDB struct {
sql *sqlx.DB
queries *StaticQueryMap
url string
URL string
}

// ConnectPostgres connects to a Postgres database.
Expand All @@ -181,7 +181,7 @@ func ConnectPostgres(url string) (*PgDB, error) {
for {
sql, err := sqlx.Connect("pgx", url)
if err == nil {
db := &PgDB{sql: sql, queries: &StaticQueryMap{}, url: url}
db := &PgDB{sql: sql, queries: &StaticQueryMap{}, URL: url}
initTheOneBun(db)
return db, nil
}
Expand Down
2 changes: 1 addition & 1 deletion master/internal/experiment/authz_basic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// ExperimentAuthZBasic is basic OSS controls.
type ExperimentAuthZBasic struct{}

// CanGetExperiment always returns true and a nill error.
// CanGetExperiment always returns true and a nil error.
func (a *ExperimentAuthZBasic) CanGetExperiment(
ctx context.Context, curUser model.User, e *model.Experiment,
) error {
Expand Down
29 changes: 29 additions & 0 deletions master/internal/stream/authz_basic_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package stream

import (
"context"

"github.com/lib/pq"

"github.com/determined-ai/determined/master/pkg/model"
)

// StreamAuthZBasic is classic OSS Determined authentication for streaming clients.
type StreamAuthZBasic struct{}

// GetProjectStreamableScopes always returns an AccessScopeSet with global permissions and a nil error.
func (a *StreamAuthZBasic) GetProjectStreamableScopes(
_ context.Context,
_ model.User,
) (model.AccessScopeSet, error) {
return model.AccessScopeSet{model.GlobalAccessScopeID: true}, nil
}

// GetPermissionChangeListener always returns a nil pointer and a nil error.
func (a *StreamAuthZBasic) GetPermissionChangeListener() (*pq.Listener, error) {
return nil, nil
}

func init() {
AuthZProvider.Register("basic", &StreamAuthZBasic{})
}
23 changes: 23 additions & 0 deletions master/internal/stream/authz_iface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package stream

import (
"context"

"github.com/lib/pq"

"github.com/determined-ai/determined/master/internal/authz"
"github.com/determined-ai/determined/master/pkg/model"
)

// StreamAuthZ is the interface for streaming authorization.
type StreamAuthZ interface {
// GetProjectStreamableScopes returns an AccessScopeSet where the user has permission to view projects.
GetProjectStreamableScopes(ctx context.Context, curUser model.User) (model.AccessScopeSet, error)

// GetPermissionChangeListener returns a pointer listener
// listening for permission change notifications if applicable.
GetPermissionChangeListener() (*pq.Listener, error)
}

// AuthZProvider provides StreamAuthZ implementations.
var AuthZProvider authz.AuthZProviderType[StreamAuthZ]
43 changes: 43 additions & 0 deletions master/internal/stream/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package stream

import (
"encoding/json"

"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"

"github.com/determined-ai/determined/master/pkg/stream"
)

// StartupMsg is the first message a streaming client sends.
//
// It declares initially known keys and also configures the initial subscriptions for the stream.
type StartupMsg struct {
SyncID string `json:"sync_id"`
Known KnownKeySet `json:"known"`
Subscribe SubscriptionSpecSet `json:"subscribe"`
}

// KnownKeySet allows a client to describe which primary keys it knows of as existing,
// so the server can respond with which client-known keys have been deleted or disappeared,
// and also which server-known keys are not yet known to the client (appearances).
//
// Each field of a KnownKeySet is a comma-separated list of int64s and ranges like "a,b-c,d".
type KnownKeySet struct {
Projects string `json:"projects"`
}

// prepareWebsocketMessage converts the MarshallableMsg into a websocket.PreparedMessage.
func prepareWebsocketMessage(obj stream.MarshallableMsg) interface{} {
jbytes, err := json.Marshal(obj)
if err != nil {
log.Errorf("error marshaling message for streaming: %s", err.Error())
return nil
}
msg, err := websocket.NewPreparedMessage(websocket.TextMessage, jbytes)
if err != nil {
log.Errorf("error preparing message for streaming: %s", err.Error())
return nil
}
return msg
}
Loading

0 comments on commit 6206bde

Please sign in to comment.