Skip to content

Commit

Permalink
[lib] Use queue when processing inbound messages from the DB
Browse files Browse the repository at this point in the history
Summary:
Use the queue instead of processing operations in a loop.

https://linear.app/comm/issue/ENG-9470/mitigate-risks-of-effects-running-on-outdated-data

Test Plan:
Disabled inbound messages processing by commenting out `handleOlmMessageToDevice` in `usePeerToPeerMessageHandler`.
On one device, created a thread, changed its name, and then its description. These operations weren't processed on another device. Refreshed the app so that the operations are read from the DB and processed. Made sure that the result is correct.

Checked what was the behavior before this diff. Surprisingly, it was the same. Using the queue here sounds more correct and feels safer, but the previous approach also works correctly.

Reviewers: kamil, inka

Reviewed By: kamil

Subscribers: ashoat

Differential Revision: https://phab.comm.dev/D13623
  • Loading branch information
palys-swm committed Oct 16, 2024
1 parent a8b8ff0 commit 93296ad
Showing 1 changed file with 98 additions and 107 deletions.
205 changes: 98 additions & 107 deletions lib/tunnelbroker/peer-to-peer-message-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
usePeerToPeerMessageHandler,
} from './use-peer-to-peer-message-handler.js';
import { useLoggedInUserInfo } from '../hooks/account-hooks.js';
import { useActionsQueue } from '../hooks/actions-queue.js';
import type { InboundP2PMessage } from '../types/sqlite-types.js';
import type { MessageReceiveConfirmation } from '../types/tunnelbroker/message-receive-confirmation-types.js';
import {
Expand All @@ -33,16 +34,74 @@ function PeerToPeerMessageHandler(props: Props): React.Node {
const peerToPeerMessageHandler = usePeerToPeerMessageHandler();
const handleOlmMessageToDevice = useHandleOlmMessageToDevice();

const [messagesQueue, setMessagesQueue] = React.useState<
$ReadOnlyArray<{
+peerToPeerMessage: PeerToPeerMessage,
+messageID: string,
+localSocketSessionCounter: number,
}>,
>([]);
const [isProcessing, setProcessing] = React.useState(false);
const [processedInboundMessages, setProcessedInboundMessages] =
React.useState(false);
const processItem = React.useCallback(
async (
item:
| {
+type: 'persisted_message',
+message: InboundP2PMessage,
}
| {
+type: 'received_message',
+message: {
+peerToPeerMessage: PeerToPeerMessage,
+messageID: string,
+localSocketSessionCounter: number,
},
},
) => {
if (item.type === 'persisted_message') {
const { message } = item;
try {
await handleOlmMessageToDevice(
message.plaintext,
{ deviceID: message.senderDeviceID, userID: message.senderUserID },
message.messageID,
);
} catch (e) {
console.log('Failed processing Olm P2P message:', e);
}
} else {
const { peerToPeerMessage, messageID, localSocketSessionCounter } =
item.message;
// Since scheduling processing this message socket is closed
// or was closed and reopened, we have to stop processing
// because Tunnelbroker flushes the message again when opening
// the socket, and we want to process this only once.
if (
localSocketSessionCounter !== getSessionCounter() ||
!doesSocketExist()
) {
return;
}

try {
await peerToPeerMessageHandler(peerToPeerMessage, messageID);
} catch (e) {
console.log(e.message);
} finally {
if (
localSocketSessionCounter === getSessionCounter() &&
doesSocketExist()
) {
const confirmation: MessageReceiveConfirmation = {
type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
messageIDs: [messageID],
};
socketSend(JSON.stringify(confirmation));
}
}
}
},
[
doesSocketExist,
getSessionCounter,
handleOlmMessageToDevice,
peerToPeerMessageHandler,
socketSend,
],
);
const { enqueue } = useActionsQueue(processItem);

const tunnelbrokerMessageListener = React.useCallback(
async (message: TunnelbrokerToDeviceMessage) => {
Expand Down Expand Up @@ -76,121 +135,53 @@ function PeerToPeerMessageHandler(props: Props): React.Node {
return;
}
const peerToPeerMessage: PeerToPeerMessage = rawPeerToPeerMessage;
setMessagesQueue(oldQueue => [
...oldQueue,
enqueue([
{
peerToPeerMessage,
messageID: message.messageID,
localSocketSessionCounter: getSessionCounter(),
type: 'received_message',
message: {
peerToPeerMessage,
messageID: message.messageID,
localSocketSessionCounter: getSessionCounter(),
},
},
]);
},
[getSessionCounter, socketSend],
[enqueue, getSessionCounter, socketSend],
);

const processPersistedInboundMessages = React.useCallback(async () => {
if (isProcessing || processedInboundMessages) {
return;
}
setProcessing(true);
React.useEffect(() => {
addListener(tunnelbrokerMessageListener);
return () => {
removeListener(tunnelbrokerMessageListener);
};
}, [addListener, removeListener, tunnelbrokerMessageListener]);

const processPersistedInboundMessages = React.useCallback(async () => {
try {
const { sqliteAPI } = getConfig();
const messages = await sqliteAPI.getAllInboundP2PMessages();

for (const message: InboundP2PMessage of messages) {
try {
await handleOlmMessageToDevice(
message.plaintext,
{ deviceID: message.senderDeviceID, userID: message.senderUserID },
message.messageID,
);
} catch (e) {
console.log('Failed processing Olm P2P message:', e);
}
}
} finally {
setProcessedInboundMessages(true);
setProcessing(false);
}
}, [handleOlmMessageToDevice, isProcessing, processedInboundMessages]);

const processMessage = React.useCallback(async () => {
if (messagesQueue.length === 0 || isProcessing) {
return;
}

setProcessing(true);

const { peerToPeerMessage, messageID, localSocketSessionCounter } =
messagesQueue[0];

// Since scheduling processing this message socket is closed
// or was closed and reopened, we have to stop processing
// because Tunnelbroker flushes the message again when opening
// the socket, and we want to process this only once.
if (
localSocketSessionCounter !== getSessionCounter() ||
!doesSocketExist()
) {
setMessagesQueue(currentQueue => currentQueue.slice(1));
setProcessing(false);
return;
}

try {
await peerToPeerMessageHandler(peerToPeerMessage, messageID);
enqueue(
messages.map(message => ({
type: 'persisted_message',
message,
})),
);
} catch (e) {
console.log(e.message);
} finally {
if (
localSocketSessionCounter === getSessionCounter() &&
doesSocketExist()
) {
const confirmation: MessageReceiveConfirmation = {
type: deviceToTunnelbrokerMessageTypes.MESSAGE_RECEIVE_CONFIRMATION,
messageIDs: [messageID],
};
socketSend(JSON.stringify(confirmation));
}
setMessagesQueue(currentQueue => currentQueue.slice(1));
setProcessing(false);
console.log('error while reading persisted inbound messages:', e.message);
}
}, [
doesSocketExist,
getSessionCounter,
isProcessing,
messagesQueue,
peerToPeerMessageHandler,
socketSend,
]);
}, [enqueue]);

const loggedInUserInfo = useLoggedInUserInfo();
const viewerID = loggedInUserInfo?.id;

const processingInputMessagesStarted = React.useRef(false);
React.useEffect(() => {
if (isProcessing || !viewerID) {
if (!viewerID || processingInputMessagesStarted.current) {
return;
}
if (!processedInboundMessages) {
void processPersistedInboundMessages();
} else if (messagesQueue.length > 0) {
void processMessage();
}
}, [
messagesQueue,
isProcessing,
processMessage,
processedInboundMessages,
processPersistedInboundMessages,
viewerID,
]);

React.useEffect(() => {
addListener(tunnelbrokerMessageListener);
return () => {
removeListener(tunnelbrokerMessageListener);
};
}, [addListener, removeListener, tunnelbrokerMessageListener]);
processingInputMessagesStarted.current = true;
void processPersistedInboundMessages();
}, [processPersistedInboundMessages, viewerID]);
}

export { PeerToPeerMessageHandler };

0 comments on commit 93296ad

Please sign in to comment.