Skip to content

Commit

Permalink
refactor: drastically simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Apr 11, 2020
1 parent 2d0f957 commit 18e8dd1
Showing 1 changed file with 74 additions and 82 deletions.
156 changes: 74 additions & 82 deletions packages/eventual-send/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,40 +54,59 @@ export function makeHandledPromise(Promise) {
let presenceToPromise;
let promiseToHandler;
let promiseToPresence; // only for HandledPromise.unwrap
let promiseToForwardedPromise; // forwarding, union-find-ish
let forwardedPromiseToPromise; // forwarding, union-find-ish
function ensureMaps() {
if (!presenceToHandler) {
presenceToHandler = new WeakMap();
presenceToPromise = new WeakMap();
promiseToHandler = new WeakMap();
promiseToPresence = new WeakMap();
promiseToForwardedPromise = new WeakMap();
forwardedPromiseToPromise = new WeakMap();
}
}

/**
* You can imagine a forest of trees in which the roots of each tree is an
* unresolved HandledPromise or a non-Promise, and each node's parent is the
* HandledPromise to which it was forwarded. We maintain that mapping of
* forwarded HandledPromise to its resolution in forwardedPromiseToPromise.
*
* We use something like the description of "Find" with "Path splitting"
* to propagate changes down to the children efficiently:
* https://en.wikipedia.org/wiki/Disjoint-set_data_structure
*
* @param {*} target Any value.
* @returns {*} If the target was a HandledPromise, the most-resolved parent of it, otherwise the target.
*/
function shorten(target) {
let p = target;
// target -> resolved1 -> resolved2
while (promiseToForwardedPromise.has(p)) {
p = promiseToForwardedPromise.get(p);
let lastHandler;
// Find the most-resolved value for p.
while (forwardedPromiseToPromise.has(p)) {
p = forwardedPromiseToPromise.get(p);
lastHandler = promiseToHandler.get(p) || lastHandler;
}
const presence = promiseToPresence.get(p);
if (presence) {
// Presences are final, so it is ok to propagate
// this upstream.
while (target !== p) {
const parent = promiseToForwardedPromise.get(target);
promiseToForwardedPromise.delete(target);
const parent = forwardedPromiseToPromise.get(target);
forwardedPromiseToPromise.delete(target);
promiseToPresence.set(target, presence);
target = parent;
}
} else {
// Even if p has an unfulfilledHandler, we don't care.
// We still propagate only p upstream since
// unfulfulled handlers are transient.
// We propagate both p and its unfulfilled handler (if any)
// upstream.
while (target !== p) {
const parent = promiseToForwardedPromise.get(target);
promiseToForwardedPromise.set(target, p);
const parent = forwardedPromiseToPromise.get(target);
forwardedPromiseToPromise.set(target, p);
if (lastHandler) {
promiseToHandler.set(target, lastHandler);
} else {
promiseToHandler.delete(target);
}
target = parent;
}
}
Expand All @@ -107,34 +126,53 @@ export function makeHandledPromise(Promise) {
let handledResolve;
let handledReject;
let resolved = false;
let resolvedTarget = null;
let handledP;
let continueForwarding = () => {};
const superExecutor = (resolve, reject) => {
handledResolve = value => {
if (promiseToForwardedPromise.has(handledP)) {
if (resolved) {
return resolvedTarget;
}
if (forwardedPromiseToPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
value = shorten(value);
resolved = true;
let targetP;
if (promiseToHandler.has(value) || promiseToPresence.has(value)) {
targetP = value;
} else {
targetP = presenceToPromise.get(value);
}
if (targetP && targetP !== handledP) {
promiseToForwardedPromise.set(handledP, targetP);
forwardedPromiseToPromise.set(handledP, targetP);
} else {
promiseToForwardedPromise.delete(handledP);
forwardedPromiseToPromise.delete(handledP);
}

// Synchronously apply handlers to us, too.
shorten(handledP);

// Finish the resolution.
resolve(value);
resolved = true;
resolvedTarget = value;

// We're resolved, so forward any postponed operations to us.
continueForwarding();
return resolvedTarget;
};
handledReject = err => {
if (promiseToForwardedPromise.has(handledP)) {
if (resolved) {
return;
}
if (forwardedPromiseToPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
promiseToHandler.delete(handledP);
resolved = true;
reject(err);
continueForwarding();
};
};
handledP = harden(Reflect.construct(Promise, [superExecutor], new.target));
Expand All @@ -145,16 +183,8 @@ export function makeHandledPromise(Promise) {
// Create a simple postponedHandler that just postpones until the
// fulfilledHandler is set.
let donePostponing;
const interlockP = new Promise((resolve, reject) => {
donePostponing = (err = null, targetP = undefined) => {
if (err !== null) {
reject(err);
return;
}
// Box the target promise so that it isn't further resolved.
resolve([targetP]);
// Return undefined.
};
const interlockP = new Promise(resolve => {
donePostponing = () => resolve();
});
// A failed interlock should not be recorded as an unhandled rejection.
// It will bubble up to the HandledPromise itself.
Expand All @@ -166,12 +196,9 @@ export function makeHandledPromise(Promise) {
// console.log(`forwarding ${postponedOperation} ${args[0]}`);
return new HandledPromise((resolve, reject) => {
interlockP
.then(([targetP]) => {
.then(_ => {
// If targetP is a handled promise, use it, otherwise x.
const nextPromise = targetP || x;
resolve(
HandledPromise[postponedOperation](nextPromise, ...args),
);
resolve(HandledPromise[postponedOperation](x, ...args));
})
.catch(reject);
});
Expand All @@ -185,7 +212,6 @@ export function makeHandledPromise(Promise) {
return [postponedHandler, donePostponing];
};

let continueForwarding = () => {};
if (!unfulfilledHandler) {
// This is insufficient for actual remote handled Promises
// (too many round-trips), but is an easy way to create a
Expand All @@ -207,54 +233,50 @@ export function makeHandledPromise(Promise) {
if (resolved) {
return;
}
if (promiseToForwardedPromise.has(handledP)) {
if (forwardedPromiseToPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
handledReject(reason);
continueForwarding(reason);
};

let resolvedPresence = null;
const resolveWithPresence = presenceHandler => {
if (resolved) {
return resolvedPresence;
return resolvedTarget;
}
if (promiseToForwardedPromise.has(handledP)) {
if (forwardedPromiseToPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
try {
// Sanity checks.
validateHandler(presenceHandler);

// Validate and install our mapped target (i.e. presence).
resolvedPresence = Object.create(null);
resolvedTarget = Object.create(null);

// Create table entries for the presence mapped to the
// fulfilledHandler.
presenceToPromise.set(resolvedPresence, handledP);
promiseToPresence.set(handledP, resolvedPresence);
presenceToHandler.set(resolvedPresence, presenceHandler);
presenceToPromise.set(resolvedTarget, handledP);
promiseToPresence.set(handledP, resolvedTarget);
presenceToHandler.set(resolvedTarget, presenceHandler);

// Remove the mapping, as our presenceHandler should be
// used instead.
promiseToHandler.delete(handledP);

// We committed to this presence, so resolve.
handledResolve(resolvedPresence);
continueForwarding();
return resolvedPresence;
handledResolve(resolvedTarget);
return resolvedTarget;
} catch (e) {
handledReject(e);
continueForwarding();
throw e;
}
};

const resolveHandled = async (target, deprecatedPresenceHandler) => {
if (resolved) {
return undefined;
return;
}
if (promiseToForwardedPromise.has(handledP)) {
if (forwardedPromiseToPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
try {
Expand All @@ -264,47 +286,17 @@ export function makeHandledPromise(Promise) {
);
}

target = shorten(target);

// Resolve with the target when it's ready.
// Resolve the target.
handledResolve(target);

const existingUnfulfilledHandler = promiseToHandler.get(target);
if (existingUnfulfilledHandler) {
// Reuse the unfulfilled handler.
promiseToHandler.set(handledP, existingUnfulfilledHandler);
return continueForwarding(null, target);
}

const receivePresence = presence => {
const existingPresenceHandler = presenceToHandler.get(presence);
if (!existingPresenceHandler) {
return false;
}
promiseToHandler.set(handledP, existingPresenceHandler);
promiseToPresence.set(handledP, presence);
continueForwarding(null, handledP);
return true;
};

// See if the target is a presence we already know of.
if (receivePresence(promiseToPresence.get(target))) {
return undefined;
}

// Wait for the target before continuing.
const presence = await target;
if (receivePresence(presence)) {
return undefined;
}
// Wait for our fulfillment.
await handledP;

// Remove the mapping, as we don't need a handler anymore.
// Now that we've fulfilled, we don't need an unfulfilledHandler.
promiseToHandler.delete(handledP);
return continueForwarding();
} catch (e) {
handledReject(e);
}
return continueForwarding();
};

// Invoke the callback to let the user resolve/reject.
Expand Down

0 comments on commit 18e8dd1

Please sign in to comment.