Skip to content

Commit

Permalink
Force-process entity submissions held in backlog (#1172)
Browse files Browse the repository at this point in the history
* Force-process entity submissions held in backlog

* Make task actually run

* Update formatting of force processing backlog

* Rename task to reference backlog

* Adding tests for force processing backlog for deleted entity or submission

* Dont force process a sub for a deleted entity

* Add transaction to backlog processor task

* pass baseVersion through in force create

* more empty label cases for update as create
  • Loading branch information
ktuite authored Aug 26, 2024
1 parent b4a8e6c commit fb78290
Show file tree
Hide file tree
Showing 5 changed files with 598 additions and 21 deletions.
23 changes: 23 additions & 0 deletions lib/bin/process-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.
//
// This script re-processes submissions containing offline entity actions that
// were previously held in a backlog due to submissions coming in out of order.

const { run } = require('../task/task');
const { processBacklog } = require('../task/process-backlog');

const { program } = require('commander');
program.option('-f, --force', 'Force all submissions in the backlog to be processed immediately.');
program.parse();

const options = program.opts();

run(processBacklog(options.force)
.then((count) => `Submissions processed: ${count}`));
102 changes: 81 additions & 21 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const config = require('config');
const { sql } = require('slonik');
const { Actor, Entity, Submission, Form } = require('../frames');
const { Actor, Audit, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
const { map, mergeRight, pickAll } = require('ramda');
const { blankStringToNull, construct } = require('../../util/util');
Expand All @@ -17,7 +18,7 @@ const { odataFilter, odataOrderBy } = require('../../data/odata-filter');
const { odataToColumnMap, parseSubmissionXml, getDiffProp, ConflictType } = require('../../data/entity');
const { isTrue } = require('../../util/http');
const Problem = require('../../util/problem');
const { runSequentially } = require('../../util/promise');
const { getOrReject, runSequentially } = require('../../util/promise');


/////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -171,21 +172,31 @@ createVersion.audit = (updatedEntity, dataset, partial, subDef) => (log) => {
entityDefId: updatedEntity.aux.currentVersion.id,
entity: { uuid: updatedEntity.uuid, dataset: dataset.name }
});
return Promise.resolve();
};
createVersion.audit.withResult = true;

////////////////////////////////////////////////////////////////////////////////
// WRAPPER FUNCTIONS FOR CREATING AND UPDATING ENTITIES

const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => {
const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
// If dataset requires approval on submission to create an entity and this event is not
// an approval event, then don't create an entity
if ((dataset.approvalRequired && event.details.reviewState !== 'approved') ||
(!dataset.approvalRequired && event.action === 'submission.update')) // don't process submission if approval is not required and submission metadata is updated
return null;

// TODO: auto-generate a label if forced and if the submission doesn't provide one
const partial = await Entity.fromParseEntityData(entityData, { create: true });
// Auto-generate a label if forced and if the submission doesn't provide one
if (forceOutOfOrderProcessing && (entityData.system.label == null || entityData.system.label.trim() === '')) {
// eslint-disable-next-line no-param-reassign
entityData.system.label = 'auto generated';
}

// Add the branchBaseVersion to the partial if we are forcing the create and it has one
const _partial = await Entity.fromParseEntityData(entityData, { create: true });
const partial = (forceOutOfOrderProcessing)
? _partial.auxWith('def', { branchBaseVersion: _partial.def.baseVersion })
: _partial;

const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined };
const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id);
Expand All @@ -202,7 +213,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
return entity;
};

const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities }) => {
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
Expand All @@ -213,7 +224,21 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Figure out the intended baseVersion
// If this is an offline update with a branchId, the baseVersion value is local to that offline context.
const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef);
let baseEntityDef;

// Try computing base version.
// But if there is a 404.8 not found error, double-check if the entity never existed or was deleted.
try {
baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing);
} catch (err) {
if (err.problemCode === 404.8) {
// Look up deleted entity by passing deleted as option argData
const deletedEntity = await Entities.getById(dataset.id, clientEntity.uuid, new QueryOptions({ argData: { deleted: 'true' } }));
if (deletedEntity.isDefined())
throw Problem.user.entityDeleted({ entityUuid: clientEntity.uuid });
}
throw err;
}

// If baseEntityVersion is null, we held a submission and will stop processing now.
if (baseEntityDef == null)
Expand Down Expand Up @@ -294,7 +319,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateVerison to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => async ({ Entities }) => {
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing = false) => async ({ Entities }) => {
if (!clientEntity.def.branchId) {

// no offline branching to deal with, use baseVersion as is
Expand Down Expand Up @@ -327,10 +352,17 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => a
const baseEntityVersion = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition }));

