From 7c42c442f21a33f30ae047fb2882440d9f71f387 Mon Sep 17 00:00:00 2001 From: Jack Chen Date: Mon, 9 Sep 2024 20:48:37 +0800 Subject: [PATCH] feat: Automatically Trigger Missed Scheduled Jobs closes #4897 Signed-off-by: Jack Chen --- go.mod | 2 +- go.sum | 4 +-- .../infrastructure/postgres/schedulejob.go | 21 +++++------ .../application/scheduleactionrecord.go | 10 +++--- .../cronscheduler/application/schedulejob.go | 25 +++++++++---- openapi/v3/support-cron-scheduler.yaml | 36 +++++++++++++++++++ 6 files changed, 73 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 0e0350d136..b76151fa0b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57 github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17 - github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43 + github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45 github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35 github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.12 github.com/fxamacker/cbor/v2 v2.7.0 diff --git a/go.sum b/go.sum index a7c96f963a..5107282f0a 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57 h1:vJgmzYmodJJae4xRXA4 github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.57/go.mod h1:HKoNog2+02H0Eze8S4ibk9NTvH1LZrGjGPIzAhQnU2g= github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17 h1:eNLGThT5fYtFLSrr6vtvHV92cm5IohJzwSRYcd5pvtg= github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.17/go.mod h1:va3l+Nri1KijDM5KItsaYrwUSG3Nhj/4EDvR4uB+Jds= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43 h1:/lMvMkSIXHnb5XBYr4O9GZM/nVdaKOsr7Q7I+RB9cwE= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.43/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45 h1:rr/1rv8JxM6b8EXKPeqK97msmj7++20fgLNSyxaTHrM= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.45/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE= github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35 h1:ZKmq5Vd1QN86OLTbD8A2JIKTxR8k21gtqmEUS3hApBY= github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.35/go.mod h1:SUTdo8v9V+XacowkZP5puBXnBcVP2NDSk5PBzOdsLEk= github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.16 h1:vG6cI1LU8QLpzUXYleYuVyqbpeYghLF3Jj6afcaUCHA= diff --git a/internal/pkg/infrastructure/postgres/schedulejob.go b/internal/pkg/infrastructure/postgres/schedulejob.go index 03edfb17ca..6b474e2db9 100644 --- a/internal/pkg/infrastructure/postgres/schedulejob.go +++ b/internal/pkg/infrastructure/postgres/schedulejob.go @@ -178,7 +178,7 @@ func queryScheduleJob(ctx context.Context, connPool *pgxpool.Pool, sql string, a job.Created = created.UnixMilli() job.Modified = modified.UnixMilli() - job, err = toScheduleJobsModel(job, scheduleJobJSONBytes) + job, err = toScheduleJobModel(job, scheduleJobJSONBytes) if err != nil { return job, errors.NewCommonEdgeXWrapper(err) } @@ -206,7 +206,7 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string, job.Created = created.UnixMilli() job.Modified = modified.UnixMilli() - job, err = toScheduleJobsModel(job, scheduleJobJSONBytes) + job, err = toScheduleJobModel(job, scheduleJobJSONBytes) if err != nil { return nil, errors.NewCommonEdgeXWrapper(err) } @@ -219,16 +219,17 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string, return scheduleJobs, nil } -func toScheduleJobsModel(scheduleJobs model.ScheduleJob, scheduleJobJSONBytes []byte) (model.ScheduleJob, errors.EdgeX) { +func toScheduleJobModel(scheduleJob model.ScheduleJob, scheduleJobJSONBytes []byte) (model.ScheduleJob, errors.EdgeX) { var storedJob model.ScheduleJob if err := json.Unmarshal(scheduleJobJSONBytes, &storedJob); err != nil { - return scheduleJobs, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON unmarshal schedule job", err) + return scheduleJob, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON unmarshal schedule job", err) } - scheduleJobs.Actions = storedJob.Actions - scheduleJobs.AdminState = storedJob.AdminState - scheduleJobs.Definition = storedJob.Definition - scheduleJobs.Labels = storedJob.Labels - scheduleJobs.Properties = storedJob.Properties - return scheduleJobs, nil + scheduleJob.Actions = storedJob.Actions + scheduleJob.AdminState = storedJob.AdminState + scheduleJob.AutoTriggerMissedRecords = storedJob.AutoTriggerMissedRecords + scheduleJob.Definition = storedJob.Definition + scheduleJob.Labels = storedJob.Labels + scheduleJob.Properties = storedJob.Properties + return scheduleJob, nil } diff --git a/internal/support/cronscheduler/application/scheduleactionrecord.go b/internal/support/cronscheduler/application/scheduleactionrecord.go index 242618eebb..7a8c27644d 100644 --- a/internal/support/cronscheduler/application/scheduleactionrecord.go +++ b/internal/support/cronscheduler/application/scheduleactionrecord.go @@ -117,11 +117,12 @@ func DeleteScheduleActionRecordsByAge(ctx context.Context, age int64, dic *di.Co } // GenerateMissedScheduleActionRecords generates missed schedule action records -func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, job models.ScheduleJob, latestRecords []models.ScheduleActionRecord) errors.EdgeX { +func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, job models.ScheduleJob, latestRecords []models.ScheduleActionRecord) (errors.EdgeX, bool) { dbClient := container.DBClientFrom(dic.Get) lc := bootstrapContainer.LoggingClientFrom(dic.Get) correlationId := correlation.FromContext(ctx) + var missedRecords []models.ScheduleActionRecord for _, latestRecord := range latestRecords { actionId := latestRecord.Action.GetBaseScheduleAction().Id lastRecordTimestamp := latestRecord.ScheduledAt @@ -137,10 +138,9 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, missedRuns, err := generateMissedRuns(job.Definition, latestTime) if err != nil { lc.Errorf("Failed to generate missed records of job: %s. Correlation-ID: %s", job.Name, correlationId) - return errors.NewCommonEdgeXWrapper(err) + return errors.NewCommonEdgeXWrapper(err), len(missedRecords) > 0 } - var missedRecords []models.ScheduleActionRecord if len(missedRuns) != 0 { for _, run := range missedRuns { actionRecord := models.ScheduleActionRecord{ @@ -156,14 +156,14 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, if _, err := dbClient.AddScheduleActionRecords(ctx, missedRecords); err != nil { lc.Errorf("Failed to add missed schedule action records with action id: %s of job: %s to database. Correlation-ID: %s", actionId, job.Name, correlationId) - return errors.NewCommonEdgeXWrapper(err) + return errors.NewCommonEdgeXWrapper(err), len(missedRecords) > 0 } lc.Debugf("Missed schedule action records with action id: %s of job: %s have been created successfully. Correlation-ID: %s", actionId, job.Name, correlationId) } } - return nil + return nil, len(missedRecords) > 0 } func generateMissedRuns(def models.ScheduleDef, latestTime time.Time) (missedRuns []time.Time, err errors.EdgeX) { diff --git a/internal/support/cronscheduler/application/schedulejob.go b/internal/support/cronscheduler/application/schedulejob.go index f00c9ffae8..5f42e1d5ad 100644 --- a/internal/support/cronscheduler/application/schedulejob.go +++ b/internal/support/cronscheduler/application/schedulejob.go @@ -202,10 +202,21 @@ func LoadScheduleJobsToSchedulerManager(ctx context.Context, dic *di.Container) arrangeScheduleJob(ctx, job, dic) // Generate missed schedule action records for the existing scheduled jobs - err = generateMissedRecords(ctx, job, dic) + err, hasMissedAction := generateMissedRecords(ctx, job, dic) if err != nil { return errors.NewCommonEdgeXWrapper(err) } + if hasMissedAction && job.AutoTriggerMissedRecords { + lc.Debugf("Auto-triggering the missed schedule actions once for the scheduled job: %s. Correlation-ID: %s", job.Name, correlationId) + err = schedulerManager.TriggerScheduleJobByName(job.Name, correlationId) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + } + + if !job.AutoTriggerMissedRecords { + lc.Debugf("AutoTriggerMissedRecords is disabled, the missed schedule actions for the scheduled job: %s will not be auto-triggered. Correlation-ID: %s", job.Name, correlationId) + } lc.Debugf("Successfully loaded the existing scheduled job: %s. Correlation-ID: %s", job.Name, correlationId) } @@ -266,25 +277,25 @@ func arrangeScheduleJob(ctx context.Context, job models.ScheduleJob, dic *di.Con } // generateMissedRecords generates missed schedule action records -func generateMissedRecords(ctx context.Context, job models.ScheduleJob, dic *di.Container) errors.EdgeX { +func generateMissedRecords(ctx context.Context, job models.ScheduleJob, dic *di.Container) (err errors.EdgeX, hasMissedAction bool) { dbClient := container.DBClientFrom(dic.Get) lc := bootstrapContainer.LoggingClientFrom(dic.Get) correlationId := correlation.FromContext(ctx) if job.AdminState != models.Unlocked { lc.Debugf("The scheduled job: %s is locked, skip generating missed schedule action records. ScheduleJob ID: %s, Correlation-ID: %s", job.Name, job.Id, correlationId) - return nil + return nil, hasMissedAction } // Get the latest schedule action records by job name and generate missed schedule action records latestRecords, err := dbClient.LatestScheduleActionRecordsByJobName(ctx, job.Name) if err != nil { - return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to load the latest schedule action records of job: %s", job.Name), err) + return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to load the latest schedule action records of job: %s", job.Name), err), hasMissedAction } - err = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords) + err, hasMissedAction = GenerateMissedScheduleActionRecords(ctx, dic, job, latestRecords) if err != nil { - return errors.NewCommonEdgeXWrapper(err) + return errors.NewCommonEdgeXWrapper(err), hasMissedAction } - return nil + return nil, hasMissedAction } diff --git a/openapi/v3/support-cron-scheduler.yaml b/openapi/v3/support-cron-scheduler.yaml index 7603947672..2dd3fa6cc1 100644 --- a/openapi/v3/support-cron-scheduler.yaml +++ b/openapi/v3/support-cron-scheduler.yaml @@ -269,6 +269,9 @@ components: enum: - LOCKED - UNLOCKED + autoTriggerMissedRecords: + type: boolean + description: "Indicates whether to automatically trigger missed action records when the service starts." created: description: "A timestamp indicating when the schedule job was created." type: integer @@ -359,6 +362,9 @@ components: enum: - LOCKED - UNLOCKED + autoTriggerMissedRecords: + type: boolean + description: "Indicates whether to automatically trigger missed action records when the service starts. Default value is false." definition: description: "The schedule definition of the schedule job" $ref: '#/components/schemas/ScheduleDef' @@ -526,6 +532,7 @@ components: address: "http://localhost:59881/api/v3/ping" method: "GET" adminState: "UNLOCKED" + autoTriggerMissedRecords: true AddIntervalScheduleJobExample: value: - apiVersion: "v3" @@ -542,6 +549,7 @@ components: topic: "test_topic" payload: "eyJ0ZXN0I" adminState: "UNLOCKED" + autoTriggerMissedRecords: false MultiScheduleActionRecordsExample: value: apiVersion: "v3" @@ -657,6 +665,7 @@ components: address: "http://localhost:59881/api/v3/ping" method: "GET" adminState: "UNLOCKED" + autoTriggerMissedRecords: true - created: 1634280525302 modified: 1634280525302 id: "7b9c6908-0929-498d-9f81-c25abdedd93f" @@ -672,6 +681,23 @@ components: topic: "test_topic" payload: "eyJ0ZXN0I" adminState: "UNLOCKED" + SingleScheduleActionRecordExample: + value: + apiVersion: "v3" + requestId: "e6e8a2f4-eb14-4649-9e2b-175247911369" + statusCode: 200 + totalCount: 1 + scheduleActionRecords: + - created: 1634279367311 + scheduledAt: 1634279367311 + id: "debade10-7838-44bd-9c09-1283813db6c8" + jobName: "test_job_1" + status: "SUCCEEDED" + action: + type: "REST" + contentType: "application/json" + address: "http://localhost:59881/api/v3/ping" + method: "GET" UpdateCronScheduleJobExample: value: - apiVersion: "v3" @@ -686,6 +712,7 @@ components: address: "http://localhost:59881/api/v3/ping" method: "GET" adminState: "UNLOCKED" + autoTriggerMissedRecords: true paths: /job: parameters: @@ -1070,6 +1097,9 @@ paths: application/json: schema: $ref: '#/components/schemas/MultiScheduleActionRecordsResponse' + examples: + SingleScheduleActionRecordExample: + $ref: '#/components/examples/SingleScheduleActionRecordExample' '500': description: "An unexpected error occurred on the server" headers: @@ -1104,6 +1134,9 @@ paths: application/json: schema: $ref: '#/components/schemas/MultiScheduleActionRecordsResponse' + examples: + SingleScheduleActionRecordExample: + $ref: '#/components/examples/SingleScheduleActionRecordExample' '404': description: "The requested schedule job does not exist" headers: @@ -1149,6 +1182,9 @@ paths: application/json: schema: $ref: '#/components/schemas/MultiScheduleActionRecordsResponse' + examples: + SingleScheduleActionRecordExample: + $ref: '#/components/examples/SingleScheduleActionRecordExample' '404': description: "The requested schedule job does not exist" headers: