Skip to content

Commit

Permalink
fix: prevent execWithCount race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Joel Nuesch committed Oct 24, 2022
1 parent ef9ba0e commit ebd3026
Showing 1 changed file with 43 additions and 25 deletions.
68 changes: 43 additions & 25 deletions src/CachedQuery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,27 @@ class CachedQuery<

private async serializeAndCache(result: T[], cacheKey: string) {
const { redis } = this.context;
const { hash, config: { populate, expiry } } = this;
const { hash, config: { cacheCount, populate, expiry } } = this;
try {
const bson = serialize(result);
const docIds = result.map((doc) => String(doc._id));
const populatedIds = collectPopulatedIds(result, populate);
const allKey = `A:${hash}`;

const multi = redis.multi();
if (cacheCount === Infinity) {
multi.hset(cacheKey, 'N', result.length);
} else {
multi.hdel(cacheKey, 'N');
}
multi
.hset(cacheKey, 'V', bson)
.hset(cacheKey, 'O', docIds.join(' '))
.hset(cacheKey, 'P', populatedIds.join(' '))
.expiregt(cacheKey, expiry);

await Promise.all([
redis.multi()
.del(cacheKey)
.hset(cacheKey, 'V', bson)
.hset(cacheKey, 'O', docIds.join(' '))
.hset(cacheKey, 'P', populatedIds.join(' '))
.expiregt(cacheKey, expiry)
.exec(),
multi.exec(),
redis.pipeline()
.sadd(allKey, cacheKey)
.expiregt(allKey, expiry)
Expand Down Expand Up @@ -156,7 +163,7 @@ class CachedQuery<
* unique queries do not invalidate upon document insert, that event
* would not be detected for invalidation. */
if (result.length > 0 || !unique) {
this.serializeAndCache(result, cacheKey);
await this.serializeAndCache(result, cacheKey);
}
}
if (filter) result = result.filter(filter);
Expand All @@ -174,49 +181,60 @@ class CachedQuery<
const { cacheCount } = this.config;
/* If cacheCount is infinity, we know all the documents matching the query
* are already stored on cache. We already have to grab and splice it,
* so we might as well use the array length instead of another lookup.
*/
* so we might as well use the array length instead of another lookup. */
if (cacheCount === Infinity) {
const { skip, limit } = opts.exec;
// note: if applicable, the filter func is already applied to fullResult.
const fullResult = await this.exec(opts.merged({ skip: 0, limit: undefined }));
const result = skipAndLimit(fullResult, skip, limit);
return [
result,
fullResult.length,
];
return [result, fullResult.length];
}
return Promise.all([

const [result, { count, save: saveCount }] = await Promise.all([
this.exec(opts),
this.count(opts),
this.fetchCount(opts),
]);
if (saveCount) await saveCount();
return [result, count];
}

async count(input: InputExecOpts<T, P>) {
const opts = this.parseOpts(input);
const { redis } = this.context;
const { key: cacheKey, exec: { filter, skipCache = false } } = opts;
const { filter } = opts.exec;
if (filter) {
const fullResult = await this.exec(opts.merged({ skip: 0, limit: undefined }));
return fullResult.length;
}
let count;
const { count, save } = await this.fetchCount(opts);
if (save) await save();
return count;
}

/**
* This method is used to allow more flexible timing of when the count is written to redis.
* When used with `count()`, we save right away.
* When used with `execWithCount()`, we first await the completion of `exec`. */
private async fetchCount(input: InputExecOpts<T, P>) {
const opts = this.parseOpts(input);
const { redis } = this.context;
const { key: cacheKey, exec: { skipCache } } = opts;
if (!skipCache) {
try {
count = await redis.hget(cacheKey, 'count');
const count = await redis.hget(cacheKey, 'N');
if (count) return { count: parseInt(count, 10) };
} catch (err) {
// logger.warn({ err, tag: 'CACHE_REDIS_GET', cacheKey }, 'Failed to HGET value');
}
}
if (!count) {
count = await this.countMongo(opts.fresh({}));
const count = await this.countMongo(opts.fresh({}));
async function save() {
try {
await redis.hset(cacheKey, 'count', count);
await redis.hset(cacheKey, 'N', count);
} catch (err) {
// logger.warn({ err, tag: 'CACHE_REDIS_SET', cacheKey }, 'Failed to set value');
}
}
return typeof count === 'number' ? count : parseInt(count, 10);
return { count, save };
}

private parseOpts(input: InputExecOpts<T, P>) {
Expand Down

0 comments on commit ebd3026

Please sign in to comment.