if (!baseEntityVersion.isDefined()) {
// TODO: add case for force-processing
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
if (forceOutOfOrderProcessing) {
// If the base version doesn't exist but we forcing the update anyway, use the latest version on the server as the base.
// If that can't be found, throw an error for _processSubmissionEvent to catch so it can try create instead of update.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
.then(getOrReject(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name })));
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}

// Return the base entity version
Expand All @@ -351,7 +383,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst }) => {
const { submissionId, submissionDefId } = event.details;
// TODO: check parentEvent details to determine if this is a forced reprocessing or not
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;

const form = await Forms.getByActeeId(event.acteeId);
// If form is deleted/purged then submission won't be there either.
Expand All @@ -369,7 +401,6 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
// don't try to process it now, it will be dequeued and reprocessed elsewhere.
if (existingHeldSubmission.isDefined())
return null;
// TODO: check how force-reprocessing interacts with this logic above

const submission = await Submissions.getSubAndDefById(submissionDefId);

Expand Down Expand Up @@ -409,22 +440,21 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions });
}

// TODO: work out how force-reprocessing interacts with this logic (updateEntity and createEntity should know about it)
let maybeEntity = null;
// Try update before create (if both are specified)
if (entityData.system.update === '1' || entityData.system.update === 'true')
try {
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event);
maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing);
} catch (err) {
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true');
const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true') || forceOutOfOrderProcessing;
if ((err.problemCode === 404.8) && attemptCreate) {
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);
} else {
throw (err);
}
}
else if (entityData.system.create === '1' || entityData.system.create === 'true')
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent);
maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing);

// Check for held submissions that follow this one in the same branch
if (maybeEntity != null) {
Expand All @@ -434,8 +464,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
const currentBranchBaseVersion = branchBaseVersion ?? 0;
const nextSub = await Entities._getNextHeldSubmissionInBranch(entityUuid, branchId, currentBranchBaseVersion + 1);

// TODO: don't handle the next submission if the current one was processed forcefully
if (nextSub.isDefined()) {
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
Expand Down Expand Up @@ -496,6 +525,35 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
WHERE "auditId"=${eventId}`);


////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG

const DAY_RANGE = config.has('default.taskSchedule.forceProcess')
? config.get('default.taskSchedule.forceProcess')
: 7; // Default is 7 days

const _getHeldSubmissionsAsEvents = (force) => ({ all }) => all(sql`
SELECT audits.* FROM entity_submission_backlog
JOIN audits on entity_submission_backlog."auditId" = audits.id
${force ? sql`` : sql`WHERE entity_submission_backlog."loggedAt" < current_date - cast(${DAY_RANGE} as int)`}
ORDER BY "branchId", "branchBaseVersion"`)
.then(map(construct(Audit)));

const _processSingleBacklogEvent = (event) => (container) =>
container.db.transaction(async (trxn) => {
const { Entities } = container.with({ db: trxn });
await Entities._deleteHeldSubmissionByEventId(event.id);
await Entities.processSubmissionEvent(event, { details: { force: true } });
return true;
});

const processBacklog = (force = false) => async (container) => {
const events = await container.Entities._getHeldSubmissionsAsEvents(force);
await runSequentially(events.map(event =>
() => container.Entities._processSingleBacklogEvent(event)));
return events.length;
};

////////////////////////////////////////////////////////////////////////////////
// PROCESSING PENDING SUBMISSIONS FROM TOGGLING DATASET APPROVALREQUIRED FLAG

Expand Down Expand Up @@ -680,6 +738,8 @@ module.exports = {
_computeBaseVersion,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents,
processBacklog, _processSingleBacklogEvent,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
createVersion,
Expand Down
13 changes: 13 additions & 0 deletions lib/task/process-backlog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2024 ODK Central Developers
// See the NOTICE file at the top-level directory of this distribution and at
// https://github.com/getodk/central-backend/blob/master/NOTICE.
// This file is part of ODK Central. It is subject to the license terms in
// the LICENSE file found in the top-level directory of this distribution and at
// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central,
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { task } = require('./task');
const processBacklog = task.withContainer(({ Entities }) => Entities.processBacklog);
module.exports = { processBacklog };

3 changes: 3 additions & 0 deletions lib/util/problem.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ const problems = {
// entity base version specified in submission does not exist
entityVersionNotFound: problem(404.9, ({ baseVersion, entityUuid, datasetName }) => `Base version (${baseVersion}) does not exist for entity UUID (${entityUuid}) in dataset (${datasetName}).`),

// entity has been deleted
entityDeleted: problem(404.11, ({ entityUuid }) => `The entity with UUID (${entityUuid}) has been deleted.`),

// { allowed: [ acceptable formats ], got }
unacceptableFormat: problem(406.1, ({ allowed }) => `Requested format not acceptable; this resource allows: ${allowed.join()}.`),

Expand Down
Loading

0 comments on commit fb78290

Please sign in to comment.