Skip to content

Commit

Permalink
Force-process entity submissions held in backlog
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Aug 20, 2024
1 parent e748024 commit 6c0404d
Show file tree
Hide file tree
Showing 4 changed files with 438 additions and 20 deletions.
23 changes: 23 additions & 0 deletions lib/bin/process-held-submissions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This script re-processes submissions containing offline entity actions that
// were previously held in a backlog due to submissions coming in out of order.

const { run } = require('../task/task');
const { processHeldSubmissions } = require('../task/process-held-submissions');

const { program } = require('commander');
program.option('-f, --force', 'Force all submissions in the backlog to be processed immediately.');
program.parse();

const options = program.opts();

run(processHeldSubmissions(options.force)
.then((count) => `Submissions processed: ${count}`));
73 changes: 53 additions & 20 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const config = require('config');
const { sql } = require('slonik');
const { Actor, Entity, Submission, Form } = require('../frames');
const { Actor, Audit, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
const { map, mergeRight, pickAll } = require('ramda');
const { blankStringToNull, construct } = require('../../util/util');
Expand All @@ -17,7 +18,7 @@ const { odataFilter, odataOrderBy } = require('../../data/odata-filter');
const { odataToColumnMap, parseSubmissionXml, getDiffProp, ConflictType } = require('../../data/entity');
const { isTrue } = require('../../util/http');
const Problem = require('../../util/problem');
const { runSequentially } = require('../../util/promise');
const { getOrReject, runSequentially } = require('../../util/promise');


/////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -177,14 +178,18 @@ createVersion.audit.withResult = true;
////////////////////////////////////////////////////////////////////////////////
// WRAPPER FUNCTIONS FOR CREATING AND UPDATING ENTITIES

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
if ((dataset.approvalRequired && event.details.reviewState !== 'approved') ||
(!dataset.approvalRequired && event.action === 'submission.update')) // don't process submission if approval is not required and submission metadata is updated
return null;

// TODO: auto-generate a label if forced and if the submission doesn't provide one
// Auto-generate a label if forced and if the submission doesn't provide one
if (forceOutOfOrderProcessing && entityData.system.label == null) {
// eslint-disable-next-line no-param-reassign
entityData.system.label = 'auto generated';
}
const partial = await Entity.fromParseEntityData(entityData, { create: true });

const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined };
Expand All @@ -202,7 +207,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities }) => {
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
Expand All @@ -213,7 +218,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that offline context.
const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef);
const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing);

// If baseEntityVersion is null, we held a submission and will stop processing now.
if (baseEntityDef == null)
Expand Down Expand Up @@ -294,7 +299,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateVerison to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => async ({ Entities }) => {
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing = false) => async ({ Entities }) => {
if (!clientEntity.def.branchId) {

// no offline branching to deal with, use baseVersion as is
Expand Down Expand Up @@ -327,10 +332,17 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => a
const baseEntityVersion = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition }));

if (!baseEntityVersion.isDefined()) {
// TODO: add case for force-processing
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
if (forceOutOfOrderProcessing) {
// If the base version doesn't exist but we forcing the update anyway, use the latest version on the server as the base.
// If that can't be found, throw an error for _processSubmissionEvent to catch so it can try create instead of update.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
.then(getOrReject(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name })));
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}

// Return the base entity version
Expand All @@ -351,7 +363,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;
// TODO: check parentEvent details to determine if this is a forced reprocessing or not
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;

const form = await Forms.getByActeeId(event.acteeId);
// If form is deleted/purged then submission won't be there either.
Expand All @@ -369,7 +381,6 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
// don't try to process it now, it will be dequeued and reprocessed elsewhere.
if (existingHeldSubmission.isDefined())
return null;
// TODO: check how force-reprocessing interacts with this logic above

const submission = await Submissions.getSubAndDefById(submissionDefId);

Expand Down Expand Up @@ -409,22 +420,21 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions });
}

// TODO: work out how force-reprocessing interacts with this logic (updateEntity and createEntity should know about it)
let maybeEntity = null;
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing);
} catch (err) {
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true');
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true') || forceOutOfOrderProcessing;
if ((err.problemCode === 404.8) && attemptCreate) {
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);
} else {
throw (err);
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);

// Check for held submissions that follow this one in the same branch
if (maybeEntity != null) {
Expand All @@ -434,8 +444,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await Entities._getNextHeldSubmissionInBranch(entityUuid, branchId, currentBranchBaseVersion + 1);

// TODO: don't handle the next submission if the current one was processed forcefully
if (nextSub.isDefined()) {
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
Expand Down Expand Up @@ -496,6 +505,29 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
WHERE "auditId"=${eventId}`);


////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG

const DAY_RANGE = config.has('default.taskSchedule.forceProcess')
? config.get('default.taskSchedule.forceProcess')
: 7; // Default is 7 days

const _getHeldSubmissionsAsEvents = (force) => ({ all }) => all(sql`
SELECT audits.* FROM entity_submission_backlog
JOIN audits on entity_submission_backlog."auditId" = audits.id
${force ? sql`` : sql`WHERE entity_submission_backlog."loggedAt" < current_date - cast(${DAY_RANGE} as int)`}
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

const processHeldSubmissions = (force = false) => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents(force);
return runSequentially(events.map(event =>
async () => {
await container.Entities._deleteHeldSubmissionByEventId(event.id);
return processSubmissionEvent(event, { details: { force: true } })(container);
}));
};

////////////////////////////////////////////////////////////////////////////////
// PROCESSING PENDING SUBMISSIONS FROM TOGGLING DATASET APPROVALREQUIRED FLAG

Expand Down Expand Up @@ -680,6 +712,7 @@ module.exports = {
_computeBaseVersion,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents, processHeldSubmissions,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
Expand Down
16 changes: 16 additions & 0 deletions lib/task/process-held-submissions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This task deletes expired sessions from the table so it does not become
// overladen and bogged down over time.

const { task } = require('./task');
const processHeldSubmissions = task.withContainer(({ Entities }) => Entities.processHeldSubmissions);
module.exports = { processHeldSubmissions };

Loading

0 comments on commit 6c0404d

Please sign in to comment.