Skip to content

Commit

Permalink
Merge pull request #377 from matrix-org/kegan/indexeddb-account-data
Browse files Browse the repository at this point in the history
Store account data in the same way as room data
  • Loading branch information
dbkr committed Feb 21, 2017
2 parents bbe74e6 + 36b8b2c commit 2b96cd7
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 32 deletions.
51 changes: 41 additions & 10 deletions spec/unit/sync-accumulator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe("SyncAccumulator", function() {
},
},
};
sa.accumulateRooms(res);
sa.accumulate(res);
const output = sa.getJSON();
expect(output.nextBatch).toEqual(res.next_batch);
expect(output.roomsData).toEqual(res.rooms);
Expand All @@ -69,7 +69,7 @@ describe("SyncAccumulator", function() {
it("should prune the timeline to the oldest prev_batch within the limit", () => {
// maxTimelineEntries is 10 so we should get back all
// 10 timeline messages with a prev_batch of "pinned_to_1"
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [member("alice", "join")] },
timeline: {
events: [
Expand All @@ -84,7 +84,7 @@ describe("SyncAccumulator", function() {
prev_batch: "pinned_to_1",
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
Expand All @@ -93,7 +93,7 @@ describe("SyncAccumulator", function() {
prev_batch: "pinned_to_8",
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
Expand All @@ -116,7 +116,7 @@ describe("SyncAccumulator", function() {
// AND give us <= 10 messages without losing messages in-between.
// It should try to find the oldest prev_batch which still fits into 10
// messages, which is "pinned to 8".
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
Expand Down Expand Up @@ -153,7 +153,7 @@ describe("SyncAccumulator", function() {
}],
},
});
sa.accumulateRooms(res);
sa.accumulate(res);
expect(
sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events.length,
).toEqual(0);
Expand All @@ -172,12 +172,12 @@ describe("SyncAccumulator", function() {
food: "apple",
},
};
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
account_data: {
events: [acc1],
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
account_data: {
events: [acc2],
},
Expand All @@ -190,6 +190,37 @@ describe("SyncAccumulator", function() {
).toEqual(acc2);
});

it("should clobber global account data based on event type", () => {
const acc1 = {
type: "favourite.food",
content: {
food: "banana",
},
};
const acc2 = {
type: "favourite.food",
content: {
food: "apple",
},
};
sa.accumulate({
account_data: {
events: [acc1],
},
});
sa.accumulate({
account_data: {
events: [acc2],
},
});
expect(
sa.getJSON().accountData.length,
).toEqual(1);
expect(
sa.getJSON().accountData[0],
).toEqual(acc2);
});

it("should accumulate read receipts", () => {
const receipt1 = {
type: "m.receipt",
Expand Down Expand Up @@ -218,12 +249,12 @@ describe("SyncAccumulator", function() {
},
},
};
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
ephemeral: {
events: [receipt1],
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
ephemeral: {
events: [receipt2],
},
Expand Down
27 changes: 11 additions & 16 deletions src/store/indexeddb.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ IndexedDBStoreBackend.prototype = {
/**
* Persist a list of account data events. Events with the same 'type' will
* be replaced.
* @param {MatrixEvent[]} accountData An array of user-scoped account data events
* @param {Object[]} accountData An array of raw user-scoped account data events
* @return {Promise} Resolves if the events were persisted.
*/
persistAccountData: function(accountData) {
return q.try(() => {
const txn = this.db.transaction(["accountData"], "readwrite");
const store = txn.objectStore("accountData");
for (let i = 0; i < accountData.length; i++) {
store.put(accountData[i].event); // put == UPSERT
store.put(accountData[i]); // put == UPSERT
}
return promiseifyTxn(txn);
});
Expand Down Expand Up @@ -172,14 +172,14 @@ IndexedDBStoreBackend.prototype = {

/**
* Load all the account data events from the database. This is not cached.
* @return {Promise<MatrixEvent[]>} A list of events.
* @return {Promise<Object[]>} A list of raw global account events.
*/
loadAccountData: function() {
return q.try(() => {
const txn = this.db.transaction(["accountData"], "readonly");
const store = txn.objectStore("accountData");
return selectQuery(store, undefined, (cursor) => {
return new MatrixEvent(cursor.value);
return cursor.value;
});
});
},
Expand Down Expand Up @@ -283,10 +283,9 @@ IndexedDBStore.prototype.startup = function() {
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
this.storeUser(u);
});
this.storeAccountDataEvents(accountData);
this._syncTs = Date.now(); // pretend we've written so we don't rewrite
this.setSyncToken(syncData.nextBatch);
this._setSyncData(syncData.nextBatch, syncData.roomsData);
this._setSyncData(syncData.nextBatch, syncData.roomsData, accountData);
});
};

Expand Down Expand Up @@ -325,10 +324,13 @@ IndexedDBStore.prototype.save = function() {
return q();
};

IndexedDBStore.prototype._setSyncData = function(nextBatch, roomsData) {
this._syncAccumulator.accumulateRooms({
IndexedDBStore.prototype._setSyncData = function(nextBatch, roomsData, accountData) {
this._syncAccumulator.accumulate({
next_batch: nextBatch,
rooms: roomsData,
account_data: {
events: accountData,
},
});
};

Expand All @@ -344,18 +346,11 @@ IndexedDBStore.prototype._syncToDatabase = function() {
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
});

// TODO: work out changed account data events. They don't have timestamps or IDs.
// so we'll need to hook into storeAccountDataEvents instead to catch them when
// they update from /sync
const changedAccountData = Object.keys(this.accountData).map((etype) => {
return this.accountData[etype];
});

const syncData = this._syncAccumulator.getJSON();

return q.all([
this.backend.persistUsers(changedUsers),
this.backend.persistAccountData(changedAccountData),
this.backend.persistAccountData(syncData.accountData),
this.backend.persistSyncData(syncData.nextBatch, syncData.roomsData),
]);
};
Expand Down
38 changes: 33 additions & 5 deletions src/sync-accumulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class SyncAccumulator {
opts = opts || {};
opts.maxTimelineEntries = opts.maxTimelineEntries || 50;
this.opts = opts;
this.accountData = {
//$event_type: Object
};
this.inviteRooms = {
//$roomId: { ... sync 'invite' json data ... }
};
Expand All @@ -71,15 +74,30 @@ class SyncAccumulator {
this.nextBatch = null;
}

accumulate(syncResponse) {
this._accumulateRooms(syncResponse);
this._accumulateAccountData(syncResponse);
this.nextBatch = syncResponse.next_batch;
}

_accumulateAccountData(syncResponse) {
if (!syncResponse.account_data || !syncResponse.account_data.events) {
return;
}
// Clobbers based on event type.
syncResponse.account_data.events.forEach((e) => {
this.accountData[e.type] = e;
});
}

/**
* Accumulate incremental /sync room data.
* @param {Object} syncResponse the complete /sync JSON
*/
accumulateRooms(syncResponse) {
_accumulateRooms(syncResponse) {
if (!syncResponse.rooms) {
return;
}
this.nextBatch = syncResponse.next_batch;
if (syncResponse.rooms.invite) {
Object.keys(syncResponse.rooms.invite).forEach((roomId) => {
this._accumulateRoom(
Expand Down Expand Up @@ -316,13 +334,15 @@ class SyncAccumulator {
* Return everything under the 'rooms' key from a /sync response which
* represents all room data that should be stored. This should be paired
* with the sync token which represents the most recent /sync response
* provided to accumulateRooms().
* @return {Object} An object with a "nextBatch" key and a "roomsData" key.
* provided to accumulate().
* @return {Object} An object with a "nextBatch", "roomsData" and "accountData"
* keys.
* The "nextBatch" key is a string which represents at what point in the
* /sync stream the accumulator reached. This token should be used when
* restarting a /sync stream at startup. Failure to do so can lead to missing
* events. The "roomsData" key is an Object which represents the entire
* /sync response from the 'rooms' key onwards.
* /sync response from the 'rooms' key onwards. The "accountData" key is
* a list of raw events which represent global account data.
*/
getJSON() {
const data = {
Expand Down Expand Up @@ -434,9 +454,17 @@ class SyncAccumulator {
});
data.join[roomId] = roomJson;
});

// Add account data
const accData = [];
Object.keys(this.accountData).forEach((evType) => {
accData.push(this.accountData[evType]);
});

return {
nextBatch: this.nextBatch,
roomsData: data,
accountData: accData,
};
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,9 @@ SyncApi.prototype._sync = function(syncOptions) {
this._currentSyncRequest = q.resolve({
next_batch: data.nextBatch,
rooms: data.roomsData,
account_data: {
events: data.accountData,
},
});
isCachedResponse = true;
}
Expand Down Expand Up @@ -576,7 +579,7 @@ SyncApi.prototype._sync = function(syncOptions) {
// accumulated data. We don't want to accumulate the same thing twice, so
// only accumulate if this isn't a cached response.
if (self.opts.syncAccumulator && !isCachedResponse) {
self.opts.syncAccumulator.accumulateRooms(data);
self.opts.syncAccumulator.accumulate(data);
}

// emit synced events
Expand Down

0 comments on commit 2b96cd7

Please sign in to comment.