Skip to content

Commit

Permalink
Implement auto-cancellation of concurrent jobs if the event is push (g…
Browse files Browse the repository at this point in the history
…o-gitea#25716)

- cancel running jobs if the event is push
- Add a new function `CancelRunningJobs` to cancel all running jobs of a
run
- Update `FindRunOptions` struct to include `Ref` field and update its
condition in `toConds` function
- Implement auto cancellation of running jobs in the same workflow in
`notify` function

related task: go-gitea#22751

---------

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
Signed-off-by: appleboy <appleboy.tw@gmail.com>
Co-authored-by: Jason Song <i@wolfogre.com>
Co-authored-by: delvh <dev.lh@web.de>
  • Loading branch information
3 people committed Jul 25, 2023
1 parent 5db640a commit 44781f9
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 19 deletions.
69 changes: 68 additions & 1 deletion models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ActionRun struct {
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"`
Ref string
Ref string `xorm:"index"` // the commit/tag/… that caused the run
CommitSHA string
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
NeedApproval bool // may need approval if it's a fork pull request
Expand Down Expand Up @@ -164,6 +164,73 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
return err
}

// CancelRunningJobs cancels all running and waiting jobs associated with a specific workflow.
func CancelRunningJobs(ctx context.Context, repoID int64, ref, workflowID string) error {
// Find all runs in the specified repository, reference, and workflow with statuses 'Running' or 'Waiting'.
runs, total, err := FindRuns(ctx, FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
Status: []Status{StatusRunning, StatusWaiting},
})
if err != nil {
return err
}

// If there are no runs found, there's no need to proceed with cancellation, so return nil.
if total == 0 {
return nil
}

// Iterate over each found run and cancel its associated jobs.
for _, run := range runs {
// Find all jobs associated with the current run.
jobs, _, err := FindRunJobs(ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
}
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
}

// InsertRun inserts a run
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
ctx, commiter, err := db.TxContext(ctx)
Expand Down
24 changes: 14 additions & 10 deletions models/actions/run_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ func (runs RunList) LoadRepos() error {

type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowFileName string
TriggerUserID int64
Approved bool // not util.OptionalBool, it works only when it's true
Status Status
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
}

func (opts FindRunOptions) toConds() builder.Cond {
Expand All @@ -82,17 +83,20 @@ func (opts FindRunOptions) toConds() builder.Cond {
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
if opts.WorkflowFileName != "" {
cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowFileName})
if opts.WorkflowID != "" {
cond = cond.And(builder.Eq{"workflow_id": opts.WorkflowID})
}
if opts.TriggerUserID > 0 {
cond = cond.And(builder.Eq{"trigger_user_id": opts.TriggerUserID})
}
if opts.Approved {
cond = cond.And(builder.Gt{"approved_by": 0})
}
if opts.Status > StatusUnknown {
cond = cond.And(builder.Eq{"status": opts.Status})
if len(opts.Status) > 0 {
cond = cond.And(builder.In("status", opts.Status))
}
if opts.Ref != "" {
cond = cond.And(builder.Eq{"ref": opts.Ref})
}
return cond
}
Expand Down
2 changes: 2 additions & 0 deletions models/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ var migrations = []Migration{
NewMigration("Reduce commit status", v1_21.ReduceCommitStatus),
// v267 -> v268
NewMigration("Add action_tasks_version table", v1_21.CreateActionTasksVersionTable),
// v268 -> v269
NewMigration("Update Action Ref", v1_21.UpdateActionsRefIndex),
}

// GetCurrentDBVersion returns the current db version
Expand Down
16 changes: 16 additions & 0 deletions models/migrations/v1_21/v268.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package v1_21 //nolint

import (
"xorm.io/xorm"
)

// UpdateActionsRefIndex updates the index of actions ref field
func UpdateActionsRefIndex(x *xorm.Engine) error {
type ActionRun struct {
Ref string `xorm:"index"` // the commit/tag/… causing the run
}
return x.Sync(new(ActionRun))
}
12 changes: 8 additions & 4 deletions routers/web/repo/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ func List(ctx *context.Context) {
Page: page,
PageSize: convert.ToCorrectPageSize(ctx.FormInt("limit")),
},
RepoID: ctx.Repo.Repository.ID,
WorkflowFileName: workflow,
TriggerUserID: actorID,
Status: actions_model.Status(status),
RepoID: ctx.Repo.Repository.ID,
WorkflowID: workflow,
TriggerUserID: actorID,
}

// if status is not StatusUnknown, it means user has selected a status filter
if actions_model.Status(status) != actions_model.StatusUnknown {
opts.Status = []actions_model.Status{actions_model.Status(status)}
}

runs, total, err := actions_model.FindRuns(ctx, opts)
Expand Down
23 changes: 19 additions & 4 deletions services/actions/notifier_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,31 @@ func notify(ctx context.Context, input *notifyInput) error {
log.Error("jobparser.Parse: %v", err)
continue
}

// cancel running jobs if the event is push
if run.Event == webhook_module.HookEventPush {
// cancel running jobs of the same workflow
if err := actions_model.CancelRunningJobs(
ctx,
run.RepoID,
run.Ref,
run.WorkflowID,
); err != nil {
log.Error("CancelRunningJobs: %v", err)
}
}

if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
log.Error("InsertRun: %v", err)
continue
}
if jobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID}); err != nil {

alljobs, _, err := actions_model.FindRunJobs(ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
log.Error("FindRunJobs: %v", err)
} else {
CreateCommitStatus(ctx, jobs...)
continue
}

CreateCommitStatus(ctx, alljobs...)
}
return nil
}
Expand Down

0 comments on commit 44781f9

Please sign in to comment.