From fb78290a5819a65cae30056dc56177b4889444ed Mon Sep 17 00:00:00 2001 From: Kathleen Tuite Date: Mon, 26 Aug 2024 16:01:25 -0700 Subject: [PATCH] Force-process entity submissions held in backlog (#1172) * Force-process entity submissions held in backlog * Make task actually run * Update formatting of force processing backlog * Rename task to reference backlog * Adding tests for force processing backlog for deleted entity or submission * Dont force process a sub for a deleted entity * Add transaction to backlog processor task * pass baseVersion through in force create * more empty label cases for update as create --- lib/bin/process-backlog.js | 23 ++ lib/model/query/entities.js | 102 ++++- lib/task/process-backlog.js | 13 + lib/util/problem.js | 3 + test/integration/api/offline-entities.js | 478 +++++++++++++++++++++++ 5 files changed, 598 insertions(+), 21 deletions(-) create mode 100644 lib/bin/process-backlog.js create mode 100644 lib/task/process-backlog.js diff --git a/lib/bin/process-backlog.js b/lib/bin/process-backlog.js new file mode 100644 index 000000000..be84f9f91 --- /dev/null +++ b/lib/bin/process-backlog.js @@ -0,0 +1,23 @@ +// Copyright 2024 ODK Central Developers +// See the NOTICE file at the top-level directory of this distribution and at +// https://github.com/getodk/central-backend/blob/master/NOTICE. +// This file is part of ODK Central. It is subject to the license terms in +// the LICENSE file found in the top-level directory of this distribution and at +// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central, +// including this file, may be copied, modified, propagated, or distributed +// except according to the terms contained in the LICENSE file. +// +// This script re-processes submissions containing offline entity actions that +// were previously held in a backlog due to submissions coming in out of order. + +const { run } = require('../task/task'); +const { processBacklog } = require('../task/process-backlog'); + +const { program } = require('commander'); +program.option('-f, --force', 'Force all submissions in the backlog to be processed immediately.'); +program.parse(); + +const options = program.opts(); + +run(processBacklog(options.force) + .then((count) => `Submissions processed: ${count}`)); diff --git a/lib/model/query/entities.js b/lib/model/query/entities.js index 8b2b248fb..8b7de267f 100644 --- a/lib/model/query/entities.js +++ b/lib/model/query/entities.js @@ -7,8 +7,9 @@ // including this file, may be copied, modified, propagated, or distributed // except according to the terms contained in the LICENSE file. +const config = require('config'); const { sql } = require('slonik'); -const { Actor, Entity, Submission, Form } = require('../frames'); +const { Actor, Audit, Entity, Submission, Form } = require('../frames'); const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db'); const { map, mergeRight, pickAll } = require('ramda'); const { blankStringToNull, construct } = require('../../util/util'); @@ -17,7 +18,7 @@ const { odataFilter, odataOrderBy } = require('../../data/odata-filter'); const { odataToColumnMap, parseSubmissionXml, getDiffProp, ConflictType } = require('../../data/entity'); const { isTrue } = require('../../util/http'); const Problem = require('../../util/problem'); -const { runSequentially } = require('../../util/promise'); +const { getOrReject, runSequentially } = require('../../util/promise'); ///////////////////////////////////////////////////////////////////////////////// @@ -171,21 +172,31 @@ createVersion.audit = (updatedEntity, dataset, partial, subDef) => (log) => { entityDefId: updatedEntity.aux.currentVersion.id, entity: { uuid: updatedEntity.uuid, dataset: dataset.name } }); + return Promise.resolve(); }; createVersion.audit.withResult = true; //////////////////////////////////////////////////////////////////////////////// // WRAPPER FUNCTIONS FOR CREATING AND UPDATING ENTITIES -const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent) => async ({ Audits, Entities }) => { +const _createEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => { // If dataset requires approval on submission to create an entity and this event is not // an approval event, then don't create an entity if ((dataset.approvalRequired && event.details.reviewState !== 'approved') || (!dataset.approvalRequired && event.action === 'submission.update')) // don't process submission if approval is not required and submission metadata is updated return null; - // TODO: auto-generate a label if forced and if the submission doesn't provide one - const partial = await Entity.fromParseEntityData(entityData, { create: true }); + // Auto-generate a label if forced and if the submission doesn't provide one + if (forceOutOfOrderProcessing && (entityData.system.label == null || entityData.system.label.trim() === '')) { + // eslint-disable-next-line no-param-reassign + entityData.system.label = 'auto generated'; + } + + // Add the branchBaseVersion to the partial if we are forcing the create and it has one + const _partial = await Entity.fromParseEntityData(entityData, { create: true }); + const partial = (forceOutOfOrderProcessing) + ? _partial.auxWith('def', { branchBaseVersion: _partial.def.baseVersion }) + : _partial; const sourceDetails = { submission: { instanceId: submissionDef.instanceId }, parentEventId: parentEvent ? parentEvent.id : undefined }; const sourceId = await Entities.createSource(sourceDetails, submissionDefId, event.id); @@ -202,7 +213,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss return entity; }; -const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event) => async ({ Audits, Entities }) => { +const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing = false) => async ({ Audits, Entities }) => { if (!(event.action === 'submission.create' || event.action === 'submission.update.version' || event.action === 'submission.reprocess')) @@ -213,7 +224,21 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Figure out the intended baseVersion // If this is an offline update with a branchId, the baseVersion value is local to that offline context. - const baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef); + let baseEntityDef; + + // Try computing base version. + // But if there is a 404.8 not found error, double-check if the entity never existed or was deleted. + try { + baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing); + } catch (err) { + if (err.problemCode === 404.8) { + // Look up deleted entity by passing deleted as option argData + const deletedEntity = await Entities.getById(dataset.id, clientEntity.uuid, new QueryOptions({ argData: { deleted: 'true' } })); + if (deletedEntity.isDefined()) + throw Problem.user.entityDeleted({ entityUuid: clientEntity.uuid }); + } + throw err; + } // If baseEntityVersion is null, we held a submission and will stop processing now. if (baseEntityDef == null) @@ -294,7 +319,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss // Used by _updateVerison to figure out the intended base version in Central // based on the branchId, trunkVersion, and baseVersion in the submission -const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => async ({ Entities }) => { +const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing = false) => async ({ Entities }) => { if (!clientEntity.def.branchId) { // no offline branching to deal with, use baseVersion as is @@ -327,10 +352,17 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef) => a const baseEntityVersion = await Entities.getDef(dataset.id, clientEntity.uuid, new QueryOptions({ condition })); if (!baseEntityVersion.isDefined()) { - // TODO: add case for force-processing - // If there is no base version and we are not forcing the processing, hold submission in the backlog. - await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion); - return null; + if (forceOutOfOrderProcessing) { + // If the base version doesn't exist but we forcing the update anyway, use the latest version on the server as the base. + // If that can't be found, throw an error for _processSubmissionEvent to catch so it can try create instead of update. + const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid) + .then(getOrReject(Problem.user.entityNotFound({ entityUuid: clientEntity.uuid, datasetName: dataset.name }))); + return latestEntity.aux.currentVersion; + } else { + // If there is no base version and we are not forcing the processing, hold submission in the backlog. + await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion); + return null; + } } // Return the base entity version @@ -351,7 +383,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql` // so any errors can be rolled back and logged as an entity processing error. const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst }) => { const { submissionId, submissionDefId } = event.details; - // TODO: check parentEvent details to determine if this is a forced reprocessing or not + const forceOutOfOrderProcessing = parentEvent?.details?.force === true; const form = await Forms.getByActeeId(event.acteeId); // If form is deleted/purged then submission won't be there either. @@ -369,7 +401,6 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset // don't try to process it now, it will be dequeued and reprocessed elsewhere. if (existingHeldSubmission.isDefined()) return null; - // TODO: check how force-reprocessing interacts with this logic above const submission = await Submissions.getSubAndDefById(submissionDefId); @@ -409,22 +440,21 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset throw Problem.user.entityActionNotPermitted({ action, permitted: permittedActions }); } - // TODO: work out how force-reprocessing interacts with this logic (updateEntity and createEntity should know about it) let maybeEntity = null; // Try update before create (if both are specified) if (entityData.system.update === '1' || entityData.system.update === 'true') try { - maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event); + maybeEntity = await Entities._updateEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing); } catch (err) { - const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true'); + const attemptCreate = (entityData.system.create === '1' || entityData.system.create === 'true') || forceOutOfOrderProcessing; if ((err.problemCode === 404.8) && attemptCreate) { - maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent); + maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing); } else { throw (err); } } else if (entityData.system.create === '1' || entityData.system.create === 'true') - maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent); + maybeEntity = await Entities._createEntity(dataset, entityData, submissionId, submissionDef, submissionDefId, event, parentEvent, forceOutOfOrderProcessing); // Check for held submissions that follow this one in the same branch if (maybeEntity != null) { @@ -434,8 +464,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset const currentBranchBaseVersion = branchBaseVersion ?? 0; const nextSub = await Entities._getNextHeldSubmissionInBranch(entityUuid, branchId, currentBranchBaseVersion + 1); - // TODO: don't handle the next submission if the current one was processed forcefully - if (nextSub.isDefined()) { + if (nextSub.isDefined() && !forceOutOfOrderProcessing) { const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get(); await Entities._deleteHeldSubmissionByEventId(auditId); await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId }, @@ -496,6 +525,35 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql` WHERE "auditId"=${eventId}`); +//////////////////////////////////////////////////////////////////////////////// +// FORCE PROCESSING SUBMISSIONS FROM BACKLOG + +const DAY_RANGE = config.has('default.taskSchedule.forceProcess') + ? config.get('default.taskSchedule.forceProcess') + : 7; // Default is 7 days + +const _getHeldSubmissionsAsEvents = (force) => ({ all }) => all(sql` + SELECT audits.* FROM entity_submission_backlog + JOIN audits on entity_submission_backlog."auditId" = audits.id + ${force ? sql`` : sql`WHERE entity_submission_backlog."loggedAt" < current_date - cast(${DAY_RANGE} as int)`} + ORDER BY "branchId", "branchBaseVersion"`) + .then(map(construct(Audit))); + +const _processSingleBacklogEvent = (event) => (container) => + container.db.transaction(async (trxn) => { + const { Entities } = container.with({ db: trxn }); + await Entities._deleteHeldSubmissionByEventId(event.id); + await Entities.processSubmissionEvent(event, { details: { force: true } }); + return true; + }); + +const processBacklog = (force = false) => async (container) => { + const events = await container.Entities._getHeldSubmissionsAsEvents(force); + await runSequentially(events.map(event => + () => container.Entities._processSingleBacklogEvent(event))); + return events.length; +}; + //////////////////////////////////////////////////////////////////////////////// // PROCESSING PENDING SUBMISSIONS FROM TOGGLING DATASET APPROVALREQUIRED FLAG @@ -680,6 +738,8 @@ module.exports = { _computeBaseVersion, _holdSubmission, _checkHeldSubmission, _getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId, + _getHeldSubmissionsAsEvents, + processBacklog, _processSingleBacklogEvent, processSubmissionEvent, streamForExport, getDefBySubmissionId, createVersion, diff --git a/lib/task/process-backlog.js b/lib/task/process-backlog.js new file mode 100644 index 000000000..1a8ba5bd7 --- /dev/null +++ b/lib/task/process-backlog.js @@ -0,0 +1,13 @@ +// Copyright 2024 ODK Central Developers +// See the NOTICE file at the top-level directory of this distribution and at +// https://github.com/getodk/central-backend/blob/master/NOTICE. +// This file is part of ODK Central. It is subject to the license terms in +// the LICENSE file found in the top-level directory of this distribution and at +// https://www.apache.org/licenses/LICENSE-2.0. No part of ODK Central, +// including this file, may be copied, modified, propagated, or distributed +// except according to the terms contained in the LICENSE file. + +const { task } = require('./task'); +const processBacklog = task.withContainer(({ Entities }) => Entities.processBacklog); +module.exports = { processBacklog }; + diff --git a/lib/util/problem.js b/lib/util/problem.js index 76fde7b55..c5c99c61c 100644 --- a/lib/util/problem.js +++ b/lib/util/problem.js @@ -166,6 +166,9 @@ const problems = { // entity base version specified in submission does not exist entityVersionNotFound: problem(404.9, ({ baseVersion, entityUuid, datasetName }) => `Base version (${baseVersion}) does not exist for entity UUID (${entityUuid}) in dataset (${datasetName}).`), + // entity has been deleted + entityDeleted: problem(404.11, ({ entityUuid }) => `The entity with UUID (${entityUuid}) has been deleted.`), + // { allowed: [ acceptable formats ], got } unacceptableFormat: problem(406.1, ({ allowed }) => `Requested format not acceptable; this resource allows: ${allowed.join()}.`), diff --git a/test/integration/api/offline-entities.js b/test/integration/api/offline-entities.js index 4a4527798..8c00002e1 100644 --- a/test/integration/api/offline-entities.js +++ b/test/integration/api/offline-entities.js @@ -780,4 +780,482 @@ describe('Offline Entities', () => { count.should.equal(0); })); }); + + describe('force-processing held submissions', () => { + it('should apply an entity update when the previous update is missing', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Trunk version is 1, but base version is 2 + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + await container.Entities.processBacklog(true); + + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(2); + body.currentVersion.baseVersion.should.equal(1); + body.currentVersion.data.should.eql({ age: '22', status: 'arrived', first_name: 'Johnny' }); + + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.trunkVersion.should.equal(1); + body.currentVersion.branchBaseVersion.should.equal(2); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + })); + + it('should apply two updates when first upate is missing', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Trunk version is 1, but base version is 2 + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('one', 'one-update2') + .replace('baseVersion="1"', 'baseVersion="3"') + .replace('arrived', 'departed') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(2); + + await container.Entities.processBacklog(true); + + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(3); + body.currentVersion.baseVersion.should.equal(2); + body.currentVersion.data.should.eql({ age: '22', status: 'departed', first_name: 'Johnny' }); + + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.trunkVersion.should.equal(1); + body.currentVersion.branchBaseVersion.should.equal(3); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + })); + + it('should apply an entity update as a create', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + const newUuid = uuid(); + + // Base version is 1 but it doesnt exist + // trunk version doesnt make sense to exist here either + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('id="12345678-1234-4123-8234-123456789abc"', `id="${newUuid}"`) + .replace('branchId=""', `branchId="${branchId}"`) + .replace('trunkVersion="1"', 'trunkVersion=""') + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.two + .replace('branchId=""', `branchId="${uuid()}"`) + .replace('create="1"', 'update="1"') + .replace('', '') + .replace('baseVersion=""', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(2); + + await container.Entities.processBacklog(true); + + await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + body.currentVersion.data.should.eql({ status: 'arrived' }); + body.currentVersion.label.should.eql('auto generated'); + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.branchBaseVersion.should.equal(2); + + // This is the first version of the entity so there should be no base or trunk versions + should.not.exist(body.currentVersion.trunkVersion); + should.not.exist(body.currentVersion.baseVersion); + }); + + + await asAlice.get(`/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + body.currentVersion.data.should.eql({ status: 'new', first_name: 'Megan', age: '20' }); + body.currentVersion.label.should.eql('auto generated'); + body.currentVersion.branchId.should.be.a.uuid(); + body.currentVersion.branchBaseVersion.should.equal(2); + + // This is the first version of the entity so there should be no base or trunk versions + should.not.exist(body.currentVersion.trunkVersion); + should.not.exist(body.currentVersion.baseVersion); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + })); + + it('should apply an entity update as a create followed by another update', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + const newUuid = uuid(); + + // Base version is 1 but it doesnt exist + // trunk version doesnt make sense to exist here either + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('id="12345678-1234-4123-8234-123456789abc"', `id="${newUuid}"`) + .replace('branchId=""', `branchId="${branchId}"`) + .replace('trunkVersion="1"', 'trunkVersion=""') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + // base version is 2 now + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('one', 'one-update') + .replace('id="12345678-1234-4123-8234-123456789abc"', `id="${newUuid}"`) + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + .replace('trunkVersion="1"', 'trunkVersion=""') + .replace('arrived', 'Danachecked in') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(2); + + await container.Entities.processBacklog(true); + + await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(2); + body.currentVersion.data.should.eql({ status: 'checked in', first_name: 'Dana' }); + body.currentVersion.label.should.eql('auto generated'); + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.baseVersion.should.equal(1); + body.currentVersion.branchBaseVersion.should.equal(2); + should.not.exist(body.currentVersion.trunkVersion); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + + // send in another update much later in the same branch + // base version is 10 now (many missing intermediate updates) + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('one', 'one-update10') + .replace('id="12345678-1234-4123-8234-123456789abc"', `id="${newUuid}"`) + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="10"') + .replace('trunkVersion="1"', 'trunkVersion=""') + .replace('arrived', 'Danaregistered') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + await container.Entities.processBacklog(true); + + await asAlice.get(`/v1/projects/1/datasets/people/entities/${newUuid}`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(3); + body.currentVersion.data.should.eql({ status: 'registered', first_name: 'Dana' }); + body.currentVersion.label.should.eql('auto generated'); + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.baseVersion.should.equal(2); + body.currentVersion.branchBaseVersion.should.equal(10); + should.not.exist(body.currentVersion.trunkVersion); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + })); + + it.skip('should apply an entity update as a create, and then properly handle the delayed create', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Send first submission, which is an update that will be applied as a create + // Removing extra fields of the submission to demonstrate a simpler update with missing fields + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.two + .replace('create="1"', 'update="1"') + .replace('branchId=""', `branchId="${branchId}"`) + .replace('two', 'two-update') + .replace('baseVersion=""', 'baseVersion="1"') + .replace('new', 'checked in') + .replace('', '') + .replace('20', '') + .replace('Megan', '') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Force the update submission to be processed as a create + await container.Entities.processBacklog(true); + + await asAlice.get(`/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + body.currentVersion.data.should.eql({ status: 'checked in' }); + body.currentVersion.label.should.eql('auto generated'); + body.currentVersion.branchId.should.equal(branchId); + should.not.exist(body.currentVersion.baseVersion); + should.not.exist(body.currentVersion.branchBaseVersion); // No base version because this is a create, though maybe this should be here. + should.not.exist(body.currentVersion.trunkVersion); + }); + + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + + // First submission creates the entity, but this will be processed as an update + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.two + .replace('branchId=""', `branchId="${branchId}"`) + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // In the default behavior, attempting create on an entity that already exists causes a conflict error. + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits') + .expect(200) + .then(({ body }) => { + body[0].details.errorMessage.should.eql('A resource already exists with uuid value(s) of 12345678-1234-4123-8234-123456789ddd.'); + }); + + await asAlice.get(`/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789ddd`) + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + }); + })); + + describe('only force-process submissions held in backlog for a certain amount of time', () => { + it('should process a submission from over 7 days ago', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Neither update below will be applied at first because the first + // update in the branch is missing. + + // Send the first submission, which will be held in the backlog + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Update the timestamp on this backlog + await container.run(sql`UPDATE entity_submission_backlog SET "loggedAt" = "loggedAt" - interval '8 days'`); + + // Send the next submission, which will also be held in the backlog. + // This submission immediately follows the previous one, but force-processing + // the first submission does not cause this one to be processed. + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('one', 'one-update') + .replace('baseVersion="1"', 'baseVersion="3"') + .replace('arrived', 'departed') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + // Both submissions should be in the backlog now + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(2); + + // Process submissions that have been in the backlog for a long time + // (only 1 of 2 should be processed) + const count = await container.Entities.processBacklog(); + count.should.equal(1); + + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(2); + body.currentVersion.baseVersion.should.equal(1); + body.currentVersion.data.should.eql({ age: '22', status: 'arrived', first_name: 'Johnny' }); + + body.currentVersion.branchId.should.equal(branchId); + body.currentVersion.trunkVersion.should.equal(1); + body.currentVersion.branchBaseVersion.should.equal(2); + }); + + // One submission should still be in the backlog + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + })); + }); + + describe('force-processing deleted submissions and entities', () => { + it('should not process a submission in a soft-deleted form', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Send the first submission, which will be held in the backlog because the base version is high + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Soft-delete the form to delete the submission + await asAlice.delete('/v1/projects/1/forms/offlineEntity'); + + // Process the backlog (count will be 1 but update should not be applied to entity) + const processedCount = await container.Entities.processBacklog(true); + processedCount.should.equal(1); + + // Check that the entity is still at version 1 + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + }); + })); + + it('should not process a submission that has been soft-deleted', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Send the first submission, which will be held in the backlog because the base version is high + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + const backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Soft-delete the submission via the database + await container.run(sql`UPDATE submissions SET "deletedAt" = NOW() WHERE "instanceId" = 'one'`); + + // Process the backlog (count will be 1 but update should not be applied to entity) + const processedCount = await container.Entities.processBacklog(true); + processedCount.should.equal(1); + + // Check that the entity is still at version 1 + await asAlice.get('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc') + .expect(200) + .then(({ body }) => { + body.currentVersion.version.should.equal(1); + }); + })); + + it('should not process a submission for an entity that has been soft-deleted', testOfflineEntities(async (service, container) => { + const asAlice = await service.login('alice'); + const branchId = uuid(); + + // Send the first submission, which will be held in the backlog because the base version is high + await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions') + .send(testData.instances.offlineEntity.one + .replace('branchId=""', `branchId="${branchId}"`) + .replace('baseVersion="1"', 'baseVersion="2"') + ) + .set('Content-Type', 'application/xml') + .expect(200); + + await exhaust(container); + + let backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(1); + + // Soft-delete the entity + await asAlice.delete('/v1/projects/1/datasets/people/entities/12345678-1234-4123-8234-123456789abc'); + + // Process the backlog (count will be 1 but update should not be applied to entity) + const processedCount = await container.Entities.processBacklog(true); + processedCount.should.equal(1); + + // Check that the backlog count is now 0 + backlogCount = await container.oneFirst(sql`select count(*) from entity_submission_backlog`); + backlogCount.should.equal(0); + + // Check for an entity error on the submission + await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits') + .expect(200) + .then(({ body }) => { + body[0].details.errorMessage.should.eql('The entity with UUID (12345678-1234-4123-8234-123456789abc) has been deleted.'); + }); + })); + }); + }); });