From 9b4cc38f5b4a39ab8f88e4df99e695b37f593683 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Thu, 27 Jun 2024 09:25:05 -0500 Subject: [PATCH] :ghost: Tasking find refs (#673) Signed-off-by: Jeff Ortel --- api/application.go | 50 ++++----- api/base.go | 41 ------- api/identity.go | 20 ++-- api/task.go | 93 +--------------- api/taskgroup.go | 22 ++-- cmd/main.go | 4 - importer/manager.go | 37 ++----- k8s/api/tackle/v1alpha2/task.go | 12 ++ migration/json/fields.go | 86 ++++++++++++++ migration/json/pkg.go | 6 + migration/v14/model/core.go | 63 +++++------ model/pkg.go | 7 +- settings/hub.go | 76 +++---------- task/error.go | 14 --- task/manager.go | 191 +++++++++++++++++++------------- test/api/task/api_test.go | 14 ++- trigger/application.go | 42 +++++++ trigger/identity.go | 28 +++++ trigger/pkg.go | 53 +++++++++ 19 files changed, 458 insertions(+), 401 deletions(-) create mode 100644 migration/json/fields.go create mode 100644 migration/json/pkg.go create mode 100644 trigger/application.go create mode 100644 trigger/identity.go create mode 100644 trigger/pkg.go diff --git a/api/application.go b/api/application.go index b2f1fd57..4384a9c8 100644 --- a/api/application.go +++ b/api/application.go @@ -2,7 +2,6 @@ package api import ( "encoding/json" - "fmt" "net/http" "sort" "strings" @@ -11,7 +10,7 @@ import ( "github.com/konveyor/tackle2-hub/assessment" "github.com/konveyor/tackle2-hub/metrics" "github.com/konveyor/tackle2-hub/model" - tasking "github.com/konveyor/tackle2-hub/task" + "github.com/konveyor/tackle2-hub/trigger" "gorm.io/gorm/clause" ) @@ -250,11 +249,20 @@ func (h ApplicationHandler) Create(ctx *gin.Context) { return } - err = h.discover(ctx, m) + rtx := WithContext(ctx) + tr := trigger.Application{ + Trigger: trigger.Trigger{ + TaskManager: rtx.TaskManager, + Client: rtx.Client, + DB: h.DB(ctx), + }, + } + err = tr.Created(m) if err != nil { _ = ctx.Error(err) return } + h.Respond(ctx, http.StatusCreated, r) } @@ -380,11 +388,20 @@ func (h ApplicationHandler) Update(ctx *gin.Context) { } } - err = h.discover(ctx, m) + rtx := WithContext(ctx) + tr := trigger.Application{ + Trigger: trigger.Trigger{ + TaskManager: rtx.TaskManager, + Client: rtx.Client, + DB: h.DB(ctx), + }, + } + err = tr.Updated(m) if err != nil { _ = ctx.Error(err) return } + h.Status(ctx, http.StatusNoContent) } @@ -1074,31 +1091,6 @@ func (h ApplicationHandler) AssessmentCreate(ctx *gin.Context) { h.Respond(ctx, http.StatusCreated, r) } -// discover an application's language and frameworks by launching discovery tasks. -func (h ApplicationHandler) discover(ctx *gin.Context, application *model.Application) (err error) { - rtx := WithContext(ctx) - db := h.DB(ctx) - for _, kind := range Settings.Hub.Discovery.Tasks { - t := Task{} - t.Kind = kind - t.Name = fmt.Sprintf("%s-%s", application.Name, kind) - ref := Ref{ID: application.ID} - t.Application = &ref - t.State = tasking.Ready - taskHandler := TaskHandler{} - err = taskHandler.FindRefs(rtx.Client, &t) - if err != nil { - return - } - task := tasking.Task{Task: t.Model()} - err = rtx.TaskManager.Create(db, &task) - if err != nil { - return - } - } - return -} - // Application REST resource. type Application struct { Resource `yaml:",inline"` diff --git a/api/base.go b/api/base.go index 186c894d..bc19ce9a 100644 --- a/api/base.go +++ b/api/base.go @@ -250,47 +250,6 @@ func (h *BaseHandler) Attachment(ctx *gin.Context, name string) { attachment) } -// Merge maps B into A. -// The B map is the authority. -func (h *BaseHandler) Merge(a, b map[string]any) (out map[string]any) { - if a == nil { - a = map[string]any{} - } - if b == nil { - b = map[string]any{} - } - out = map[string]any{} - for k, v := range a { - out[k] = v - if bv, found := b[k]; found { - out[k] = bv - if av, cast := v.(map[string]any); cast { - if bv, cast := bv.(map[string]any); cast { - out[k] = h.Merge(av, bv) - } else { - out[k] = bv - } - } - } - } - for k, v := range b { - if _, found := a[k]; !found { - out[k] = v - } - } - - return -} - -// AsMap returns the object as a map. -func (h *BaseHandler) AsMap(object any) (mp map[string]any, isMap bool) { - b, _ := json.Marshal(object) - mp = make(map[string]any) - err := json.Unmarshal(b, &mp) - isMap = err == nil - return -} - // REST resource. type Resource struct { ID uint `json:"id,omitempty" yaml:"id,omitempty"` diff --git a/api/identity.go b/api/identity.go index ee624acc..40d5bd41 100644 --- a/api/identity.go +++ b/api/identity.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "github.com/konveyor/tackle2-hub/model" + "github.com/konveyor/tackle2-hub/trigger" "gorm.io/gorm/clause" ) @@ -207,13 +208,18 @@ func (h IdentityHandler) Update(ctx *gin.Context) { return } - appHandler := ApplicationHandler{} - for i := range m.Applications { - err = appHandler.discover(ctx, &m.Applications[i]) - if err != nil { - _ = ctx.Error(err) - return - } + rtx := WithContext(ctx) + tr := trigger.Identity{ + Trigger: trigger.Trigger{ + TaskManager: rtx.TaskManager, + Client: rtx.Client, + DB: h.DB(ctx), + }, + } + err = tr.Updated(m) + if err != nil { + _ = ctx.Error(err) + return } h.Status(ctx, http.StatusNoContent) diff --git a/api/task.go b/api/task.go index 4c0d53b2..04507705 100644 --- a/api/task.go +++ b/api/task.go @@ -1,7 +1,6 @@ package api import ( - "context" "fmt" "io/ioutil" "net/http" @@ -12,15 +11,12 @@ import ( "github.com/gin-gonic/gin" qf "github.com/konveyor/tackle2-hub/api/filter" - crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" "github.com/konveyor/tackle2-hub/model" "github.com/konveyor/tackle2-hub/tar" tasking "github.com/konveyor/tackle2-hub/task" "gorm.io/gorm" "gorm.io/gorm/clause" - k8serr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/strings/slices" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) // Routes @@ -293,11 +289,6 @@ func (h TaskHandler) Create(ctx *gin.Context) { return } rtx := WithContext(ctx) - err = h.FindRefs(rtx.Client, r) - if err != nil { - _ = ctx.Error(err) - return - } task := &tasking.Task{} task.With(r.Model()) task.CreateUser = h.BaseHandler.CurrentUser(ctx) @@ -336,7 +327,7 @@ func (h TaskHandler) Delete(ctx *gin.Context) { // @description Update a task. // @tags tasks // @accept json -// @success 202 +// @success 200 // @router /tasks/{id} [put] // @param id path int true "Task ID" // @param task body Task true "Task data" @@ -373,7 +364,9 @@ func (h TaskHandler) Update(ctx *gin.Context) { return } - h.Status(ctx, http.StatusAccepted) + r.With(m) + + h.Respond(ctx, http.StatusOK, r) } // Submit godoc @@ -381,7 +374,7 @@ func (h TaskHandler) Update(ctx *gin.Context) { // @description Patch and submit a task. // @tags tasks // @accept json -// @success 202 +// @success 200 // @router /tasks/{id}/submit [put] // @param id path int true "Task ID" // @param task body Task false "Task data (optional)" @@ -617,82 +610,6 @@ func (h TaskHandler) GetAttached(ctx *gin.Context) { } } -// FindRefs find referenced resources. -// - addon -// - extensions -// - kind -// - priority -// The priority is defaulted to the kind as needed. -func (h *TaskHandler) FindRefs(client k8sclient.Client, r *Task) (err error) { - if r.Addon != "" { - addon := &crd.Addon{} - name := r.Addon - err = client.Get( - context.TODO(), - k8sclient.ObjectKey{ - Name: name, - Namespace: Settings.Hub.Namespace, - }, - addon) - if err != nil { - if k8serr.IsNotFound(err) { - err = &BadRequestError{ - Reason: "Addon: " + name + " not found", - } - } - return - } - } - for _, name := range r.Extensions { - ext := &crd.Extension{} - err = client.Get( - context.TODO(), - k8sclient.ObjectKey{ - Name: name, - Namespace: Settings.Hub.Namespace, - }, - ext) - if err != nil { - if k8serr.IsNotFound(err) { - err = &BadRequestError{ - Reason: "Extension: " + name + " not found", - } - } - return - } - } - if r.Kind != "" { - kind := &crd.Task{} - name := r.Kind - err = client.Get( - context.TODO(), - k8sclient.ObjectKey{ - Name: name, - Namespace: Settings.Hub.Namespace, - }, - kind) - if err != nil { - if k8serr.IsNotFound(err) { - err = &BadRequestError{ - Reason: "Task: " + name + " not found", - } - } - return - } - if r.Priority == 0 { - r.Priority = kind.Spec.Priority - } - mA, castA := h.AsMap(kind.Spec.Data) - mB, castB := r.Data.(map[string]any) - if castA && castB { - r.Data = h.Merge(mA, mB) - } else { - r.Data = mA - } - } - return -} - // TTL time-to-live. type TTL model.TTL diff --git a/api/taskgroup.go b/api/taskgroup.go index ff6f9ea2..69bfc013 100644 --- a/api/taskgroup.go +++ b/api/taskgroup.go @@ -450,12 +450,11 @@ func (h *TaskGroupHandler) findRefs(ctx *gin.Context, r *TaskGroup) (err error) if r.Priority == 0 { r.Priority = kind.Spec.Priority } - mA, castA := h.AsMap(kind.Spec.Data) - mB, castB := r.Data.(map[string]any) - if castA && castB { - r.Data = h.Merge(mA, mB) - } else { - r.Data = mA + data := model.Data{Any: r.Data} + other := model.Data{Any: kind.Data()} + merged := data.Merge(other) + if !merged { + r.Data = other.Any } } return @@ -472,14 +471,9 @@ func (h *TaskGroupHandler) Propagate(m *model.TaskGroup) (err error) { task.Policy = m.Policy task.State = m.State task.SetBucket(m.BucketID) - if m.Data.Any != nil { - mA, castA := m.Data.Any.(map[string]any) - mB, castB := task.Data.Any.(map[string]any) - if castA && castB { - task.Data.Any = h.Merge(mA, mB) - } else { - task.Data.Any = m.Data - } + merged := task.Data.Merge(m.Data) + if !merged { + task.Data = m.Data } } diff --git a/cmd/main.go b/cmd/main.go index 0c3d3559..7ceff82c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -124,10 +124,6 @@ func main() { return } }() - err = Settings.FindDiscoveryTasks() - if err != nil { - return - } } // // k8s client. diff --git a/importer/manager.go b/importer/manager.go index e8856751..b11f45e4 100644 --- a/importer/manager.go +++ b/importer/manager.go @@ -14,6 +14,7 @@ import ( "github.com/konveyor/tackle2-hub/model" "github.com/konveyor/tackle2-hub/settings" tasking "github.com/konveyor/tackle2-hub/task" + "github.com/konveyor/tackle2-hub/trigger" "gorm.io/gorm" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -344,38 +345,24 @@ func (m *Manager) createApplication(imp *model.Import) (ok bool) { return } // best effort - err := m.discover(app) + tr := trigger.Application{ + Trigger: trigger.Trigger{ + TaskManager: m.TaskManager, + Client: m.Client, + DB: m.DB, + }, + } + err := tr.Created(app) if err != nil { - imp.ErrorMessage = fmt.Sprintf("Failed to launch discovery tasks for Application '%s'", app.Name) - return + imp.ErrorMessage = fmt.Sprintf( + "Failed to launch discovery tasks for Application '%s'.", + app.Name) } ok = true return } -func (m *Manager) discover(application *model.Application) (err error) { - for _, kind := range Settings.Hub.Discovery.Tasks { - t := api.Task{} - t.Kind = kind - t.Name = fmt.Sprintf("%s-%s", application.Name, kind) - ref := api.Ref{ID: application.ID} - t.Application = &ref - t.State = tasking.Ready - taskHandler := api.TaskHandler{} - err = taskHandler.FindRefs(m.Client, &t) - if err != nil { - return - } - task := tasking.Task{Task: t.Model()} - err = m.TaskManager.Create(m.DB, &task) - if err != nil { - return - } - } - return -} - func (m *Manager) createStakeholder(name string, email string) (stakeholder model.Stakeholder, err error) { stakeholder.Name = name stakeholder.Email = email diff --git a/k8s/api/tackle/v1alpha2/task.go b/k8s/api/tackle/v1alpha2/task.go index 89261c9a..2e5180dc 100644 --- a/k8s/api/tackle/v1alpha2/task.go +++ b/k8s/api/tackle/v1alpha2/task.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha2 import ( + "encoding/json" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -65,6 +67,16 @@ func (r *Task) HasDep(name string) (found bool) { return } +// Data returns the task Data as map[string]any. +func (r *Task) Data() (mp map[string]any) { + b := r.Spec.Data.Raw + if b == nil { + return + } + _ = json.Unmarshal(b, &mp) + return +} + // TaskList is a list of Task. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type TaskList struct { diff --git a/migration/json/fields.go b/migration/json/fields.go new file mode 100644 index 00000000..926aa40b --- /dev/null +++ b/migration/json/fields.go @@ -0,0 +1,86 @@ +package json + +import "gopkg.in/yaml.v2" + +// Ref represents a FK. +type Ref struct { + ID uint `json:"id" binding:"required"` + Name string `json:"name,omitempty" yaml:",omitempty"` +} + +// Map alias. +type Map = map[string]any + +// Any alias. +type Any any + +// Data json any field. +type Data struct { + Any +} + +// Merge merges the other into self. +// Both must be a map. +func (d *Data) Merge(other Data) (merged bool) { + b, isMap := d.AsMap() + if !isMap { + return + } + a, isMap := other.AsMap() + if !isMap { + return + } + d.Any = d.merge(a, b) + merged = true + return +} + +// Merge maps B into A. +// The B map takes precedence. +func (d *Data) merge(a, b map[any]any) (out map[any]any) { + if a == nil { + a = make(map[any]any) + } + if b == nil { + b = make(map[any]any) + } + out = make(map[any]any) + for k, v := range a { + out[k] = v + if bv, found := b[k]; found { + out[k] = bv + if av, cast := v.(map[any]any); cast { + if bv, cast := bv.(map[any]any); cast { + out[k] = d.merge(av, bv) + } else { + out[k] = bv + } + } + } + } + for k, v := range b { + if _, found := a[k]; !found { + out[k] = v + } + } + + return +} + +// AsMap returns self as a map. +func (d *Data) AsMap() (mp map[any]any, isMap bool) { + if d.Any == nil { + return + } + b, err := yaml.Marshal(d.Any) + if err != nil { + return + } + mp = make(map[any]any) + err = yaml.Unmarshal(b, &mp) + if err != nil { + return + } + isMap = true + return +} diff --git a/migration/json/pkg.go b/migration/json/pkg.go new file mode 100644 index 00000000..74ef4ae6 --- /dev/null +++ b/migration/json/pkg.go @@ -0,0 +1,6 @@ +package json + +import "encoding/json" + +var Unmarshal = json.Unmarshal +var Marshal = json.Marshal diff --git a/migration/v14/model/core.go b/migration/v14/model/core.go index 3af77c81..0a7541dd 100644 --- a/migration/v14/model/core.go +++ b/migration/v14/model/core.go @@ -1,7 +1,6 @@ package model import ( - "encoding/json" "os" "path" "time" @@ -9,6 +8,7 @@ import ( "github.com/google/uuid" liberr "github.com/jortel/go-utils/error" "github.com/konveyor/tackle2-hub/encryption" + "github.com/konveyor/tackle2-hub/migration/json" "gorm.io/gorm" ) @@ -126,7 +126,7 @@ type Task struct { Priority int Policy TaskPolicy `gorm:"type:json;serializer:json"` TTL TTL `gorm:"type:json;serializer:json"` - Data Data `gorm:"type:json;serializer:json"` + Data json.Data `gorm:"type:json;serializer:json"` Started *time.Time Terminated *time.Time Errors []TaskError `gorm:"type:json;serializer:json"` @@ -146,40 +146,6 @@ func (m *Task) BeforeCreate(db *gorm.DB) (err error) { return } -// TaskEvent task event. -type TaskEvent struct { - Kind string `json:"kind"` - Count int `json:"count"` - Reason string `json:"reason,omitempty" yaml:",omitempty"` - Last time.Time `json:"last"` -} - -// Map alias. -type Map = map[string]any - -// Any alias. -type Any any - -// Data json any field. -type Data struct { - Any -} - -// TTL time-to-live. -type TTL struct { - Created int `json:"created,omitempty" yaml:",omitempty"` - Pending int `json:"pending,omitempty" yaml:",omitempty"` - Running int `json:"running,omitempty" yaml:",omitempty"` - Succeeded int `json:"succeeded,omitempty" yaml:",omitempty"` - Failed int `json:"failed,omitempty" yaml:",omitempty"` -} - -// Ref represents a FK. -type Ref struct { - ID uint `json:"id" binding:"required"` - Name string `json:"name,omitempty" yaml:",omitempty"` -} - // TaskError used in Task.Errors. type TaskError struct { Severity string `json:"severity"` @@ -208,7 +174,7 @@ type TaskReport struct { Activity []string `gorm:"type:json;serializer:json"` Errors []TaskError `gorm:"type:json;serializer:json"` Attached []Attachment `gorm:"type:json;serializer:json" ref:"[]file"` - Result Data `gorm:"type:json;serializer:json"` + Result json.Data `gorm:"type:json;serializer:json"` TaskID uint `gorm:"<-:create;uniqueIndex"` Task *Task } @@ -223,7 +189,7 @@ type TaskGroup struct { State string Priority int Policy TaskPolicy `gorm:"type:json;serializer:json"` - Data Data `gorm:"type:json;serializer:json"` + Data json.Data `gorm:"type:json;serializer:json"` List []Task `gorm:"type:json;serializer:json"` Tasks []Task `gorm:"constraint:OnDelete:CASCADE"` } @@ -318,3 +284,24 @@ func (r *Identity) Decrypt() (err error) { } return } + +// +// JSON Fields. +// + +// TaskEvent task event. +type TaskEvent struct { + Kind string `json:"kind"` + Count int `json:"count"` + Reason string `json:"reason,omitempty" yaml:",omitempty"` + Last time.Time `json:"last"` +} + +// TTL time-to-live. +type TTL struct { + Created int `json:"created,omitempty" yaml:",omitempty"` + Pending int `json:"pending,omitempty" yaml:",omitempty"` + Running int `json:"running,omitempty" yaml:",omitempty"` + Succeeded int `json:"succeeded,omitempty" yaml:",omitempty"` + Failed int `json:"failed,omitempty" yaml:",omitempty"` +} diff --git a/model/pkg.go b/model/pkg.go index 6171a917..c9f799c8 100644 --- a/model/pkg.go +++ b/model/pkg.go @@ -1,6 +1,7 @@ package model import ( + "github.com/konveyor/tackle2-hub/migration/json" "github.com/konveyor/tackle2-hub/migration/v14/model" ) @@ -47,9 +48,9 @@ type Ticket = model.Ticket type Tracker = model.Tracker type TTL = model.TTL -type Ref = model.Ref -type Map = model.Map -type Data = model.Data +type Ref = json.Ref +type Map = json.Map +type Data = json.Data type TaskError = model.TaskError type TaskEvent = model.TaskEvent diff --git a/settings/hub.go b/settings/hub.go index e903c476..192acfec 100644 --- a/settings/hub.go +++ b/settings/hub.go @@ -1,22 +1,9 @@ package settings import ( - "context" "os" "strconv" "time" - - liberr "github.com/jortel/go-utils/error" - crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/kubernetes/scheme" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" -) - -const ( - DiscoveryLabel = "konveyor.io/discovery" ) const ( @@ -48,6 +35,7 @@ const ( EnvAnalysisReportPath = "ANALYSIS_REPORT_PATH" EnvAnalysisArchiverEnabled = "ANALYSIS_ARCHIVER_ENABLED" EnvDiscoveryEnabled = "DISCOVERY_ENABLED" + EnvDiscoveryLabel = "DISCOVERY_LABEL" ) type Hub struct { @@ -114,9 +102,10 @@ type Hub struct { ReportPath string ArchiverEnabled bool } + // Discovery settings. Discovery struct { Enabled bool - Tasks []string + Label string } } @@ -276,55 +265,20 @@ func (r *Hub) Load() (err error) { } else { r.Analysis.ArchiverEnabled = true } - - if !r.Disconnected { - s, found = os.LookupEnv(EnvDiscoveryEnabled) - if found { - b, _ := strconv.ParseBool(s) - r.Discovery.Enabled = b - } else { - r.Discovery.Enabled = true - } - } - - return -} - -// FindDiscoveryTasks by their label. -func (r *Hub) FindDiscoveryTasks() (err error) { - if !r.Discovery.Enabled { - return - } - cfg, _ := config.GetConfig() - client, err := k8sclient.New( - cfg, - k8sclient.Options{ - Scheme: scheme.Scheme, - }) - if err != nil { - err = liberr.Wrap(err) - return - } - selector := labels.NewSelector() - req, _ := labels.NewRequirement(DiscoveryLabel, selection.Exists, []string{}) - selector = selector.Add(*req) - options := &k8sclient.ListOptions{ - Namespace: Settings.Namespace, - LabelSelector: selector, - } - list := crd.TaskList{} - err = client.List( - context.TODO(), - &list, - options) - if err != nil { - err = liberr.Wrap(err) - return + s, found = os.LookupEnv(EnvDiscoveryEnabled) + if found { + b, _ := strconv.ParseBool(s) + r.Discovery.Enabled = !r.Disconnected && b + } else { + r.Discovery.Enabled = !r.Disconnected } - for i := range list.Items { - t := &list.Items[i] - r.Discovery.Tasks = append(r.Discovery.Tasks, t.Name) + s, found = os.LookupEnv(EnvDiscoveryLabel) + if found { + r.Discovery.Label = s + } else { + r.Discovery.Label = "konveyor.io/discovery" } + return } diff --git a/task/error.go b/task/error.go index 42b7320f..b6786c86 100644 --- a/task/error.go +++ b/task/error.go @@ -24,20 +24,6 @@ func (e *BadRequest) Is(err error) (matched bool) { return } -// ActionTimeout report an action timeout. -type ActionTimeout struct { -} - -func (e *ActionTimeout) Error() string { - return "Requested (asynchronous) action timed out." -} - -func (e *ActionTimeout) Is(err error) (matched bool) { - var inst *ActionTimeout - matched = errors.As(err, &inst) - return -} - // SoftErr returns true when the error isA SoftError. func SoftErr(err error) (matched, retry bool) { if err == nil { diff --git a/task/manager.go b/task/manager.go index 855ef9bd..4f6398fc 100644 --- a/task/manager.go +++ b/task/manager.go @@ -130,6 +130,10 @@ func (m *Manager) Run(ctx context.Context) { // Create a task. func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { + err = m.findRefs(requested) + if err != nil { + return + } task := &Task{&model.Task{}} switch requested.State { case "": @@ -167,77 +171,80 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { // Update update task. func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { - err = m.action(func() (err error) { - task := &Task{} - err = db.First(task, requested.ID).Error - if err != nil { - return - } - switch task.State { - case Created, - Ready: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Kind = requested.Kind - task.Addon = requested.Addon - task.Extensions = requested.Extensions - task.State = requested.State - task.Locator = requested.Locator - task.Priority = requested.Priority - task.Policy = requested.Policy - task.TTL = requested.TTL - task.Data = requested.Data - task.ApplicationID = requested.ApplicationID - case Pending, - QuotaBlocked, - Postponed: - task.UpdateUser = requested.UpdateUser - task.Name = requested.Name - task.Locator = requested.Locator - task.Data = requested.Data - task.Priority = requested.Priority - task.Policy = requested.Policy - task.TTL = requested.TTL - default: - // discarded. - } - err = db.Save(task).Error - if err != nil { - err = liberr.Wrap(err) - return - } + task := &Task{} + err = db.First(task, requested.ID).Error + if err != nil { + return + } + switch task.State { + case Created, + Ready: + task.UpdateUser = requested.UpdateUser + task.Name = requested.Name + task.Kind = requested.Kind + task.Addon = requested.Addon + task.Extensions = requested.Extensions + task.State = requested.State + task.Locator = requested.Locator + task.Priority = requested.Priority + task.Policy = requested.Policy + task.TTL = requested.TTL + task.Data = requested.Data + task.ApplicationID = requested.ApplicationID + case Pending, + QuotaBlocked, + Postponed: + task.UpdateUser = requested.UpdateUser + task.Name = requested.Name + task.Locator = requested.Locator + task.Data = requested.Data + task.Priority = requested.Priority + task.Policy = requested.Policy + task.TTL = requested.TTL + default: + // discarded. + return + } + err = m.findRefs(task) + if err != nil { return - }) + } + err = db.Save(task).Error + if err != nil { + err = liberr.Wrap(err) + return + } return } // Delete a task. func (m *Manager) Delete(db *gorm.DB, id uint) (err error) { - err = m.action(func() (err error) { - task := &Task{} - err = db.First(task, id).Error - if err != nil { - return - } - err = task.Delete(m.Client) - if err != nil { - return - } - err = db.Delete(task).Error + task := &Task{} + err = db.First(task, id).Error + if err != nil { return - }) + } + m.action( + func() (err error) { + err = task.Delete(m.Client) + if err != nil { + return + } + err = db.Delete(task).Error + return + }) return } // Cancel a task. func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { - err = m.action( + task := &Task{} + err = db.First(task, id).Error + if err != nil { + return + } + m.action( func() (err error) { - task := &Task{} - err = db.First(task, id).Error - if err != nil { - return - } switch task.State { case Succeeded, Failed, @@ -273,29 +280,22 @@ func (m *Manager) pause() { time.Sleep(d) } -// action executes an asynchronous action. -func (m *Manager) action(action func() error) (err error) { - d := time.Minute - ch := make(chan error) +// action enqueues an asynchronous action. +func (m *Manager) action(action func() error) { m.queue <- func() { + var err error defer func() { p := recover() if p != nil { - if err, cast := p.(error); cast { - ch <- err + if pErr, cast := p.(error); cast { + err = pErr } } - close(ch) + if err != nil { + Log.Error(err, "Action failed.") + } }() - select { - case ch <- action(): - default: - } - } - select { - case err = <-ch: - case <-time.After(d): - err = &ActionTimeout{} + err = action() } return } @@ -389,6 +389,49 @@ func (m *Manager) disconnected(list []*Task) (kept []*Task, err error) { return } +// FindRefs find referenced resources. +// - addon +// - extensions +// - kind +// - priority +// The priority is defaulted to the kind as needed. +func (m *Manager) findRefs(task *Task) (err error) { + if Settings.Disconnected { + return + } + if task.Addon != "" { + _, found := m.cluster.addons[task.Addon] + if !found { + err = &AddonNotFound{Name: task.Addon} + return + } + } + for _, name := range task.Extensions { + _, found := m.cluster.extensions[name] + if !found { + err = &ExtensionNotFound{Name: name} + return + } + } + if task.Kind == "" { + return + } + kind, found := m.cluster.tasks[task.Kind] + if !found { + err = &KindNotFound{Name: task.Kind} + return + } + if task.Priority == 0 { + task.Priority = kind.Spec.Priority + } + other := model.Data{Any: kind.Data()} + merged := task.Data.Merge(other) + if !merged { + task.Data = other + } + return +} + // selectAddon selects addon as needed. // The returned list has failed tasks removed. func (m *Manager) selectAddons(list []*Task) (kept []*Task, err error) { diff --git a/test/api/task/api_test.go b/test/api/task/api_test.go index 906568e7..8fdab840 100644 --- a/test/api/task/api_test.go +++ b/test/api/task/api_test.go @@ -2,6 +2,7 @@ package task import ( "testing" + "time" "github.com/konveyor/tackle2-hub/test/assert" ) @@ -73,9 +74,16 @@ func TestTaskCRUD(t *testing.T) { t.Errorf(err.Error()) } - _, err = Task.Get(r.ID) - if err == nil { - t.Errorf("Resource exits, but should be deleted: %v", r) + for i := 5; i >= 0; i-- { + time.Sleep(time.Second) + _, err = Task.Get(r.ID) + if err != nil { + break + } + if i == 0 { + t.Errorf("Resource exits, but should be deleted: %v", r) + break + } } }) } diff --git a/trigger/application.go b/trigger/application.go new file mode 100644 index 00000000..44823f63 --- /dev/null +++ b/trigger/application.go @@ -0,0 +1,42 @@ +package trigger + +import ( + "fmt" + + "github.com/konveyor/tackle2-hub/model" + tasking "github.com/konveyor/tackle2-hub/task" +) + +// Application trigger. +type Application struct { + Trigger +} + +// Created trigger. +func (r *Application) Created(m *model.Application) (err error) { + err = r.Updated(m) + return +} + +// Updated trigger. +func (r *Application) Updated(m *model.Application) (err error) { + if !Settings.Discovery.Enabled { + return + } + kinds, err := r.FindTasks(Settings.Discovery.Label) + if err != nil { + return + } + for _, kind := range kinds { + t := &tasking.Task{Task: &model.Task{}} + t.Kind = kind.Name + t.Name = fmt.Sprintf("%s-%s", m.Name, t.Name) + t.ApplicationID = &m.ID + t.State = tasking.Ready + err = r.TaskManager.Create(r.DB, t) + if err != nil { + return + } + } + return +} diff --git a/trigger/identity.go b/trigger/identity.go new file mode 100644 index 00000000..e8416c6a --- /dev/null +++ b/trigger/identity.go @@ -0,0 +1,28 @@ +package trigger + +import ( + "github.com/konveyor/tackle2-hub/model" +) + +// Identity trigger. +type Identity struct { + Trigger +} + +// Updated model created trigger. +func (r *Identity) Updated(m *model.Identity) (err error) { + tr := Application{ + Trigger: Trigger{ + TaskManager: r.TaskManager, + Client: r.Client, + DB: r.DB, + }, + } + for i := range m.Applications { + err = tr.Updated(&m.Applications[i]) + if err != nil { + return + } + } + return +} diff --git a/trigger/pkg.go b/trigger/pkg.go new file mode 100644 index 00000000..20bd344e --- /dev/null +++ b/trigger/pkg.go @@ -0,0 +1,53 @@ +package trigger + +import ( + "context" + + liberr "github.com/jortel/go-utils/error" + crd "github.com/konveyor/tackle2-hub/k8s/api/tackle/v1alpha2" + "github.com/konveyor/tackle2-hub/settings" + tasking "github.com/konveyor/tackle2-hub/task" + "gorm.io/gorm" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + Settings = &settings.Settings +) + +// Trigger supports actions triggered by model changes. +type Trigger struct { + TaskManager *tasking.Manager + Client k8sclient.Client + DB *gorm.DB +} + +// FindTasks returns tasks with the specified label. +func (r *Trigger) FindTasks(label string) (matched []*crd.Task, err error) { + selector := labels.NewSelector() + req, _ := labels.NewRequirement( + label, + selection.Exists, + []string{}) + selector = selector.Add(*req) + options := &k8sclient.ListOptions{ + Namespace: Settings.Namespace, + LabelSelector: selector, + } + list := crd.TaskList{} + err = r.Client.List( + context.TODO(), + &list, + options) + if err != nil { + err = liberr.Wrap(err) + return + } + for i := range list.Items { + t := &list.Items[i] + matched = append(matched, t) + } + return +}