Skip to content

Commit

Permalink
Fix deadlock.
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <jortel@redhat.com>
  • Loading branch information
jortel committed Jun 25, 2024
1 parent ed6be10 commit 175e781
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 133 deletions.
6 changes: 3 additions & 3 deletions api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (h TaskHandler) Delete(ctx *gin.Context) {
// @description Update a task.
// @tags tasks
// @accept json
// @success 202
// @success 200
// @router /tasks/{id} [put]
// @param id path int true "Task ID"
// @param task body Task true "Task data"
Expand Down Expand Up @@ -364,15 +364,15 @@ func (h TaskHandler) Update(ctx *gin.Context) {
return
}

h.Status(ctx, http.StatusAccepted)
h.Status(ctx, http.StatusOK)
}

// Submit godoc
// @summary Submit a task.
// @description Patch and submit a task.
// @tags tasks
// @accept json
// @success 202
// @success 200
// @router /tasks/{id}/submit [put]
// @param id path int true "Task ID"
// @param task body Task false "Task data (optional)"
Expand Down
14 changes: 0 additions & 14 deletions task/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,6 @@ func (e *BadRequest) Is(err error) (matched bool) {
return
}

// ActionTimeout report an action timeout.
type ActionTimeout struct {
}

func (e *ActionTimeout) Error() string {
return "Requested (asynchronous) action timed out."
}

func (e *ActionTimeout) Is(err error) (matched bool) {
var inst *ActionTimeout
matched = errors.As(err, &inst)
return
}

// SoftErr returns true when the error isA SoftError.
func SoftErr(err error) (matched, retry bool) {
if err == nil {
Expand Down
220 changes: 104 additions & 116 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,126 +130,121 @@ func (m *Manager) Run(ctx context.Context) {

// Create a task.
func (m *Manager) Create(db *gorm.DB, requested *Task) (err error) {
err = m.action(func() (err error) {
err = m.findRefs(requested)
if err != nil {
return
}
task := &Task{&model.Task{}}
switch requested.State {
case "":
requested.State = Created
fallthrough
case Created,
Ready:
task.CreateUser = requested.CreateUser
task.Name = requested.Name
task.Kind = requested.Kind
task.Addon = requested.Addon
task.Extensions = requested.Extensions
task.State = requested.State
task.Locator = requested.Locator
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
task.Data = requested.Data
task.ApplicationID = requested.ApplicationID
task.BucketID = requested.BucketID
default:
err = &BadRequest{
Reason: "state must be (Created|Ready)",
}
return
}
err = db.Create(task).Error
if err != nil {
err = liberr.Wrap(err)
return
err = m.findRefs(requested)
if err != nil {
return
}
task := &Task{&model.Task{}}
switch requested.State {
case "":
requested.State = Created
fallthrough
case Created,
Ready:
task.CreateUser = requested.CreateUser
task.Name = requested.Name
task.Kind = requested.Kind
task.Addon = requested.Addon
task.Extensions = requested.Extensions
task.State = requested.State
task.Locator = requested.Locator
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
task.Data = requested.Data
task.ApplicationID = requested.ApplicationID
task.BucketID = requested.BucketID
default:
err = &BadRequest{
Reason: "state must be (Created|Ready)",
}
requested.Task = task.Task
return
})
}
err = db.Create(task).Error
if err != nil {
err = liberr.Wrap(err)
return
}
requested.Task = task.Task
return
}

// Update update task.
func (m *Manager) Update(db *gorm.DB, requested *Task) (err error) {
err = m.action(func() (err error) {
task := &Task{}
err = db.First(task, requested.ID).Error
if err != nil {
return
}
switch task.State {
case Created,
Ready:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Kind = requested.Kind
task.Addon = requested.Addon
task.Extensions = requested.Extensions
task.State = requested.State
task.Locator = requested.Locator
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
task.Data = requested.Data
task.ApplicationID = requested.ApplicationID
case Pending,
QuotaBlocked,
Postponed:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Locator = requested.Locator
task.Data = requested.Data
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
default:
// discarded.
return
}
err = m.findRefs(task)
if err != nil {
return
}
err = db.Save(task).Error
if err != nil {
err = liberr.Wrap(err)
return
}
task := &Task{}
err = db.First(task, requested.ID).Error
if err != nil {
return
}
switch task.State {
case Created,
Ready:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Kind = requested.Kind
task.Addon = requested.Addon
task.Extensions = requested.Extensions
task.State = requested.State
task.Locator = requested.Locator
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
task.Data = requested.Data
task.ApplicationID = requested.ApplicationID
case Pending,
QuotaBlocked,
Postponed:
task.UpdateUser = requested.UpdateUser
task.Name = requested.Name
task.Locator = requested.Locator
task.Data = requested.Data
task.Priority = requested.Priority
task.Policy = requested.Policy
task.TTL = requested.TTL
default:
// discarded.
return
})
}
err = m.findRefs(task)
if err != nil {
return
}
err = db.Save(task).Error
if err != nil {
err = liberr.Wrap(err)
return
}
return
}

// Delete a task.
func (m *Manager) Delete(db *gorm.DB, id uint) (err error) {
err = m.action(func() (err error) {
task := &Task{}
err = db.First(task, id).Error
if err != nil {
return
}
err = task.Delete(m.Client)
if err != nil {
return
}
err = db.Delete(task).Error
task := &Task{}
err = db.First(task, id).Error
if err != nil {
return
})
}
m.action(
func() (err error) {
err = task.Delete(m.Client)
if err != nil {
return
}
err = db.Delete(task).Error
return
})
return
}

// Cancel a task.
func (m *Manager) Cancel(db *gorm.DB, id uint) (err error) {
err = m.action(
task := &Task{}
err = db.First(task, id).Error
if err != nil {
return
}
m.action(
func() (err error) {
task := &Task{}
err = db.First(task, id).Error
if err != nil {
return
}
switch task.State {
case Succeeded,
Failed,
Expand Down Expand Up @@ -285,29 +280,22 @@ func (m *Manager) pause() {
time.Sleep(d)
}

// action executes an asynchronous action.
func (m *Manager) action(action func() error) (err error) {
d := time.Minute
ch := make(chan error)
// action enqueues an asynchronous action.
func (m *Manager) action(action func() error) {
m.queue <- func() {
var err error
defer func() {
p := recover()
if p != nil {
if err, cast := p.(error); cast {
ch <- err
if pErr, cast := p.(error); cast {
err = pErr
}
}
close(ch)
if err != nil {
Log.Error(err, "Action failed.")
}
}()
select {
case ch <- action():
default:
}
}
select {
case err = <-ch:
case <-time.After(d):
err = &ActionTimeout{}
err = action()
}
return
}
Expand Down
1 change: 1 addition & 0 deletions trigger/pkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
Settings = &settings.Settings
)

// Trigger supports actions triggered by model changes.
type Trigger struct {
}

Expand Down

0 comments on commit 175e781

Please sign in to comment.