From ff2ba63c3140250b120ecb0cb7155fbcf3fad0b3 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Fri, 14 Jun 2024 17:07:19 -0500 Subject: [PATCH] :sparkles: Add /tasks patch; return 202 for task actions and updates. (#660) Adds patch endpoints for: - /tasks - /taskgroups The Update() methods on both Task and Taskgroup support both PUT and PATCH depending on the http method. They also support delegation from Submit() which mainly sets the state=Ready. Updated to return 202 (accepted) on success. - put /tasks/:id - patch /tasks/:id - put /tasks/:id/cancel BaseHandler.modBody() removed. Add patch support to binding client. Adds API test for task patch. closes: #661 closes: #662 --------- Signed-off-by: Jeff Ortel --- api/base.go | 28 ---------------- api/task.go | 69 ++++++++++++++++++--------------------- api/taskgroup.go | 59 +++++++++++++-------------------- binding/client.go | 57 +++++++++++++++++++++++++++++++- binding/task.go | 7 ++++ task/manager.go | 16 +++------ test/api/task/api_test.go | 28 ++++++++++++++++ 7 files changed, 150 insertions(+), 114 deletions(-) diff --git a/api/base.go b/api/base.go index e85f38f7..b1459493 100644 --- a/api/base.go +++ b/api/base.go @@ -1,7 +1,6 @@ package api import ( - "bytes" "database/sql" "encoding/json" "errors" @@ -99,33 +98,6 @@ func (h *BaseHandler) pk(ctx *gin.Context) (id uint) { return } -// modBody updates the body using the `mod` function. -// 1. read the body. -// 2. mod() -// 3. write body. -func (h *BaseHandler) modBody( - ctx *gin.Context, - r interface{}, - mod func(bool) error) (err error) { - // - withBody := false - if ctx.Request.ContentLength > 0 { - withBody = true - err = h.Bind(ctx, r) - if err != nil { - return - } - } - err = mod(withBody) - if err != nil { - return - } - b, _ := json.Marshal(r) - bfr := bytes.NewBuffer(b) - ctx.Request.Body = io.NopCloser(bfr) - return -} - // CurrentUser gets username from Keycloak auth token. func (h *BaseHandler) CurrentUser(ctx *gin.Context) (user string) { rtx := WithContext(ctx) diff --git a/api/task.go b/api/task.go index d13a0192..95e82761 100644 --- a/api/task.go +++ b/api/task.go @@ -38,7 +38,7 @@ const ( ) const ( - LocatorParam = "locator" + Submit = "submit" ) // TaskHandler handles task routes. @@ -55,10 +55,11 @@ func (h TaskHandler) AddRoutes(e *gin.Engine) { routeGroup.POST(TasksRoot, h.Create) routeGroup.GET(TaskRoot, h.Get) routeGroup.PUT(TaskRoot, h.Update) + routeGroup.PATCH(TaskRoot, Transaction, h.Update) routeGroup.DELETE(TaskRoot, h.Delete) routeGroup.GET(TasksReportQueueRoot, h.Queued) // Actions - routeGroup.PUT(TaskSubmitRoot, h.Submit, h.Update) + routeGroup.PUT(TaskSubmitRoot, Transaction, h.Submit) routeGroup.PUT(TaskCancelRoot, h.Cancel) // Bucket routeGroup = e.Group("/") @@ -335,73 +336,66 @@ func (h TaskHandler) Delete(ctx *gin.Context) { // @description Update a task. // @tags tasks // @accept json -// @success 204 +// @success 202 // @router /tasks/{id} [put] // @param id path int true "Task ID" // @param task body Task true "Task data" func (h TaskHandler) Update(ctx *gin.Context) { id := h.pk(ctx) + m := &model.Task{} + err := h.DB(ctx).First(m, id).Error + if err != nil { + _ = ctx.Error(err) + return + } r := &Task{} - err := h.Bind(ctx, r) + if ctx.Request.Method == http.MethodPatch && + ctx.Request.ContentLength > 0 { + r.With(m) + } + err = h.Bind(ctx, r) if err != nil { + _ = ctx.Error(err) return } - r.ID = id + if _, found := ctx.Get(Submit); found { + r.State = tasking.Ready + } + m = r.Model() + m.ID = id + m.UpdateUser = h.CurrentUser(ctx) rtx := WithContext(ctx) task := &tasking.Task{} - task.With(r.Model()) - task.UpdateUser = h.BaseHandler.CurrentUser(ctx) + task.With(m) err = rtx.TaskManager.Update(h.DB(ctx), task) if err != nil { _ = ctx.Error(err) return } - h.Status(ctx, http.StatusNoContent) + h.Status(ctx, http.StatusAccepted) } // Submit godoc // @summary Submit a task. -// @description Submit a task. +// @description Patch and submit a task. // @tags tasks // @accept json -// @success 204 +// @success 202 // @router /tasks/{id}/submit [put] // @param id path int true "Task ID" // @param task body Task false "Task data (optional)" func (h TaskHandler) Submit(ctx *gin.Context) { - id := h.pk(ctx) - r := &Task{} - err := h.findRefs(ctx, r) - if err != nil { - _ = ctx.Error(err) - return - } - mod := func(withBody bool) (err error) { - if !withBody { - m := r.Model() - err = h.DB(ctx).First(m, id).Error - if err != nil { - return - } - r.With(m) - } - r.State = tasking.Ready - return - } - err = h.modBody(ctx, r, mod) - if err != nil { - _ = ctx.Error(err) - return - } - ctx.Next() + ctx.Set(Submit, true) + ctx.Request.Method = http.MethodPatch + h.Update(ctx) } // Cancel godoc // @summary Cancel a task. // @description Cancel a task. // @tags tasks -// @success 204 +// @success 202 // @router /tasks/{id}/cancel [put] // @param id path int true "Task ID" func (h TaskHandler) Cancel(ctx *gin.Context) { @@ -413,7 +407,7 @@ func (h TaskHandler) Cancel(ctx *gin.Context) { return } - h.Status(ctx, http.StatusNoContent) + h.Status(ctx, http.StatusAccepted) } // BucketGet godoc @@ -735,7 +729,6 @@ type Task struct { TTL TTL `json:"ttl,omitempty" yaml:",omitempty"` Data any `json:"data,omitempty" yaml:",omitempty"` Application *Ref `json:"application,omitempty" yaml:",omitempty"` - Actions []string `json:"actions,omitempty" yaml:",omitempty"` Bucket *Ref `json:"bucket,omitempty" yaml:",omitempty"` Pod string `json:"pod,omitempty" yaml:",omitempty"` Retries int `json:"retries,omitempty" yaml:",omitempty"` diff --git a/api/taskgroup.go b/api/taskgroup.go index bd458a90..2432b8b8 100644 --- a/api/taskgroup.go +++ b/api/taskgroup.go @@ -35,8 +35,9 @@ func (h TaskGroupHandler) AddRoutes(e *gin.Engine) { routeGroup.GET(TaskGroupsRoot+"/", h.List) routeGroup.POST(TaskGroupsRoot, h.Create) routeGroup.PUT(TaskGroupRoot, h.Update) + routeGroup.PATCH(TaskGroupRoot, Transaction, h.Update) routeGroup.GET(TaskGroupRoot, h.Get) - routeGroup.PUT(TaskGroupSubmitRoot, h.Submit, h.Update) + routeGroup.PUT(TaskGroupSubmitRoot, Transaction, h.Submit) routeGroup.DELETE(TaskGroupRoot, h.Delete) // Bucket routeGroup = e.Group("/") @@ -175,18 +176,25 @@ func (h TaskGroupHandler) Create(ctx *gin.Context) { // @param task body TaskGroup true "Task data" func (h TaskGroupHandler) Update(ctx *gin.Context) { id := h.pk(ctx) - updated := &TaskGroup{} - err := h.Bind(ctx, updated) + m := &model.TaskGroup{} + err := h.DB(ctx).First(m, id).Error if err != nil { + _ = ctx.Error(err) return } - current := &model.TaskGroup{} - err = h.DB(ctx).First(current, id).Error + r := &TaskGroup{} + if ctx.Request.Method == http.MethodPatch && + ctx.Request.ContentLength > 0 { + r.With(m) + } + err = h.Bind(ctx, r) if err != nil { - _ = ctx.Error(err) return } - err = h.findRefs(ctx, updated) + if _, found := ctx.Get(Submit); found { + r.State = tasking.Ready + } + err = h.findRefs(ctx, r) if err != nil { _ = ctx.Error(err) return @@ -196,10 +204,10 @@ func (h TaskGroupHandler) Update(ctx *gin.Context) { clause.Associations, "BucketID", "Bucket") - m := updated.Model() + m = r.Model() m.ID = id - m.UpdateUser = h.BaseHandler.CurrentUser(ctx) - switch updated.State { + m.UpdateUser = h.CurrentUser(ctx) + switch m.State { case "", tasking.Created: err = db.Save(m).Error if err != nil { @@ -230,6 +238,7 @@ func (h TaskGroupHandler) Update(ctx *gin.Context) { for i := range m.Tasks { task := &tasking.Task{} task.With(&m.Tasks[i]) + task.CreateUser = h.CurrentUser(ctx) err = rtx.TaskManager.Create(h.DB(ctx), task) if err != nil { _ = ctx.Error(err) @@ -284,7 +293,7 @@ func (h TaskGroupHandler) Delete(ctx *gin.Context) { // Submit godoc // @summary Submit a task group. -// @description Submit a task group. +// @description Patch and submit a task group. // @tags taskgroups // @accept json // @success 204 @@ -292,31 +301,9 @@ func (h TaskGroupHandler) Delete(ctx *gin.Context) { // @param id path int true "TaskGroup ID" // @param taskgroup body TaskGroup false "TaskGroup data (optional)" func (h TaskGroupHandler) Submit(ctx *gin.Context) { - id := h.pk(ctx) - r := &TaskGroup{} - err := h.findRefs(ctx, r) - if err != nil { - _ = ctx.Error(err) - return - } - mod := func(withBody bool) (err error) { - if !withBody { - m := r.Model() - err = h.DB(ctx).First(m, id).Error - if err != nil { - return - } - r.With(m) - } - r.State = tasking.Ready - return - } - err = h.modBody(ctx, r, mod) - if err != nil { - _ = ctx.Error(err) - return - } - ctx.Next() + ctx.Set(Submit, true) + ctx.Request.Method = http.MethodPatch + h.Update(ctx) } // BucketGet godoc diff --git a/binding/client.go b/binding/client.go index 93ce4a43..354c1253 100644 --- a/binding/client.go +++ b/binding/client.go @@ -172,6 +172,8 @@ func (r *Client) Post(path string, object interface{}) (err error) { } status := response.StatusCode switch status { + case http.StatusAccepted: + case http.StatusNoContent: case http.StatusOK, http.StatusCreated: var body []byte @@ -185,7 +187,6 @@ func (r *Client) Post(path string, object interface{}) (err error) { err = liberr.Wrap(err) return } - case http.StatusNoContent: default: err = r.restError(response) } @@ -223,6 +224,60 @@ func (r *Client) Put(path string, object interface{}, params ...Param) (err erro } status := response.StatusCode switch status { + case http.StatusAccepted: + case http.StatusNoContent: + case http.StatusOK, + http.StatusCreated: + var body []byte + body, err = io.ReadAll(response.Body) + if err != nil { + err = liberr.Wrap(err) + return + } + err = json.Unmarshal(body, object) + if err != nil { + err = liberr.Wrap(err) + return + } + default: + err = r.restError(response) + } + + return +} + +// Patch a resource. +func (r *Client) Patch(path string, object interface{}, params ...Param) (err error) { + request := func() (request *http.Request, err error) { + bfr, err := json.Marshal(object) + if err != nil { + err = liberr.Wrap(err) + return + } + reader := bytes.NewReader(bfr) + request = &http.Request{ + Header: http.Header{}, + Method: http.MethodPatch, + Body: io.NopCloser(reader), + URL: r.join(path), + } + request.Header.Set(api.Accept, binding.MIMEJSON) + if len(params) > 0 { + q := request.URL.Query() + for _, p := range params { + q.Add(p.Key, p.Value) + } + request.URL.RawQuery = q.Encode() + } + return + } + response, err := r.send(request) + if err != nil { + return + } + status := response.StatusCode + switch status { + case http.StatusAccepted: case http.StatusNoContent: case http.StatusOK, http.StatusCreated: diff --git a/binding/task.go b/binding/task.go index 8cd08fb0..623bd2bc 100644 --- a/binding/task.go +++ b/binding/task.go @@ -37,6 +37,13 @@ func (h *Task) Update(r *api.Task) (err error) { return } +// Patch a Task. +func (h *Task) Patch(id uint, r any) (err error) { + path := Path(api.TaskRoot).Inject(Params{api.ID: id}) + err = h.client.Patch(path, r) + return +} + // Delete a Task. func (h *Task) Delete(id uint) (err error) { err = h.client.Delete(Path(api.TaskRoot).Inject(Params{api.ID: id})) diff --git a/task/manager.go b/task/manager.go index 45965743..a2c0c9bd 100644 --- a/task/manager.go +++ b/task/manager.go @@ -150,6 +150,7 @@ func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) { task.TTL = requested.TTL task.Data = requested.Data task.ApplicationID = requested.ApplicationID + task.BucketID = requested.BucketID default: err = &BadRequest{ Reason: "state must be (Created|Ready)", @@ -193,18 +194,13 @@ func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) { Postponed: task.UpdateUser = requested.UpdateUser task.Name = requested.Name + task.Locator = requested.Locator task.Data = requested.Data task.Priority = requested.Priority task.Policy = requested.Policy task.TTL = requested.TTL - case Running, - Succeeded, - Failed, - Canceled: - err = &BadRequest{ - Reason: "state must not be (Running|Succeeded|Failed|Canceled)", - } - return + default: + // discarded. } err = db.Save(task).Error if err != nil { @@ -247,9 +243,7 @@ func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) { case Succeeded, Failed, Canceled: - err = &BadRequest{ - Reason: "state must not be (Succeeded|Failed|Canceled)", - } + // discarded. return default: } diff --git a/test/api/task/api_test.go b/test/api/task/api_test.go index 526d550a..906568e7 100644 --- a/test/api/task/api_test.go +++ b/test/api/task/api_test.go @@ -39,6 +39,34 @@ func TestTaskCRUD(t *testing.T) { t.Errorf("Different response error. Got %s, expected %s", got.Name, r.Name) } + // patch. + type TaskPatch struct { + Name string `json:"name"` + Policy struct { + PreemptEnabled bool `json:"preemptEnabled"` + } + } + p := &TaskPatch{} + p.Name = "patched " + r.Name + p.Policy.PreemptEnabled = true + err = Task.Patch(r.ID, p) + if err != nil { + t.Errorf(err.Error()) + } + got, err = Task.Get(r.ID) + if err != nil { + t.Errorf(err.Error()) + } + if got.Name != p.Name { + t.Errorf("Different response error. Got %s, expected %s", got.Name, p.Name) + } + if got.Policy.PreemptEnabled != p.Policy.PreemptEnabled { + t.Errorf( + "Different response error. Got %v, expected %v", + got.Policy.PreemptEnabled, + p.Policy.PreemptEnabled) + } + // Delete. err = Task.Delete(r.ID) if err != nil {