Skip to content

Commit

Permalink
fix(id-compressor): Prevent resubmission of ID allocation ops (#21043) (#21074)
Browse files Browse the repository at this point in the history

Cherry-pick of #21043

## Description

This PR makes the runtime correctly resubmit ID allocation ops.
Specifically, prior to initiating the replay of pending ops, the runtime
submits an allocation range that includes all pending IDs and then
ignores the individual allocation ops during resubmit. This is necessary
because the resubmission of non-allocation ops can itself cause new
allocations, which would then be resubmitted before earlier allocation ops
and cause the "Ranges finalized out of order." exception.

This PR also differentiates the exceptions thrown by the compressor for
this failure case by attaching the expected and actual range starts to
each error.

Co-authored-by: Taylor Williams <60717813+taylorsw04@users.noreply.github.com>
  • Loading branch information
yann-achard-MS and taylorsw04 authored May 14, 2024
1 parent c6492fd commit d495c13
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 68 deletions.
15 changes: 12 additions & 3 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,7 @@ export class ContainerRuntime
let newState: boolean;

try {
this.submitIdAllocationOpIfNeeded(true);
// replay the ops
this.pendingStateManager.replayPendingStates();
} finally {
Expand Down Expand Up @@ -3918,9 +3919,11 @@ export class ContainerRuntime
return this.blobManager.createBlob(blob, signal);
}

private submitIdAllocationOpIfNeeded(): void {
private submitIdAllocationOpIfNeeded(resubmitOutstandingRanges = false): void {
if (this._idCompressor) {
const idRange = this._idCompressor.takeNextCreationRange();
const idRange = resubmitOutstandingRanges
? this.idCompressor?.takeUnfinalizedCreationRange()
: this._idCompressor.takeNextCreationRange();
// Don't include the idRange if there weren't any Ids allocated
if (idRange?.ids !== undefined) {
const idAllocationMessage: ContainerRuntimeIdAllocationMessage = {
Expand Down Expand Up @@ -4136,7 +4139,13 @@ export class ContainerRuntime
this.channelCollection.reSubmit(message.type, message.contents, localOpMetadata);
break;
case ContainerMessageType.IdAllocation: {
this.submit(message, localOpMetadata);
// Allocation ops are never resubmitted/rebased. This is because they require special handling to
// avoid being submitted out of order. For example, if the pending state manager contained
// [idOp1, dataOp1, idOp2, dataOp2] and the resubmission of dataOp1 generated idOp3, that would be
// placed into the outbox in the same batch as idOp1, but before idOp2 is resubmitted.
// To avoid this, allocation ops are simply never resubmitted. Prior to invoking the pending state
// manager to replay pending ops, the runtime will always submit a new allocation range that includes
// all pending IDs. The resubmitted allocation ops are then ignored here.
break;
}
case ContainerMessageType.BlobAttach:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import { BatchMessage, IBatch, IBatchCheckpoint } from "./definitions.js";
/**
* Options supplied when constructing a BatchManager.
*/
export interface IBatchManagerOptions {
/**
* NOTE(review): presumably the maximum size of a batch in bytes — the outbox passes either
* `Infinity` (when compression is enabled) or `maxBatchSizeInBytes`. Confirm against BatchManager.
*/
readonly hardLimit: number;
/**
* Optional compression runtime options.
* NOTE(review): how BatchManager consumes these is not visible here — confirm.
*/
readonly compressionOptions?: ICompressionRuntimeOptions;

/**
* If true, the outbox is allowed to rebase the batch during flushing.
* When false, the outbox does not rebase the batch on flush and asserts if `rebase` is invoked for it.
*/
readonly canRebase: boolean;
}

export interface BatchSequenceNumbers {
Expand Down
9 changes: 5 additions & 4 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ export class Outbox {
// We need to allow infinite size batches if we enable compression
const hardLimit = isCompressionEnabled ? Infinity : this.params.config.maxBatchSizeInBytes;

this.mainBatch = new BatchManager({ hardLimit });
this.blobAttachBatch = new BatchManager({ hardLimit });
this.idAllocationBatch = new BatchManager({ hardLimit });
this.mainBatch = new BatchManager({ hardLimit, canRebase: true });
this.blobAttachBatch = new BatchManager({ hardLimit, canRebase: true });
this.idAllocationBatch = new BatchManager({ hardLimit, canRebase: false });
}

public get messageCount(): number {
Expand Down Expand Up @@ -270,7 +270,7 @@ export class Outbox {
const rawBatch = batchManager.popBatch();
const shouldGroup =
!disableGroupedBatching && this.params.groupingManager.shouldGroup(rawBatch);
if (rawBatch.hasReentrantOps === true && shouldGroup) {
if (batchManager.options.canRebase && rawBatch.hasReentrantOps === true && shouldGroup) {
assert(!this.rebasing, 0x6fa /* A rebased batch should never have reentrant ops */);
// If a batch contains reentrant ops (ops created as a result from processing another op)
// it needs to be rebased so that we can ensure consistent reference sequence numbers
Expand Down Expand Up @@ -300,6 +300,7 @@ export class Outbox {
*/
private rebase(rawBatch: IBatch, batchManager: BatchManager) {
assert(!this.rebasing, 0x6fb /* Reentrancy */);
assert(batchManager.options.canRebase, "BatchManager does not support rebase");

this.rebasing = true;
for (const message of rawBatch.content) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe("BatchManager", () => {

it("BatchManager: 'infinity' hard limit allows everything", () => {
const message = { contents: generateStringOfSize(1024) } as any as BatchMessage;
const batchManager = new BatchManager({ hardLimit: Infinity });
const batchManager = new BatchManager({ hardLimit: Infinity, canRebase: true });

for (let i = 1; i <= 10; i++) {
assert.equal(batchManager.push(message, /* reentrant */ false), true);
Expand All @@ -32,7 +32,7 @@ describe("BatchManager", () => {
});

it("Batch metadata is set correctly", () => {
const batchManager = new BatchManager({ hardLimit });
const batchManager = new BatchManager({ hardLimit, canRebase: true });
assert.equal(
batchManager.push(
{ ...smallMessage(), referenceSequenceNumber: 0 },
Expand Down Expand Up @@ -72,7 +72,7 @@ describe("BatchManager", () => {
});

it("Batch content size is tracked correctly", () => {
const batchManager = new BatchManager({ hardLimit });
const batchManager = new BatchManager({ hardLimit, canRebase: true });
assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true);
assert.equal(batchManager.contentSizeInBytes, smallMessageSize * batchManager.length);
assert.equal(batchManager.push(smallMessage(), /* reentrant */ false), true);
Expand All @@ -82,7 +82,7 @@ describe("BatchManager", () => {
});

it("Batch reference sequence number maps to the last message", () => {
const batchManager = new BatchManager({ hardLimit });
const batchManager = new BatchManager({ hardLimit, canRebase: true });
assert.equal(
batchManager.push(
{ ...smallMessage(), referenceSequenceNumber: 0 },
Expand All @@ -109,7 +109,7 @@ describe("BatchManager", () => {
});

it("Batch size estimates", () => {
const batchManager = new BatchManager({ hardLimit });
const batchManager = new BatchManager({ hardLimit, canRebase: true });
batchManager.push(smallMessage(), /* reentrant */ false);
// 10 bytes of content + 200 bytes overhead
assert.equal(estimateSocketSize(batchManager.popBatch()), 210);
Expand Down Expand Up @@ -137,7 +137,7 @@ describe("BatchManager", () => {
});

it("Batch op reentry state preserved during its lifetime", () => {
const batchManager = new BatchManager({ hardLimit });
const batchManager = new BatchManager({ hardLimit, canRebase: true });
assert.equal(
batchManager.push(
{ ...smallMessage(), referenceSequenceNumber: 0 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe("Pending State Manager", () => {
rollbackContent = [];
rollbackShouldThrow = false;

batchManager = new BatchManager({ hardLimit: 950 * 1024 });
batchManager = new BatchManager({ hardLimit: 950 * 1024, canRebase: true });
});

it("should do nothing when rolling back empty pending stack", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface IIdCompressorCore {
serialize(withSession: true): SerializedIdCompressorWithOngoingSession;
serialize(withSession: false): SerializedIdCompressorWithNoSession;
takeNextCreationRange(): IdCreationRange;
takeUnfinalizedCreationRange(): IdCreationRange;
}

// @internal
Expand Down
64 changes: 57 additions & 7 deletions packages/runtime/id-compressor/src/idCompressor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import { bufferToString, stringToBuffer } from "@fluid-internal/client-utils";
import { ITelemetryBaseLogger } from "@fluidframework/core-interfaces";
import { assert } from "@fluidframework/core-utils/internal";
import { ITelemetryLoggerExt, createChildLogger } from "@fluidframework/telemetry-utils/internal";
import {
ITelemetryLoggerExt,
LoggingError,
createChildLogger,
} from "@fluidframework/telemetry-utils/internal";

import { FinalSpace } from "./finalSpace.js";
import { FinalCompressedId, LocalCompressedId, NumericUuid, isFinalId } from "./identifiers.js";
Expand Down Expand Up @@ -57,6 +61,13 @@ import {
*/
const currentWrittenVersion = 2.0;

/**
 * Builds the error raised when a creation range is finalized out of order.
 *
 * @param expectedStart - The range start the compressor expected to be finalized next.
 * @param actualStart - The start of the range that was actually supplied.
 * @returns A `LoggingError` with the message "Ranges finalized out of order" that carries
 * both values as properties.
 */
function rangeFinalizationError(expectedStart: number, actualStart: number): LoggingError {
	const rangeStarts = { expectedStart, actualStart };
	return new LoggingError("Ranges finalized out of order", rangeStarts);
}

/**
* See {@link IIdCompressor} and {@link IIdCompressorCore}
*/
Expand Down Expand Up @@ -222,14 +233,49 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {
),
},
};
return this.updateToRange(range);
}

/**
 * Retakes, as a single creation range, every ID generated by the local session after the end
 * of its last local cluster (or every ID generated so far when there is no local cluster yet).
 *
 * @returns A range containing only `sessionId` when there is nothing outstanding; otherwise a
 * range whose `ids` span from the first outstanding generation count through the current one.
 * A non-empty range is passed through `updateToRange` before being returned.
 */
public takeUnfinalizedCreationRange(): IdCreationRange {
	const newestLocalCluster = this.localSession.getLastCluster();

	// Without a local cluster the range starts at generation count 1; otherwise it starts at
	// the generation count just past the IDs already covered by the last local cluster.
	const firstGenCount =
		newestLocalCluster === undefined
			? 1
			: genCountFromLocalId(
					(newestLocalCluster.baseLocalId - newestLocalCluster.count) as LocalCompressedId,
				);
	const count =
		newestLocalCluster === undefined
			? this.localGenCount
			: this.localGenCount - firstGenCount + 1;

	if (count === 0) {
		// Nothing is outstanding: hand back a range that carries no `ids`.
		return { sessionId: this.localSessionId };
	}

	return this.updateToRange({
		ids: {
			count,
			firstGenCount,
			localIdRanges: this.normalizer.getRangesBetween(firstGenCount, this.localGenCount),
			requestedClusterSize: this.nextRequestedClusterSize,
		},
		sessionId: this.localSessionId,
	});
}

private updateToRange(range: IdCreationRange): IdCreationRange {
this.nextRangeBaseGenCount = this.localGenCount + 1;
IdCompressor.assertValidRange(range);
return range;
return IdCompressor.assertValidRange(range);
}

private static assertValidRange(range: IdCreationRange): void {
private static assertValidRange(range: IdCreationRange): IdCreationRange {
if (range.ids === undefined) {
return;
return range;
}
const { count, requestedClusterSize } = range.ids;
assert(count > 0, 0x755 /* Malformed ID Range. */);
Expand All @@ -238,6 +284,7 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {
requestedClusterSize <= IdCompressor.maxClusterSize,
0x877 /* Clusters must not exceed max cluster size. */,
);
return range;
}

public finalizeCreationRange(range: IdCreationRange): void {
Expand All @@ -260,7 +307,7 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {
if (lastCluster === undefined) {
// This is the first cluster in the session space
if (rangeBaseLocal !== -1) {
throw new Error("Ranges finalized out of order.");
throw rangeFinalizationError(-1, rangeBaseLocal);
}
lastCluster = this.addEmptyCluster(session, requestedClusterSize + count);
if (isLocal) {
Expand All @@ -273,7 +320,10 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {

const remainingCapacity = lastCluster.capacity - lastCluster.count;
if (lastCluster.baseLocalId - lastCluster.count !== rangeBaseLocal) {
throw new Error("Ranges finalized out of order.");
throw rangeFinalizationError(
lastCluster.baseLocalId - lastCluster.count,
rangeBaseLocal,
);
}

if (remainingCapacity >= count) {
Expand Down
100 changes: 98 additions & 2 deletions packages/runtime/id-compressor/src/test/idCompressor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,96 @@ describe("IdCompressor", () => {
[10, 3],
]);
});

// Exercises takeUnfinalizedCreationRange(): retaking, as one range, all IDs that have been
// generated but whose ranges have not been finalized.
describe("by retaking all outstanding ranges", () => {
// With nothing generated, or with everything already finalized, the retaken range has no `ids`.
it("when there are no outstanding ranges", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
// Fresh compressor: nothing has been generated, so nothing is outstanding.
let retakenRangeEmpty = compressor.takeUnfinalizedCreationRange();
assert.equal(retakenRangeEmpty.ids, undefined);
// Finalizing the empty range is expected to succeed (no throw).
compressor.finalizeCreationRange(retakenRangeEmpty);
// Generate one ID and immediately take + finalize its range, leaving nothing outstanding again.
generateCompressedIds(compressor, 1);
compressor.finalizeCreationRange(compressor.takeNextCreationRange());
retakenRangeEmpty = compressor.takeUnfinalizedCreationRange();
assert.equal(retakenRangeEmpty.ids, undefined);
});

// A range that was taken but never finalized is returned again by a retake, and the retake
// grows to include IDs generated afterwards.
it("when there is one outstanding ranges with local IDs only", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);

// Take a range for one ID but deliberately do not finalize it.
generateCompressedIds(compressor, 1);
compressor.takeNextCreationRange();

let retakenRangeLocalOnly = compressor.takeUnfinalizedCreationRange();
assert.deepEqual(retakenRangeLocalOnly.ids, {
firstGenCount: 1,
count: 1,
localIdRanges: [[1, 1]],
requestedClusterSize: 2,
});

// A second, not-yet-taken ID is also covered by the next retake.
generateCompressedIds(compressor, 1);
retakenRangeLocalOnly = compressor.takeUnfinalizedCreationRange();
assert.deepEqual(retakenRangeLocalOnly.ids, {
firstGenCount: 1,
count: 2,
localIdRanges: [[1, 2]],
requestedClusterSize: 2,
});

let postRetakeRange = compressor.takeNextCreationRange();
// IDs should be undefined because retaking must still advance the taken-ID counter;
// if it didn't, ranges would be resubmitted, causing out-of-order errors.
assert.equal(postRetakeRange.ids, undefined);
// IDs generated after the retake are handed out by the next regular take, starting at gen count 3.
generateCompressedIds(compressor, 1);
postRetakeRange = compressor.takeNextCreationRange();
assert.deepEqual(postRetakeRange.ids, {
firstGenCount: 3,
count: 1,
localIdRanges: [[3, 1]],
requestedClusterSize: 2,
});

// The retaken range (gen counts 1-2) is expected to finalize without throwing.
compressor.finalizeCreationRange(retakenRangeLocalOnly);
});

// Several taken-but-unfinalized ranges collapse into one retaken range; once that is
// finalized, finalizing any of the original ranges must fail as out of order.
it("when there are multiple outstanding ranges", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 1);
const range1 = compressor.takeNextCreationRange();
generateCompressedIds(compressor, 1); // one local
// range1 is finalized, so only ranges 2-4 below remain outstanding.
compressor.finalizeCreationRange(range1);
const range2 = compressor.takeNextCreationRange();
assert.deepEqual(range2.ids?.localIdRanges, [[2, 1]]);
generateCompressedIds(compressor, 1); // one eager final
const range3 = compressor.takeNextCreationRange();
assert.deepEqual(range3.ids?.localIdRanges, []);
generateCompressedIds(compressor, 1); // one local
const range4 = compressor.takeNextCreationRange();
assert.deepEqual(range4.ids?.localIdRanges, [[4, 1]]);

// The retake spans gen counts 2-4 (ranges 2, 3 and 4) and reports only the local IDs among them.
const retakenRange = compressor.takeUnfinalizedCreationRange();
assert.deepEqual(retakenRange.ids?.firstGenCount, 2);
assert.deepEqual(retakenRange.ids?.count, 3);
assert.deepEqual(retakenRange.ids?.localIdRanges, [
[2, 1],
[4, 1],
]);

// After the retaken range is finalized, each original range is rejected.
compressor.finalizeCreationRange(retakenRange);
assert.throws(
() => compressor.finalizeCreationRange(range2),
(e: Error) => e.message === "Ranges finalized out of order",
);
assert.throws(
() => compressor.finalizeCreationRange(range3),
(e: Error) => e.message === "Ranges finalized out of order",
);
assert.throws(
() => compressor.finalizeCreationRange(range4),
(e: Error) => e.message === "Ranges finalized out of order",
);
});
});
});

describe("Finalizing", () => {
Expand All @@ -379,7 +469,10 @@ describe("IdCompressor", () => {
compressor.finalizeCreationRange(batchRange);
assert.throws(
() => compressor.finalizeCreationRange(batchRange),
(e: Error) => e.message === "Ranges finalized out of order.",
(e: Error) =>
e.message === "Ranges finalized out of order" &&
(e as any).expectedStart === -4 &&
(e as any).actualStart === -1,
);
});

Expand All @@ -391,7 +484,10 @@ describe("IdCompressor", () => {
const secondRange = compressor.takeNextCreationRange();
assert.throws(
() => compressor.finalizeCreationRange(secondRange),
(e: Error) => e.message === "Ranges finalized out of order.",
(e: Error) =>
e.message === "Ranges finalized out of order" &&
(e as any).expectedStart === -1 &&
(e as any).actualStart === -2,
);
});

Expand Down
Loading

0 comments on commit d495c13

Please sign in to comment.