Skip to content

Commit

Permalink
feat: Automatically Trigger Missed Scheduled Jobs
Browse files Browse the repository at this point in the history
closes #4897

Signed-off-by: Jack Chen <jack@iotechsys.com>
  • Loading branch information
jackchenjc committed Sep 9, 2024
1 parent d108964 commit 7c42c44
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35
github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.12
github.com/fxamacker/cbor/v2 v2.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57 h1:vJgmzYmodJJae4xRXA4
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57/go.mod h1:HKoNog2+02H0Eze8S4ibk9NTvH1LZrGjGPIzAhQnU2g=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17 h1:eNLGThT5fYtFLSrr6vtvHV92cm5IohJzwSRYcd5pvtg=
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17/go.mod h1:va3l+Nri1KijDM5KItsaYrwUSG3Nhj/4EDvR4uB+Jds=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43 h1:/lMvMkSIXHnb5XBYr4O9GZM/nVdaKOsr7Q7I+RB9cwE=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45 h1:rr/1rv8JxM6b8EXKPeqK97msmj7++20fgLNSyxaTHrM=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35 h1:ZKmq5Vd1QN86OLTbD8A2JIKTxR8k21gtqmEUS3hApBY=
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35/go.mod h1:SUTdo8v9V+XacowkZP5puBXnBcVP2NDSk5PBzOdsLEk=
github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.16 h1:vG6cI1LU8QLpzUXYleYuVyqbpeYghLF3Jj6afcaUCHA=
Expand Down
21 changes: 11 additions & 10 deletions internal/pkg/infrastructure/postgres/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func queryScheduleJob(ctx context.Context, connPool *pgxpool.Pool, sql string, a
job.Created = created.UnixMilli()
job.Modified = modified.UnixMilli()

job, err = toScheduleJobsModel(job, scheduleJobJSONBytes)
job, err = toScheduleJobModel(job, scheduleJobJSONBytes)
if err != nil {
return job, errors.NewCommonEdgeXWrapper(err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string,
job.Created = created.UnixMilli()
job.Modified = modified.UnixMilli()

job, err = toScheduleJobsModel(job, scheduleJobJSONBytes)
job, err = toScheduleJobModel(job, scheduleJobJSONBytes)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
Expand All @@ -219,16 +219,17 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string,
return scheduleJobs, nil
}

func toScheduleJobsModel(scheduleJobs model.ScheduleJob, scheduleJobJSONBytes []byte) (model.ScheduleJob, errors.EdgeX) {
func toScheduleJobModel(scheduleJob model.ScheduleJob, scheduleJobJSONBytes []byte) (model.ScheduleJob, errors.EdgeX) {
var storedJob model.ScheduleJob
if err := json.Unmarshal(scheduleJobJSONBytes, &storedJob); err != nil {
return scheduleJobs, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON unmarshal schedule job", err)
return scheduleJob, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON unmarshal schedule job", err)
}

scheduleJobs.Actions = storedJob.Actions
scheduleJobs.AdminState = storedJob.AdminState
scheduleJobs.Definition = storedJob.Definition
scheduleJobs.Labels = storedJob.Labels
scheduleJobs.Properties = storedJob.Properties
return scheduleJobs, nil
scheduleJob.Actions = storedJob.Actions
scheduleJob.AdminState = storedJob.AdminState
scheduleJob.AutoTriggerMissedRecords = storedJob.AutoTriggerMissedRecords
scheduleJob.Definition = storedJob.Definition
scheduleJob.Labels = storedJob.Labels
scheduleJob.Properties = storedJob.Properties
return scheduleJob, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ func DeleteScheduleActionRecordsByAge(ctx context.Context, age int64, dic *di.Co
}

// GenerateMissedScheduleActionRecords generates missed schedule action records
func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, job models.ScheduleJob, latestRecords []models.ScheduleActionRecord) errors.EdgeX {
func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, job models.ScheduleJob, latestRecords []models.ScheduleActionRecord) (errors.EdgeX, bool) {
dbClient := container.DBClientFrom(dic.Get)
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
correlationId := correlation.FromContext(ctx)

var missedRecords []models.ScheduleActionRecord
for _, latestRecord := range latestRecords {
actionId := latestRecord.Action.GetBaseScheduleAction().Id
lastRecordTimestamp := latestRecord.ScheduledAt
Expand All @@ -137,10 +138,9 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container,
missedRuns, err := generateMissedRuns(job.Definition, latestTime)
if err != nil {
lc.Errorf("Failed to generate missed records of job: %s. Correlation-ID: %s", job.Name, correlationId)
return errors.NewCommonEdgeXWrapper(err)
return errors.NewCommonEdgeXWrapper(err), len(missedRecords) > 0
}

var missedRecords []models.ScheduleActionRecord
if len(missedRuns) != 0 {
for _, run := range missedRuns {
actionRecord := models.ScheduleActionRecord{
Expand All @@ -156,14 +156,14 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container,

if _, err := dbClient.AddScheduleActionRecords(ctx, missedRecords); err != nil {
lc.Errorf("Failed to add missed schedule action records with action id: %s of job: %s to database. Correlation-ID: %s", actionId, job.Name, correlationId)
return errors.NewCommonEdgeXWrapper(err)
return errors.NewCommonEdgeXWrapper(err), len(missedRecords) > 0
}

lc.Debugf("Missed schedule action records with action id: %s of job: %s have been created successfully. Correlation-ID: %s", actionId, job.Name, correlationId)
}
}

return nil
return nil, len(missedRecords) > 0
}

func generateMissedRuns(def models.ScheduleDef, latestTime time.Time) (missedRuns []time.Time, err errors.EdgeX) {
Expand Down
25 changes: 18 additions & 7 deletions internal/support/cronscheduler/application/schedulejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,21 @@ func LoadScheduleJobsToSchedulerManager(ctx context.Context, dic *di.Container)
arrangeScheduleJob(ctx, job, dic)

// Generate missed schedule action records for the existing scheduled jobs
err = generateMissedRecords(ctx, job, dic)
err, hasMissedAction := generateMissedRecords(ctx, job, dic)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
if hasMissedAction && job.AutoTriggerMissedRecords {
lc.Debugf("Auto-triggering the missed schedule actions once for the scheduled job: %s. Correlation-ID: %s", job.Name, correlationId)
err = schedulerManager.TriggerScheduleJobByName(job.Name, correlationId)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
}

if !job.AutoTriggerMissedRecords {
lc.Debugf("AutoTriggerMissedRecords is disabled, the missed schedule actions for the scheduled job: %s will not be auto-triggered. Correlation-ID: %s", job.Name, correlationId)
}

lc.Debugf("Successfully loaded the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId)
}
Expand Down Expand Up @@ -266,25 +277,25 @@ func arrangeScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Con
}

// generateMissedRecords generates missed schedule action records
func generateMissedRecords(ctx context.Context, job models.ScheduleJob, dic *di.Container) errors.EdgeX {
func generateMissedRecords(ctx context.Context, job models.ScheduleJob, dic *di.Container) (err errors.EdgeX, hasMissedAction bool) {
dbClient := container.DBClientFrom(dic.Get)
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
correlationId := correlation.FromContext(ctx)

if job.AdminState != models.Unlocked {
lc.Debugf("The scheduled job: %s is locked, skip generating missed schedule action records. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId)
return nil
return nil, hasMissedAction
}

// Get the latest schedule action records by job name and generate missed schedule action records
latestRecords, err := dbClient.LatestScheduleActionRecordsByJobName(ctx, job.Name)
if err != nil {
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to load the latest schedule action records of job: %s", job.Name), err)
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to load the latest schedule action records of job: %s", job.Name), err), hasMissedAction
}
err = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords)
err, hasMissedAction = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
return errors.NewCommonEdgeXWrapper(err), hasMissedAction
}

return nil
return nil, hasMissedAction
}
36 changes: 36 additions & 0 deletions openapi/v3/support-cron-scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ components:
enum:
- LOCKED
- UNLOCKED
autoTriggerMissedRecords:
type: boolean
description: "Indicates whether to automatically trigger missed action records when the service starts."
created:
description: "A timestamp indicating when the schedule job was created."
type: integer
Expand Down Expand Up @@ -359,6 +362,9 @@ components:
enum:
- LOCKED
- UNLOCKED
autoTriggerMissedRecords:
type: boolean
description: "Indicates whether to automatically trigger missed action records when the service starts. Default value is false."
definition:
description: "The schedule definition of the schedule job"
$ref: '#/components/schemas/ScheduleDef'
Expand Down Expand Up @@ -526,6 +532,7 @@ components:
address: "http://localhost:59881/api/v3/ping"
method: "GET"
adminState: "UNLOCKED"
autoTriggerMissedRecords: true
AddIntervalScheduleJobExample:
value:
- apiVersion: "v3"
Expand All @@ -542,6 +549,7 @@ components:
topic: "test_topic"
payload: "eyJ0ZXN0I"
adminState: "UNLOCKED"
autoTriggerMissedRecords: false
MultiScheduleActionRecordsExample:
value:
apiVersion: "v3"
Expand Down Expand Up @@ -657,6 +665,7 @@ components:
address: "http://localhost:59881/api/v3/ping"
method: "GET"
adminState: "UNLOCKED"
autoTriggerMissedRecords: true
- created: 1634280525302
modified: 1634280525302
id: "7b9c6908-0929-498d-9f81-c25abdedd93f"
Expand All @@ -672,6 +681,23 @@ components:
topic: "test_topic"
payload: "eyJ0ZXN0I"
adminState: "UNLOCKED"
SingleScheduleActionRecordExample:
value:
apiVersion: "v3"
requestId: "e6e8a2f4-eb14-4649-9e2b-175247911369"
statusCode: 200
totalCount: 1
scheduleActionRecords:
- created: 1634279367311
scheduledAt: 1634279367311
id: "debade10-7838-44bd-9c09-1283813db6c8"
jobName: "test_job_1"
status: "SUCCEEDED"
action:
type: "REST"
contentType: "application/json"
address: "http://localhost:59881/api/v3/ping"
method: "GET"
UpdateCronScheduleJobExample:
value:
- apiVersion: "v3"
Expand All @@ -686,6 +712,7 @@ components:
address: "http://localhost:59881/api/v3/ping"
method: "GET"
adminState: "UNLOCKED"
autoTriggerMissedRecords: true
paths:
/job:
parameters:
Expand Down Expand Up @@ -1070,6 +1097,9 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/MultiScheduleActionRecordsResponse'
examples:
SingleScheduleActionRecordExample:
$ref: '#/components/examples/SingleScheduleActionRecordExample'
'500':
description: "An unexpected error occurred on the server"
headers:
Expand Down Expand Up @@ -1104,6 +1134,9 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/MultiScheduleActionRecordsResponse'
examples:
SingleScheduleActionRecordExample:
$ref: '#/components/examples/SingleScheduleActionRecordExample'
'404':
description: "The requested schedule job does not exist"
headers:
Expand Down Expand Up @@ -1149,6 +1182,9 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/MultiScheduleActionRecordsResponse'
examples:
SingleScheduleActionRecordExample:
$ref: '#/components/examples/SingleScheduleActionRecordExample'
'404':
description: "The requested schedule job does not exist"
headers:
Expand Down

0 comments on commit 7c42c44

Please sign in to comment.