diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 4eadf0697afa..767b8382835c 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -2342,6 +2342,7 @@ export class ContainerRuntime let newState: boolean; try { + this.submitIdAllocationOpIfNeeded(true); // replay the ops this.pendingStateManager.replayPendingStates(); } finally { @@ -3918,9 +3919,11 @@ export class ContainerRuntime return this.blobManager.createBlob(blob, signal); } - private submitIdAllocationOpIfNeeded(): void { + private submitIdAllocationOpIfNeeded(resubmitOutstandingRanges = false): void { if (this._idCompressor) { - const idRange = this._idCompressor.takeNextCreationRange(); + const idRange = resubmitOutstandingRanges + ? this.idCompressor?.takeUnfinalizedCreationRange() + : this._idCompressor.takeNextCreationRange(); // Don't include the idRange if there weren't any Ids allocated if (idRange?.ids !== undefined) { const idAllocationMessage: ContainerRuntimeIdAllocationMessage = { @@ -4136,7 +4139,13 @@ export class ContainerRuntime this.channelCollection.reSubmit(message.type, message.contents, localOpMetadata); break; case ContainerMessageType.IdAllocation: { - this.submit(message, localOpMetadata); + // Allocation ops are never resubmitted/rebased. This is because they require special handling to + // avoid being submitted out of order. For example, if the pending state manager contained + // [idOp1, dataOp1, idOp2, dataOp2] and the resubmission of dataOp1 generated idOp3, that would be + // placed into the outbox in the same batch as idOp1, but before idOp2 is resubmitted. + // To avoid this, allocation ops are simply never resubmitted. Prior to invoking the pending state + // manager to replay pending ops, the runtime will always submit a new allocation range that includes + // all pending IDs. The resubmitted allocation ops are then ignored here. break; } case ContainerMessageType.BlobAttach: diff --git a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts index 6c48d07af351..6fdb0d17d5d2 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts @@ -10,6 +10,11 @@ import { BatchMessage, IBatch, IBatchCheckpoint } from "./definitions.js"; export interface IBatchManagerOptions { readonly hardLimit: number; readonly compressionOptions?: ICompressionRuntimeOptions; + + /** + * If true, the outbox is allowed to rebase the batch during flushing. + */ + readonly canRebase: boolean; } export interface BatchSequenceNumbers { diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 5323dabdf028..0f9cc8f28a73 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -112,9 +112,9 @@ export class Outbox { // We need to allow infinite size batches if we enable compression const hardLimit = isCompressionEnabled ? Infinity : this.params.config.maxBatchSizeInBytes; - this.mainBatch = new BatchManager({ hardLimit }); - this.blobAttachBatch = new BatchManager({ hardLimit }); - this.idAllocationBatch = new BatchManager({ hardLimit }); + this.mainBatch = new BatchManager({ hardLimit, canRebase: true }); + this.blobAttachBatch = new BatchManager({ hardLimit, canRebase: true }); + this.idAllocationBatch = new BatchManager({ hardLimit, canRebase: false }); } public get messageCount(): number { @@ -270,7 +270,7 @@ export class Outbox { const rawBatch = batchManager.popBatch(); const shouldGroup = !disableGroupedBatching && this.params.groupingManager.shouldGroup(rawBatch); - if (rawBatch.hasReentrantOps === true && shouldGroup) { + if (batchManager.options.canRebase && rawBatch.hasReentrantOps === true && shouldGroup) { assert(!this.rebasing, 0x6fa /* A rebased batch should never have reentrant ops */); // If a batch contains reentrant ops (ops created as a result from processing another op) // it needs to be rebased so that we can ensure consistent reference sequence numbers @@ -300,6 +300,7 @@ export class Outbox { */ private rebase(rawBatch: IBatch, batchManager: BatchManager) { assert(!this.rebasing, 0x6fb /* Reentrancy */); + assert(batchManager.options.canRebase, "BatchManager does not support rebase"); this.rebasing = true; for (const message of rawBatch.content) { diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts index 8d2175565b23..9be4e938f172 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts @@ -23,7 +23,7 @@ describe("BatchManager", () => { it("BatchManager: 'infinity' hard limit allows everything", () => { const message = { contents: generateStringOfSize(1024) } as any as BatchMessage; - const batchManager = new BatchManager({ hardLimit: Infinity }); + const batchManager = new BatchManager({ hardLimit: Infinity, canRebase: true }); for (let i = 1; i <= 10; i++) { assert.equal(batchManager.push(message, /* reentrant */ false), true); @@ -32,7 +32,7 @@ describe("BatchManager", () => { }); it("Batch metadata is set correctly", () => { - const batchManager = new BatchManager({ hardLimit }); + const batchManager = new BatchManager({ hardLimit, canRebase: true }); assert.equal( batchManager.push( { ...smallMessage(), referenceSequenceNumber: 0 }, @@ -72,7 +72,7 @@ describe("BatchManager", () => { }); it("Batch content size is tracked correctly", () => { - const batchManager = new BatchManager({ hardLimit }); + const batchManager = new BatchManager({ hardLimit, canRebase: true }); assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); assert.equal(batchManager.contentSizeInBytes, smallMessageSize * batchManager.length); assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true); @@ -82,7 +82,7 @@ describe("BatchManager", () => { }); it("Batch reference sequence number maps to the last message", () => { - const batchManager = new BatchManager({ hardLimit }); + const batchManager = new BatchManager({ hardLimit, canRebase: true }); assert.equal( batchManager.push( { ...smallMessage(), referenceSequenceNumber: 0 }, @@ -109,7 +109,7 @@ describe("BatchManager", () => { }); it("Batch size estimates", () => { - const batchManager = new BatchManager({ hardLimit }); + const batchManager = new BatchManager({ hardLimit, canRebase: true }); batchManager.push(smallMessage(), /* reentrant */ false); // 10 bytes of content + 200 bytes overhead assert.equal(estimateSocketSize(batchManager.popBatch()), 210); @@ -137,7 +137,7 @@ describe("BatchManager", () => { }); it("Batch op reentry state preserved during its lifetime", () => { - const batchManager = new BatchManager({ hardLimit }); + const batchManager = new BatchManager({ hardLimit, canRebase: true }); assert.equal( batchManager.push( { ...smallMessage(), referenceSequenceNumber: 0 }, diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 60d2b7eed16a..56ed1fa5cf97 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -46,7 +46,7 @@ describe("Pending State Manager", () => { rollbackContent = []; rollbackShouldThrow = false; - batchManager = new BatchManager({ hardLimit: 950 * 1024 }); + batchManager = new BatchManager({ hardLimit: 950 * 1024, canRebase: true }); }); it("should do nothing when rolling back empty pending stack", () => { diff --git a/packages/runtime/id-compressor/api-report/id-compressor.api.md b/packages/runtime/id-compressor/api-report/id-compressor.api.md index e73393930104..3b80fee95804 100644 --- a/packages/runtime/id-compressor/api-report/id-compressor.api.md +++ b/packages/runtime/id-compressor/api-report/id-compressor.api.md @@ -60,6 +60,7 @@ export interface IIdCompressorCore { serialize(withSession: true): SerializedIdCompressorWithOngoingSession; serialize(withSession: false): SerializedIdCompressorWithNoSession; takeNextCreationRange(): IdCreationRange; + takeUnfinalizedCreationRange(): IdCreationRange; } // @internal diff --git a/packages/runtime/id-compressor/src/idCompressor.ts b/packages/runtime/id-compressor/src/idCompressor.ts index 7824961a2616..b2cb9b0be37b 100644 --- a/packages/runtime/id-compressor/src/idCompressor.ts +++ b/packages/runtime/id-compressor/src/idCompressor.ts @@ -6,7 +6,11 @@ import { bufferToString, stringToBuffer } from "@fluid-internal/client-utils"; import { ITelemetryBaseLogger } from "@fluidframework/core-interfaces"; import { assert } from "@fluidframework/core-utils/internal"; -import { ITelemetryLoggerExt, createChildLogger } from "@fluidframework/telemetry-utils/internal"; +import { + ITelemetryLoggerExt, + LoggingError, + createChildLogger, +} from "@fluidframework/telemetry-utils/internal"; import { FinalSpace } from "./finalSpace.js"; import { FinalCompressedId, LocalCompressedId, NumericUuid, isFinalId } from "./identifiers.js"; @@ -57,6 +61,13 @@ import { */ const currentWrittenVersion = 2.0; +function rangeFinalizationError(expectedStart: number, actualStart: number): LoggingError { + return new LoggingError("Ranges finalized out of order", { + expectedStart, + actualStart, + }); +} + /** * See {@link IIdCompressor} and {@link IIdCompressorCore} */ @@ -222,14 +233,49 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { ), }, }; + return this.updateToRange(range); + } + + public takeUnfinalizedCreationRange(): IdCreationRange { + const lastLocalCluster = this.localSession.getLastCluster(); + let count: number; + let firstGenCount: number; + if (lastLocalCluster === undefined) { + firstGenCount = 1; + count = this.localGenCount; + } else { + firstGenCount = genCountFromLocalId( + (lastLocalCluster.baseLocalId - lastLocalCluster.count) as LocalCompressedId, + ); + count = this.localGenCount - firstGenCount + 1; + } + + if (count === 0) { + return { + sessionId: this.localSessionId, + }; + } + + const range: IdCreationRange = { + ids: { + count, + firstGenCount, + localIdRanges: this.normalizer.getRangesBetween(firstGenCount, this.localGenCount), + requestedClusterSize: this.nextRequestedClusterSize, + }, + sessionId: this.localSessionId, + }; + return this.updateToRange(range); + } + + private updateToRange(range: IdCreationRange): IdCreationRange { this.nextRangeBaseGenCount = this.localGenCount + 1; - IdCompressor.assertValidRange(range); - return range; + return IdCompressor.assertValidRange(range); } - private static assertValidRange(range: IdCreationRange): void { + private static assertValidRange(range: IdCreationRange): IdCreationRange { if (range.ids === undefined) { - return; + return range; } const { count, requestedClusterSize } = range.ids; assert(count > 0, 0x755 /* Malformed ID Range. */); @@ -238,6 +284,7 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { requestedClusterSize <= IdCompressor.maxClusterSize, 0x877 /* Clusters must not exceed max cluster size. */, ); + return range; } public finalizeCreationRange(range: IdCreationRange): void { @@ -260,7 +307,7 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { if (lastCluster === undefined) { // This is the first cluster in the session space if (rangeBaseLocal !== -1) { - throw new Error("Ranges finalized out of order."); + throw rangeFinalizationError(-1, rangeBaseLocal); } lastCluster = this.addEmptyCluster(session, requestedClusterSize + count); if (isLocal) { @@ -273,7 +320,10 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { const remainingCapacity = lastCluster.capacity - lastCluster.count; if (lastCluster.baseLocalId - lastCluster.count !== rangeBaseLocal) { - throw new Error("Ranges finalized out of order."); + throw rangeFinalizationError( + lastCluster.baseLocalId - lastCluster.count, + rangeBaseLocal, + ); } if (remainingCapacity >= count) { diff --git a/packages/runtime/id-compressor/src/test/idCompressor.spec.ts b/packages/runtime/id-compressor/src/test/idCompressor.spec.ts index df4406bac02e..3361365d7ad5 100644 --- a/packages/runtime/id-compressor/src/test/idCompressor.spec.ts +++ b/packages/runtime/id-compressor/src/test/idCompressor.spec.ts @@ -369,6 +369,96 @@ describe("IdCompressor", () => { [10, 3], ]); }); + + describe("by retaking all outstanding ranges", () => { + it("when there are no outstanding ranges", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + let retakenRangeEmpty = compressor.takeUnfinalizedCreationRange(); + assert.equal(retakenRangeEmpty.ids, undefined); + compressor.finalizeCreationRange(retakenRangeEmpty); + generateCompressedIds(compressor, 1); + compressor.finalizeCreationRange(compressor.takeNextCreationRange()); + retakenRangeEmpty = compressor.takeUnfinalizedCreationRange(); + assert.equal(retakenRangeEmpty.ids, undefined); + }); + + it("when there is one outstanding ranges with local IDs only", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + + generateCompressedIds(compressor, 1); + compressor.takeNextCreationRange(); + + let retakenRangeLocalOnly = compressor.takeUnfinalizedCreationRange(); + assert.deepEqual(retakenRangeLocalOnly.ids, { + firstGenCount: 1, + count: 1, + localIdRanges: [[1, 1]], + requestedClusterSize: 2, + }); + + generateCompressedIds(compressor, 1); + retakenRangeLocalOnly = compressor.takeUnfinalizedCreationRange(); + assert.deepEqual(retakenRangeLocalOnly.ids, { + firstGenCount: 1, + count: 2, + localIdRanges: [[1, 2]], + requestedClusterSize: 2, + }); + + let postRetakeRange = compressor.takeNextCreationRange(); + // IDs should be undefined because retaking should still advance the taken ID counter + // if it doesn't, ranges will be resubmitted causing out of order errors + assert.equal(postRetakeRange.ids, undefined); + generateCompressedIds(compressor, 1); + postRetakeRange = compressor.takeNextCreationRange(); + assert.deepEqual(postRetakeRange.ids, { + firstGenCount: 3, + count: 1, + localIdRanges: [[3, 1]], + requestedClusterSize: 2, + }); + + compressor.finalizeCreationRange(retakenRangeLocalOnly); + }); + + it("when there are multiple outstanding ranges", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 1); + const range1 = compressor.takeNextCreationRange(); + generateCompressedIds(compressor, 1); // one local + compressor.finalizeCreationRange(range1); + const range2 = compressor.takeNextCreationRange(); + assert.deepEqual(range2.ids?.localIdRanges, [[2, 1]]); + generateCompressedIds(compressor, 1); // one eager final + const range3 = compressor.takeNextCreationRange(); + assert.deepEqual(range3.ids?.localIdRanges, []); + generateCompressedIds(compressor, 1); // one local + const range4 = compressor.takeNextCreationRange(); + assert.deepEqual(range4.ids?.localIdRanges, [[4, 1]]); + + const retakenRange = compressor.takeUnfinalizedCreationRange(); + assert.deepEqual(retakenRange.ids?.firstGenCount, 2); + assert.deepEqual(retakenRange.ids?.count, 3); + assert.deepEqual(retakenRange.ids?.localIdRanges, [ + [2, 1], + [4, 1], + ]); + + compressor.finalizeCreationRange(retakenRange); + assert.throws( + () => compressor.finalizeCreationRange(range2), + (e: Error) => e.message === "Ranges finalized out of order", + ); + assert.throws( + () => compressor.finalizeCreationRange(range3), + (e: Error) => e.message === "Ranges finalized out of order", + ); + assert.throws( + () => compressor.finalizeCreationRange(range4), + (e: Error) => e.message === "Ranges finalized out of order", + ); + }); + }); }); describe("Finalizing", () => { @@ -379,7 +469,10 @@ describe("IdCompressor", () => { compressor.finalizeCreationRange(batchRange); assert.throws( () => compressor.finalizeCreationRange(batchRange), - (e: Error) => e.message === "Ranges finalized out of order.", + (e: Error) => + e.message === "Ranges finalized out of order" && + (e as any).expectedStart === -4 && + (e as any).actualStart === -1, ); }); @@ -391,7 +484,10 @@ describe("IdCompressor", () => { const secondRange = compressor.takeNextCreationRange(); assert.throws( () => compressor.finalizeCreationRange(secondRange), - (e: Error) => e.message === "Ranges finalized out of order.", + (e: Error) => + e.message === "Ranges finalized out of order" && + (e as any).expectedStart === -1 && + (e as any).actualStart === -2, ); }); diff --git a/packages/runtime/id-compressor/src/test/idCompressorTestUtilities.ts b/packages/runtime/id-compressor/src/test/idCompressorTestUtilities.ts index 20ca3ff6db99..8d7b0ef7cccd 100644 --- a/packages/runtime/id-compressor/src/test/idCompressorTestUtilities.ts +++ b/packages/runtime/id-compressor/src/test/idCompressorTestUtilities.ts @@ -33,6 +33,7 @@ import { } from "../index.js"; import { assertIsSessionId, createSessionId, localIdFromGenCount } from "../utilities.js"; +import { SessionSpaceNormalizer } from "../sessionSpaceNormalizer.js"; import { FinalCompressedId, ReadonlyIdCompressor, @@ -286,8 +287,7 @@ export class IdCompressorTestNetwork { * Changes the capacity request amount for a client. It will take effect immediately. */ public changeCapacity(client: Client, newClusterCapacity: number): void { - // eslint-disable-next-line @typescript-eslint/dot-notation - this.compressors.get(client)["nextRequestedClusterSize"] = newClusterCapacity; + changeCapacity(this.compressors.get(client), newClusterCapacity); } private addNewId( @@ -427,26 +427,89 @@ export class IdCompressorTestNetwork { (client) => [this.compressors.get(client), this.getSequencedIdLog(client)] as const, ); - // Ensure creation ranges for clients we track contain the correct local ID ranges - this.serverOperations.forEach(([range, opSpaceIds, clientFrom]) => { + const getLocalIdsInRange = ( + range: IdCreationRange, + opSpaceIds?: OpSpaceCompressedId[], + ): Set => { const localIdsInCreationRange = new Set(); - if (clientFrom !== OriginatingClient.Remote) { - const ids = range.ids; - if (ids !== undefined) { - const { firstGenCount, localIdRanges } = ids; - for (const [genCount, count] of localIdRanges) { - for (let g = genCount; g < genCount + count; g++) { - const local = localIdFromGenCount(g); + const ids = range.ids; + if (ids !== undefined) { + const { firstGenCount, localIdRanges } = ids; + for (const [genCount, count] of localIdRanges) { + for (let g = genCount; g < genCount + count; g++) { + const local = localIdFromGenCount(g); + if (opSpaceIds) { assert.strictEqual(opSpaceIds[g - firstGenCount], local); - localIdsInCreationRange.add(local); } + localIdsInCreationRange.add(local); } } + } + return localIdsInCreationRange; + }; + + // Ensure creation ranges for clients we track contain the correct local ID ranges + this.serverOperations.forEach(([range, opSpaceIds, clientFrom]) => { + if (clientFrom !== OriginatingClient.Remote) { + const localIdsInCreationRange = getLocalIdsInRange(range, opSpaceIds); + let localCount = 0; opSpaceIds.forEach((id) => { if (isLocalId(id)) { + localCount++; assert(localIdsInCreationRange.has(id), "Local ID not in creation range"); } }); + assert.strictEqual( + localCount, + localIdsInCreationRange.size, + "Local ID count mismatch", + ); + } + }); + + const undeliveredRanges = new Map(); + this.clientProgress.forEach((progress, client) => { + const ranges = this.serverOperations + .slice(progress) + .filter((op) => op[2] === client) + .map(([range]) => range); + undeliveredRanges.set(client, ranges); + }); + undeliveredRanges.forEach((ranges, client) => { + const compressor = this.compressors.get(client); + let firstGenCount: number | undefined; + let totalCount = 0; + const unionedLocalRanges = new SessionSpaceNormalizer(); + ranges.forEach((range) => { + assert(range.sessionId === compressor.localSessionId); + if (range.ids !== undefined) { + // initialize firstGenCount if not set + if (firstGenCount === undefined) { + firstGenCount = range.ids.firstGenCount; + } + totalCount += range.ids.count; + range.ids.localIdRanges.forEach(([genCount, count]) => { + unionedLocalRanges.addLocalRange(genCount, count); + }); + } + }); + + const retakenRange = compressor.takeUnfinalizedCreationRange(); + if (retakenRange.ids !== undefined) { + const retakenLocalIds = new SessionSpaceNormalizer(); + retakenRange.ids.localIdRanges.forEach(([genCount, count]) => { + retakenLocalIds.addLocalRange(genCount, count); + }); + assert.strictEqual( + retakenLocalIds.equals(unionedLocalRanges), + true, + "Local ID ranges mismatch", + ); + assert.strictEqual(retakenRange.ids.count, totalCount, "Count mismatch"); + assert.strictEqual(retakenRange.ids.firstGenCount, firstGenCount, "Count mismatch"); + } else { + assert.strictEqual(totalCount, 0); + assert.strictEqual(unionedLocalRanges.idRanges.size, 0); } }); @@ -588,6 +651,11 @@ export class IdCompressorTestNetwork { } } +function changeCapacity(compressor: IdCompressor, newClusterCapacity: number): void { + // eslint-disable-next-line @typescript-eslint/dot-notation + compressor["nextRequestedClusterSize"] = newClusterCapacity; +} + /** * Roundtrips the supplied compressor through serialization and deserialization. */ @@ -608,13 +676,21 @@ export function roundtrip( compressor: ReadonlyIdCompressor, withSession: boolean, ): [SerializedIdCompressorWithOngoingSession | SerializedIdCompressorWithNoSession, IdCompressor] { + // preserve the capacity request as this property is normally private and resets + // to a default on construction (deserialization) + // eslint-disable-next-line @typescript-eslint/dot-notation + const capacity = compressor["nextRequestedClusterSize"]; if (withSession) { const serialized = compressor.serialize(withSession); - return [serialized, IdCompressor.deserialize(serialized)]; + const roundtripped = IdCompressor.deserialize(serialized); + changeCapacity(roundtripped, capacity); + return [serialized, roundtripped]; + } else { + const nonLocalSerialized = compressor.serialize(withSession); + const roundtripped = IdCompressor.deserialize(nonLocalSerialized, createSessionId()); + changeCapacity(roundtripped, capacity); + return [nonLocalSerialized, roundtripped]; } - - const nonLocalSerialized = compressor.serialize(withSession); - return [nonLocalSerialized, IdCompressor.deserialize(nonLocalSerialized, createSessionId())]; } /** @@ -811,13 +887,13 @@ export function makeOpGenerator( return { type: "reconnect", client: random.pick(activeClients) }; } - const allocationWeight = 16; + const allocationWeight = 20; return interleave( createWeightedGenerator([ [changeCapacityGenerator, 1], [allocateIdsGenerator, Math.round(allocationWeight * (1 - outsideAllocationFraction))], [allocateOutsideIdsGenerator, Math.round(allocationWeight * outsideAllocationFraction)], - [deliverAllOperationsGenerator, 2], + [deliverAllOperationsGenerator, 1], [deliverSomeOperationsGenerator, 6], [reconnectGenerator, 1], ]), @@ -889,7 +965,6 @@ export function performFuzzActions( return state; }, validate: (state) => { - network.deliverOperations(DestinationClient.All); validator?.(network); return state; }, diff --git a/packages/runtime/id-compressor/src/types/idCompressor.ts b/packages/runtime/id-compressor/src/types/idCompressor.ts index d0c019293a75..fffeb03d582d 100644 --- a/packages/runtime/id-compressor/src/types/idCompressor.ts +++ b/packages/runtime/id-compressor/src/types/idCompressor.ts @@ -80,12 +80,23 @@ import { export interface IIdCompressorCore { /** * Returns a range of IDs created by this session in a format for sending to the server for finalizing. - * The range will include all IDs generated via calls to `generateCompressedId` since the last time this method was called. + * The range will include all IDs generated via calls to `generateCompressedId` since the last time a + * range was taken (via this method or `takeUnfinalizedCreationRange`). * @returns the range of IDs, which may be empty. This range must be sent to the server for ordering before * it is finalized. Ranges must be sent to the server in the order that they are taken via calls to this method. */ takeNextCreationRange(): IdCreationRange; + /** + * Returns a range of IDs created by this session in a format for sending to the server for finalizing. + * The range will include all unfinalized IDs generated via calls to `generateCompressedId`. + * @returns the range of IDs, which may be empty. This range must be sent to the server for ordering before + * it is finalized. Ranges must be sent to the server in the order that they are taken via calls to this method. + * Note: after finalizing the range returned by this method, finalizing any ranges that had been previously taken + * will result in an error. + */ + takeUnfinalizedCreationRange(): IdCreationRange; + /** * Finalizes the supplied range of IDs (which may be from either a remote or local session). * @param range - the range of session-local IDs to finalize. diff --git a/packages/runtime/test-runtime-utils/package.json b/packages/runtime/test-runtime-utils/package.json index 1cfd645f7e00..a62cd7880f8d 100644 --- a/packages/runtime/test-runtime-utils/package.json +++ b/packages/runtime/test-runtime-utils/package.json @@ -149,6 +149,9 @@ }, "ClassDeclaration_MockHandle": { "forwardCompat": false + }, + "ClassDeclaration_MockFluidDataStoreContext": { + "forwardCompat": false } } } diff --git a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts index 835b5cfd4e3e..6487c8e71af2 100644 --- a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts +++ b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts @@ -399,6 +399,7 @@ declare function get_old_ClassDeclaration_MockFluidDataStoreContext(): declare function use_current_ClassDeclaration_MockFluidDataStoreContext( use: TypeOnly): void; use_current_ClassDeclaration_MockFluidDataStoreContext( + // @ts-expect-error compatibility expected to be broken get_old_ClassDeclaration_MockFluidDataStoreContext()); /* diff --git a/packages/test/test-end-to-end-tests/src/test/idCompressor.spec.ts b/packages/test/test-end-to-end-tests/src/test/idCompressor.spec.ts index 9dc49cb268b9..98a0bc2d4aba 100644 --- a/packages/test/test-end-to-end-tests/src/test/idCompressor.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/idCompressor.spec.ts @@ -21,7 +21,7 @@ import type { IChannel } from "@fluidframework/datastore-definitions"; import { IIdCompressor, SessionSpaceCompressedId, StableId } from "@fluidframework/id-compressor"; // eslint-disable-next-line @typescript-eslint/no-restricted-imports import { ISharedMap, SharedDirectory } from "@fluidframework/map/internal"; -import { ISummaryTree } from "@fluidframework/protocol-definitions"; +import { ISummaryTree, type ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; import { DataObjectFactoryType, ITestContainerConfig, @@ -33,6 +33,8 @@ import { summarizeNow, waitForContainerConnection, } from "@fluidframework/test-utils/internal"; +import { delay } from "@fluidframework/core-utils/internal"; +import { generatePairwiseOptions } from "@fluid-private/test-pairwise-generator"; function getIdCompressor(dds: IChannel): IIdCompressor { return (dds as any).runtime.idCompressor as IIdCompressor; @@ -553,38 +555,101 @@ describeCompat("Runtime IdCompressor", "NoCompat", (getTestObjectProvider, apis) assert.strictEqual(sharedMapContainer1.get(sharedMapDecompressedId), "value"); }); - it("Ids generated when disconnected are correctly resubmitted", async () => { - // Disconnect the first container - container1.disconnect(); + const sharedPoints = [0, 1, 2]; + const testConfigs = generatePairwiseOptions({ + preOfflineChanges: sharedPoints, + postOfflineChanges: sharedPoints, + allocateDuringResubmitStride: [1, 2, 3], + delayBetweenOfflineChanges: [true, false], + }); + + for (const testConfig of testConfigs) { + it(`Ids generated across batches are correctly resubmitted: ${JSON.stringify( + testConfig, + )}`, async () => { + const idPairs: [SessionSpaceCompressedId, IIdCompressor][] = []; + + const simulateAllocation = (map: ISharedMap) => { + const idCompressor = getIdCompressor(map); + const id = idCompressor.generateCompressedId(); + idPairs.push([id, idCompressor]); + }; + + for (let i = 0; i < testConfig.preOfflineChanges; i++) { + simulateAllocation(sharedMapContainer1); + sharedMapContainer1.set("key", i); // Trigger Id submission + } + container1.disconnect(); + + for (let i = 0; i < testConfig.postOfflineChanges; i++) { + simulateAllocation(sharedMapContainer1); + sharedMapContainer1.set("key", i); // Trigger Id submission + + if (testConfig.delayBetweenOfflineChanges) { + await delay(100); // Trigger Id submission + } + } + + let invokedCount = 0; + const superResubmit = (sharedMapContainer1 as any).reSubmitCore.bind( + sharedMapContainer1, + ); + (sharedMapContainer1 as any).reSubmitCore = ( + content: unknown, + localOpMetadata: unknown, + ) => { + invokedCount++; + if (invokedCount % testConfig.allocateDuringResubmitStride === 0) { + // Simulate a DDS that generates IDs as part of the resubmit path (e.g. SharedTree) + // This will test that ID allocation ops are correctly sorted into a separate batch in the outbox + simulateAllocation(sharedMapContainer1); + } + superResubmit(content, localOpMetadata); + }; + + // important allocation to test the ordering of generate, takeNext, generate, retakeOutstanding, takeNext. + // correctness here relies on mutation in retaking if we want the last takeNext to return an empty range + // which it must be if we want to avoid overlapping range bugs + simulateAllocation(sharedMapContainer1); + + container1.connect(); + await waitForContainerConnection(container1); + await assureAlignment([sharedMapContainer1, sharedMapContainer2], idPairs); + }); + } + + it("Reentrant ops do not cause resubmission of ID allocation ops", async () => { const idPairs: [SessionSpaceCompressedId, IIdCompressor][] = []; - // Generate a new Id in the disconnected container - const id1 = getIdCompressor(sharedMapContainer1).generateCompressedId(); - idPairs.push([id1, getIdCompressor(sharedMapContainer1)]); - // Trigger Id submission - sharedMapContainer1.set("key", "value"); + const simulateAllocation = (map: ISharedMap) => { + const idCompressor = getIdCompressor(map); + const id = idCompressor.generateCompressedId(); + idPairs.push([id, idCompressor]); + }; + + container1.disconnect(); - const superResubmit = (sharedMapContainer1 as any).reSubmitCore.bind(sharedMapContainer1); - (sharedMapContainer1 as any).reSubmitCore = ( - content: unknown, + sharedMapContainer2.set("key", "first"); + + let invokedCount = 0; + const superProcessCore = (sharedMapContainer1 as any).processCore.bind(sharedMapContainer1); + (sharedMapContainer1 as any).processCore = ( + message: ISequencedDocumentMessage, + local: boolean, localOpMetadata: unknown, ) => { - // Simulate a DDS that generates IDs as part of the resubmit path (e.g. SharedTree) - // This will test that ID allocation ops are correctly sorted into a separate batch in the outbox - const id = getIdCompressor(sharedMapContainer1).generateCompressedId(); - idPairs.push([id, getIdCompressor(sharedMapContainer1)]); - superResubmit(content, localOpMetadata); + if (invokedCount === 0) { + // Force reentrancy during first op processing to cause batch manager rebase (which should skip rebasing allocation ops) + simulateAllocation(sharedMapContainer1); + sharedMapContainer1.set("key", "reentrant1"); + simulateAllocation(sharedMapContainer1); + sharedMapContainer1.set("key", "reentrant2"); + } + superProcessCore(message, local, localOpMetadata); + invokedCount++; }; - // Generate ids in a connected container but don't send them yet - const id2 = getIdCompressor(sharedMapContainer2).generateCompressedId(); - idPairs.push([id2, getIdCompressor(sharedMapContainer2)]); - const id3 = getIdCompressor(sharedMapContainer2).generateCompressedId(); - idPairs.push([id3, getIdCompressor(sharedMapContainer2)]); - - // Reconnect the first container - // IdRange should be resubmitted and reflected in all compressors container1.connect(); await waitForContainerConnection(container1); await assureAlignment([sharedMapContainer1, sharedMapContainer2], idPairs);