Add sync writes to multiMerge() in WebStorage. Fix caching when multiMerge() runs. #122

Merged Mar 18, 2022 (9 commits)

Changes from all commits
3 changes: 0 additions & 3 deletions __mocks__/lib/storage/index.js

This file was deleted.

34 changes: 32 additions & 2 deletions __mocks__/localforage.js
@@ -1,7 +1,37 @@
 import _ from 'underscore';

+let storageMap = {};
+
 const localforageMock = {
+    get storageMap() {
+        return storageMap;
+    },
+    keys() {
+        return new Promise((resolve) => {
+            resolve(_.keys(storageMap));
+        });
+    },
+    setInitialMockData: (data) => {
+        storageMap = data;
+    },
     config: jest.fn(),
-    getItem: jest.fn(),
-    setItem: jest.fn(() => Promise.resolve()),
+    getItem: jest.fn(key => new Promise((resolve) => {
+        resolve(storageMap[key]);
+    })),
+    setItem: jest.fn((key, value) => new Promise((resolve) => {
+        storageMap[key] = value;
+        resolve();
+    })),
+    removeItem: jest.fn(key => new Promise((resolve) => {
+        delete storageMap[key];
+        resolve();
+    })),
+    clear() {
+        return new Promise((resolve) => {
+            storageMap = {};
+            resolve();
+        });
+    },
 };

 export default localforageMock;
13 changes: 7 additions & 6 deletions lib/Onyx.js
@@ -768,11 +768,12 @@ function mergeCollection(collectionKey, collection) {
         promises.push(Storage.multiSet(keyValuePairsForNewCollection));
     }

-    // Merge original data to cache
-    cache.merge(collection);
-
-    // Optimistically inform collection subscribers
-    keysChanged(collectionKey, collection);
+    // Prefill cache if necessary by calling get() on any existing keys and then merge original data to cache
+    // and update all subscribers
+    Promise.all(_.map(existingKeys, get)).then(() => {
+        cache.merge(collection);
+        keysChanged(collectionKey, collection);
+    });
Comment on lines +771 to +776
Contributor:

This makes sense, but:

  1. it suffers from the same problem we discovered - if get() has to read from disk, the data might not yet be written to disk (when a write is still pending), and could still result in partial data being added to the cache
  2. delaying the cache update opens the door to problems - other get/connect calls might use stale cache information
  3. it might delay the UI update a bit, since now keysChanged would be delayed if get() actually has to read something

Point 2) is probably covered by keysChanged being delayed as well - anyone that reads stale information would be notified of an update, but we'd still need to explain that and warn everyone to keep usages together in that order.

The goal with the cache is to update it synchronously, as fast as possible, to avoid concurrency issues.
This solves races like the following (a minimal sketch follows the list):

  1. set starts and sets key: alpha
  2. it immediately updates cache before write to disk is complete
  3. get starts for key: alpha
  4. it receives the latest value even before it's written to disk
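
A minimal sketch of that ordering (the set/get helpers and storage stubs are hypothetical, not the actual Onyx internals):

    // Hypothetical async storage stubs standing in for localforage
    const disk = new Map();
    const writeToDisk = (key, value) => new Promise(resolve => setTimeout(() => {
        disk.set(key, value);
        resolve();
    }, 10));
    const readFromDisk = key => Promise.resolve(disk.get(key));

    const cache = new Map();

    function set(key, value) {
        cache.set(key, value); // cache is updated synchronously, before any disk I/O
        return writeToDisk(key, value); // the slow write happens afterwards
    }

    function get(key) {
        if (cache.has(key)) {
            // A get() issued mid-write still sees the latest value
            return Promise.resolve(cache.get(key));
        }
        return readFromDisk(key).then((value) => {
            cache.set(key, value);
            return value;
        });
    }

    set('alpha', 1); // steps 1) and 2): cache holds the value immediately
    get('alpha').then(console.log); // steps 3) and 4): logs 1 before the disk write settles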

Perhaps you'd like this alternative:

  1. Keep code here unchanged
  2. Update code in cache.merge to exclude merging any keys that are not currently present in cache

The idea is:
If we lack data in cache, we skip merging potentially partial data
When someone actually needs to get this key, it would trigger a read from disk and re-populate cache with correct data

We don't have many usages of cache.merge but this could address the partial data problem for all such usages

Contributor Author (@marcaaron, Mar 10, 2022):

> if get() has to read from disk, the data might not yet be written to disk (when a write is still pending), and could still result in partial data being added to the cache

If you can, please explain how exactly it can result in partial data being added to the cache? I would love to write a test for it and we can come up with a solution if it's possible, but I want to make sure we understand how it can happen.

> delaying the cache update opens the door to problems - other get/connect calls might use stale cache information

One alternative I can think of is to prefill the cache with all data before Onyx does any work at all (I'm guessing most front-end caches work this way).

> it might delay the UI update a bit, since now keysChanged would be delayed if get() actually has to read something

Yes, but it's better for something to be delayed than to send partial data that is missing the data previously in storage.

> The goal with the cache is to update it synchronously, as fast as possible, to avoid concurrency issues

I think I understand this argument, but I'm not sure that's how merge() works today. Here's the implementation of merge(), which also potentially blocks on a get() call:

return get(key)
    .then((data) => {
        try {
            const modifiedData = applyMerge(key, data);

            // Clean up the write queue so we
            // don't apply these changes again
            delete mergeQueue[key];
            return set(key, modifiedData);

set() is the only thing that modifies the data in the cache before reading existing values:

cache.set(key, value);
// Optimistically inform subscribers on the next tick
Promise.resolve().then(() => keyChanged(key, value));

> Perhaps you'd like this alternative:
> 1. Keep code here unchanged
> 2. Update code in cache.merge to exclude merging any keys that are not currently present in cache

I do like this idea and it solves the problem in a different way, but we would defer the filling of the cache. I'm not sure whether that's bad or good or makes no difference, and I'd be willing to try it if it sounds simpler. Thanks!

Contributor Author (@marcaaron, Mar 10, 2022):

OK, so I pulled on that last thread and I can't get it working. It leads to a stale cache that is missing the data. Here are the changes I made:

  • Reverted the change above.
  • Added this so we would not merge data if the key was not already in the cache:
@@ -107,11 +110,19 @@ class OnyxCache {
     }

     /**
-     * Deep merge data to cache, any non existing keys will be created
+     * Deep merge data to cache, any non existing keys will not be merged
      * @param {Record<string, *>} data - a map of (cache) key - values
      */
     merge(data) {
-        this.storageMap = lodashMerge({}, this.storageMap, data);
+        // Filter any keys we have not yet cached
+        const newData = _.reduce(data, (prev, curr, key) => {
+            if (this.hasCacheForKey(key)) {
+                return {...prev, [key]: curr};
+            }
+            return prev;
+        }, {});
+
+        this.storageMap = lodashMerge({}, this.storageMap, newData);

         const storageKeys = this.getAllKeys();
  • Re-ran tests and added a subscriber (which should trigger the cache fill)
  • Storage ends up correct, but cache is wrong

I think it's because a subscriber can be added at any time while we are adding new data. We already skip the cache for any consecutive calls to mergeCollection(), and yes, they are making it to storage correctly. But if you add a subscriber while this process is happening, it will only get the result of what has been written so far and save that to the cache.

Contributor:

> I think it's because a subscriber can be added at any time while we are adding new data. We already skip the cache for any consecutive calls to mergeCollection(), and yes, they are making it to storage correctly. But if you add a subscriber while this process is happening, it will only get the result of what has been written so far and save that to the cache.

Yeah, this case slipped my mind

> If you can, please explain how exactly it can result in partial data being added to the cache? I would love to write a test for it and we can come up with a solution if it's possible, but I want to make sure we understand how it can happen.

I think you answered that yourself at the end, or at least the reason is similar:

  1. We have a mergeCollection call and then another mergeCollection call (or several) in close proximity
  2. Writes are scheduled and happen one by one
  3. Promise.all(_.map(existingKeys, get)) is invoked before the scheduled writes complete
  4. Some of the existingKeys are not available in cache
  5. A get from disk is triggered - it reads older data and puts it in cache

To capture that in a test we're looking for a way to:

  1. Have storage in a state where it has { a: 1, b: 1, c: 1 } for a given key
  2. Have a queue of writes in flight, with one particular write at the end of the queue changing b: 2
  3. Have cache in a state where it lacks the given key
  4. Call mergeCollection to merge multiple updates, one of them setting c: 2 on that key

I think that what we have here with Promise.all would result in the cache ending up with { a: 1, b: 1, c: 2 } instead of { a: 1, b: 2, c: 2 }.
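
A rough sketch of such a test, reusing the mock and helpers from this PR (simulating the slow queued write in step 2 would need something like a delay in the mock's setItem - that delay helper is an assumption):

    localforageMock.setInitialMockData({test_1: {a: 1, b: 1, c: 1}});

    // A write changing b: 2 is queued but has not reached "disk" yet
    Onyx.merge('test_1', {b: 2});

    // mergeCollection prefills the cache via get(), which may read the stale {a: 1, b: 1, c: 1}
    Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {test_1: {c: 2}});

    return waitForPromisesToResolve()
        .then(() => {
            // Fails if the stale read won: the cache would hold {a: 1, b: 1, c: 2}
            expect(OnyxCache.getValue('test_1')).toEqual({a: 1, b: 2, c: 2});
        });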

Contributor Author:

Hey @kidroca, I couldn't figure this one out, but I got your other idea working. Let me know if you have any other thoughts here.

Contributor (@kidroca, Mar 10, 2022):

I think prefilling the cache might be our safest and most optimal option here.

It seems that for Native:

  1. Calling AsyncStorage.get
  2. Then calling AsyncStorage.set before the get is completed
  3. The get resolves with the value from step 2

I'm not certain this would work out the same way on Web/Desktop.
It used to work that way when we used window.localStorage, because localStorage calls are sync and happen sequentially.

So we're back to: "Oh, Cache, save us from concurrency issues" 😄
To do that, it might be best to leverage multiGet and load everything except safe eviction keys into the cache during init. Then we don't have to delay updates to the cache, and the cache should never become stale.
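
A rough sketch of that init-time prefill (getSafeEvictionKeys is a hypothetical helper; getAllKeys/multiGet are assumed to match the storage provider API):

    function prefillCache() {
        return Storage.getAllKeys()
            .then(keys => _.difference(keys, getSafeEvictionKeys()))
            .then(keysToCache => Storage.multiGet(keysToCache))
            .then(pairs => _.each(pairs, ([key, value]) => cache.set(key, value)));
    }

    // Called once during Onyx.init(), before any subscribers connect
    // prefillCache().then(...);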


Another strategy could be to recreate the above 3 steps ourselves:

  • capture pending writes (we already have a queue)
  • when a read happens, check whether we are writing or about to write the requested key
  • if we are - resolve the get with the value from the write queue; otherwise read from disk (sketched below)
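
A hedged sketch of that read-through (pendingWrites is a hypothetical addition; setItemQueue.push and localforage.getItem are from the provider changes below):

    const pendingWrites = new Map(); // key -> latest value waiting in the queue

    function setItem(key, value) {
        pendingWrites.set(key, value);
        return setItemQueue.push({key, value})
            .finally(() => {
                // Only clear if no newer write superseded this one
                if (pendingWrites.get(key) === value) {
                    pendingWrites.delete(key);
                }
            });
    }

    function getItem(key) {
        if (pendingWrites.has(key)) {
            // We're writing (or about to write) this key - resolve from the queue
            return Promise.resolve(pendingWrites.get(key));
        }
        return localforage.getItem(key);
    }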


return Promise.all(promises)
    .catch(error => evictStorageAndRetry(error, mergeCollection, collection));
@@ -808,7 +809,7 @@ function init({
     keys = {},
     initialKeyStates = {},
     safeEvictionKeys = [],
-    maxCachedKeysCount = 55,
+    maxCachedKeysCount = 1000,
     captureMetrics = false,
     shouldSyncMultipleInstances = Boolean(global.localStorage),
 } = {}) {
51 changes: 51 additions & 0 deletions lib/SyncQueue.js
@@ -0,0 +1,51 @@
/**
 * Synchronous queue that can be used to ensure promise based tasks are run in sequence.
 * Pass to the constructor a function that returns a promise to run the task then add data.
 *
 * @example
 *
 * const queue = new SyncQueue(({key, val}) => {
 *     return someAsyncProcess(key, val);
 * });
 *
 * queue.push({key: 1, val: '1'});
 * queue.push({key: 2, val: '2'});
 */
export default class SyncQueue {
    /**
     * @param {Function} run - must return a promise
     */
    constructor(run) {
        this.queue = [];
        this.isProcessing = false;
        this.run = run;
    }

    process() {
        if (this.isProcessing || this.queue.length === 0) {
            return;
        }

        this.isProcessing = true;

        const {data, resolve, reject} = this.queue.shift();
        this.run(data)
            .then(resolve)
            .catch(reject)
Contributor:

Should we log or do anything with failed requests? Or I guess we will have server logs.

Contributor Author:

We do have some handling here (but it could be better). In this case, we're just letting the original caller handle the error and don't need more specific handling (at least not for now, since the error will reach this code here):

.catch(error => evictStorageAndRetry(error, mergeCollection, collection));
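
That is, a rejected task rejects the promise returned by push(), so the failure surfaces to the caller. A tiny sketch (the failing task is hypothetical):

    const queue = new SyncQueue(() => Promise.reject(new Error('disk full')));
    queue.push({key: 'a'})
        .catch(error => console.error('caller handles:', error.message));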

            .finally(() => {
                this.isProcessing = false;
                this.process();
            });
    }

    /**
     * @param {*} data
     * @returns {Promise}
     */
    push(data) {
        return new Promise((resolve, reject) => {
            this.queue.push({resolve, reject, data});
            this.process();
        });
    }
}
3 changes: 3 additions & 0 deletions lib/storage/__mocks__/index.js
@@ -0,0 +1,3 @@
import WebStorage from '../WebStorage';

export default WebStorage;
67 changes: 24 additions & 43 deletions lib/storage/providers/LocalForage.js
@@ -7,39 +7,32 @@
 import localforage from 'localforage';
 import _ from 'underscore';
 import lodashMerge from 'lodash/merge';
+import SyncQueue from '../../SyncQueue';

 localforage.config({
     name: 'OnyxDB'
 });

-const writeQueue = [];
-let isQueueProcessing = false;
-
-/**
- * Writing very quickly to IndexedDB causes performance issues and can lock up the page and lead to jank.
- * So, we are slowing this process down by waiting until one write is complete before moving on
- * to the next.
- */
-function processNextWrite() {
-    if (isQueueProcessing || writeQueue.length === 0) {
-        return;
-    }
-
-    isQueueProcessing = true;
-
-    const {
-        key, value, resolve, reject,
-    } = writeQueue.shift();
-    localforage.setItem(key, value)
-        .then(resolve)
-        .catch(reject)
-        .finally(() => {
-            isQueueProcessing = false;
-            processNextWrite();
-        });
-}
-
 const provider = {
+    /**
+     * Writing very quickly to IndexedDB causes performance issues and can lock up the page and lead to jank.
+     * So, we are slowing this process down by waiting until one write is complete before moving on
+     * to the next.
+     */
+    setItemQueue: new SyncQueue(({key, value, shouldMerge}) => {
+        if (shouldMerge) {
+            return localforage.getItem(key)
+                .then((existingValue) => {
+                    const newValue = _.isObject(existingValue)
+                        ? lodashMerge({}, existingValue, value)
+                        : value;
+                    return localforage.setItem(key, newValue);
+                });
+        }
+
+        return localforage.setItem(key, value);
+    }),
+
     /**
      * Get multiple key-value pairs for the given array of keys in a batch
      * @param {String[]} keys
@@ -61,14 +54,7 @@ const provider = {
      * @return {Promise<void>}
      */
     multiMerge(pairs) {
-        const tasks = _.map(pairs, ([key, partialValue]) => this.getItem(key)
-            .then((existingValue) => {
-                const newValue = _.isObject(existingValue)
-                    ? lodashMerge(existingValue, partialValue)
-                    : partialValue;
-
-                return this.setItem(key, newValue);
-            }));
+        const tasks = _.map(pairs, ([key, value]) => this.setItemQueue.push({key, value, shouldMerge: true}));

         // We're returning Promise.resolve, otherwise the array of task results will be returned to the caller
         return Promise.all(tasks).then(() => Promise.resolve());
@@ -117,14 +103,9 @@ const provider = {
      * @param {*} value
      * @return {Promise<void>}
      */
-    setItem: (key, value) => (
-        new Promise((resolve, reject) => {
-            writeQueue.push({
-                key, value, resolve, reject,
-            });
-            processNextWrite();
-        })
-    ),
+    setItem(key, value) {
+        return this.setItemQueue.push({key, value});
+    },
 };

 export default provider;
172 changes: 172 additions & 0 deletions tests/unit/onyxMultiMergeWebStorageTest.js
@@ -0,0 +1,172 @@
import OnyxCache from '../../lib/OnyxCache';
import waitForPromisesToResolve from '../utils/waitForPromisesToResolve';
import localforageMock from '../../__mocks__/localforage';

const ONYX_KEYS = {
    COLLECTION: {
        TEST_KEY: 'test_',
    },
};

const initialTestObject = {a: 'a'};
const initialData = {
    test_1: initialTestObject,
    test_2: initialTestObject,
    test_3: initialTestObject,
};

localforageMock.setInitialMockData(initialData);

describe('Onyx.mergeCollection() and WebStorage', () => {
    let Onyx;

    beforeAll(() => {
        // Force using WebStorage provider for these tests
        jest.mock('../../lib/storage');
        Onyx = require('../../index').default;
        jest.useRealTimers();

        Onyx.init({
            keys: ONYX_KEYS,
            registerStorageEventListener: () => {},
            initialKeyStates: {},
        });
    });

    afterEach(() => Onyx.clear());

    it('merges two sets of data consecutively', () => {
        // Given initial data in storage
        expect(localforageMock.storageMap.test_1).toEqual(initialTestObject);
        expect(localforageMock.storageMap.test_2).toEqual(initialTestObject);
        expect(localforageMock.storageMap.test_3).toEqual(initialTestObject);

        // And empty cache values for the collection keys
        expect(OnyxCache.getValue('test_1')).not.toBeDefined();
        expect(OnyxCache.getValue('test_2')).not.toBeDefined();
        expect(OnyxCache.getValue('test_3')).not.toBeDefined();

        // When we merge additional data
        const additionalDataOne = {b: 'b', c: 'c'};
        Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {
            test_1: additionalDataOne,
            test_2: additionalDataOne,
            test_3: additionalDataOne,
        });

        // And call again consecutively with different data
        const additionalDataTwo = {d: 'd'};
        Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {
            test_1: additionalDataTwo,
            test_2: additionalDataTwo,
            test_3: additionalDataTwo,
        });

        return waitForPromisesToResolve()
            .then(() => {
                const finalObject = {
                    a: 'a', b: 'b', c: 'c', d: 'd',
                };

                // Then our new data should merge with the existing data in the cache
                expect(OnyxCache.getValue('test_1')).toEqual(finalObject);
                expect(OnyxCache.getValue('test_2')).toEqual(finalObject);
                expect(OnyxCache.getValue('test_3')).toEqual(finalObject);

                // And the storage should reflect the same state
                expect(localforageMock.storageMap.test_1).toEqual(finalObject);
                expect(localforageMock.storageMap.test_2).toEqual(finalObject);
                expect(localforageMock.storageMap.test_3).toEqual(finalObject);
            });
    });

    it('cache updates correctly when accessed again if keys are removed or evicted', () => {
        // Given empty storage
        expect(localforageMock.storageMap.test_1).toBeFalsy();
        expect(localforageMock.storageMap.test_2).toBeFalsy();
        expect(localforageMock.storageMap.test_3).toBeFalsy();

        // And empty cache values for the collection keys
        expect(OnyxCache.getValue('test_1')).toBeFalsy();
        expect(OnyxCache.getValue('test_2')).toBeFalsy();
        expect(OnyxCache.getValue('test_3')).toBeFalsy();

        // When we merge additional data and wait for the change
        const data = {a: 'a', b: 'b'};
        Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {
            test_1: data,
            test_2: data,
            test_3: data,
        });

        return waitForPromisesToResolve()
            .then(() => {
                // Then the cache and storage should match
                expect(OnyxCache.getValue('test_1')).toEqual(data);
                expect(OnyxCache.getValue('test_2')).toEqual(data);
                expect(OnyxCache.getValue('test_3')).toEqual(data);
                expect(localforageMock.storageMap.test_1).toEqual(data);
                expect(localforageMock.storageMap.test_2).toEqual(data);
                expect(localforageMock.storageMap.test_3).toEqual(data);

                // When we drop all the cache keys (but do not modify the underlying storage) and merge another object
                OnyxCache.drop('test_1');
                OnyxCache.drop('test_2');
                OnyxCache.drop('test_3');

                const additionalData = {c: 'c'};
                Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {
                    test_1: additionalData,
                    test_2: additionalData,
                    test_3: additionalData,
                });

                return waitForPromisesToResolve();
            })
            .then(() => {
                const finalObject = {
                    a: 'a', b: 'b', c: 'c',
                };

                // Then our new data should merge with the existing data in the cache
                expect(OnyxCache.getValue('test_1')).toEqual(finalObject);
                expect(OnyxCache.getValue('test_2')).toEqual(finalObject);
                expect(OnyxCache.getValue('test_3')).toEqual(finalObject);

                // And the storage should reflect the same state
                expect(localforageMock.storageMap.test_1).toEqual(finalObject);
                expect(localforageMock.storageMap.test_2).toEqual(finalObject);
                expect(localforageMock.storageMap.test_3).toEqual(finalObject);
            });
    });

    it('setItem() and multiMerge()', () => {
        expect(localforageMock.storageMap).toEqual({});

        // Given no previous data, several calls to setItem, and a call to mergeCollection to update a given key

        // 1st call
        Onyx.set('test_1', {a: 'a'});

        // These merges will all queue together
        Onyx.merge('test_1', {b: 'b'});
        Onyx.merge('test_1', {c: 'c'});

        // 2nd call
        Onyx.mergeCollection(ONYX_KEYS.COLLECTION.TEST_KEY, {
            test_1: {d: 'd', e: 'e'},
        });

        // Last call
        Onyx.merge('test_1', {f: 'f'});

        return waitForPromisesToResolve()
            .then(() => {
                const finalObject = {
                    a: 'a', b: 'b', c: 'c', f: 'f', d: 'd', e: 'e',
                };

                expect(OnyxCache.getValue('test_1')).toEqual(finalObject);
                expect(localforageMock.storageMap.test_1).toEqual(finalObject);
            });
    });
});