
Implement ordered concurrency #31

Open
wants to merge 8 commits into main

Conversation

flying-scorpio

As discussed in #22 and #3 (comment):
Concurrency on message handling broke the server-side ordering, because the server orders the messages in the sequence they arrived in.
To add a bit of concurrency anyway, the idea is to group messages per room and handle the messages within each room sequentially, while handling the rooms in parallel. Of course, you would need a very large number of rooms for this to make a big difference.

Note that I am new to TypeScript...
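
To illustrate the idea described above, here is a minimal sketch of the per-room grouping. It assumes the @supercharge/promise-pool package and a reduced RcMessage shape; handleMessage and the concurrency value are placeholders, not this PR's actual code.

// Sketch only: RcMessage is trimmed to the fields needed here, and
// handleMessage stands in for the real message import logic.
import { PromisePool } from '@supercharge/promise-pool'

type RcMessage = { _id: string; rid: string }

async function importMessages(
  messages: RcMessage[],
  handleMessage: (m: RcMessage) => Promise<void>
): Promise<void> {
  // Group messages by room id, preserving the export order within each room
  const messagesPerRoom = new Map<string, RcMessage[]>()
  for (const message of messages) {
    const roomMessages = messagesPerRoom.get(message.rid) ?? []
    roomMessages.push(message)
    messagesPerRoom.set(message.rid, roomMessages)
  }

  // Rooms are processed concurrently, messages inside one room stay sequential
  await PromisePool.for([...messagesPerRoom.values()])
    .withConcurrency(4) // placeholder value
    .process(async (roomMessages) => {
      for (const message of roomMessages) {
        await handleMessage(message)
      }
    })
}

The ordering guarantee comes from the inner for...of loop awaiting each message before the next; only whole rooms run in parallel.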

@HerHde HerHde (Collaborator) left a comment


This handles threads correctly, as parent messages are handled before their children (since all are handled sequentially), right?

In TS, camelCase is used rather than snake_case, but good job! Also kudos for code that passes all pre-commit checks and tests 👍
https://ci.netzbegruenung.verdigado.net/repos/938/pipeline/1240

src/app.ts (outdated)
const user_queue: RcUser[] = []
const room_queue: RcRoom[] = []
const messages_per_room: Map<string, RcMessage[]> = new Map()

const rl = new lineByLine(`./inputs/${entities[entity].filename}`)

let line: false | Buffer
while ((line = rl.next())) {
const item = JSON.parse(line.toString())
switch (entity) {

Collaborator

Now I am thinking of moving the switch (entity) outside, and looping over and handling the queue inside each case

Author

Yes, thread messages are handled just like any other message, so the sequence is the same as in the RC export.
I converted the variables to camelCase. Thanks!

Author

Now I am thinking of moving the switch (entity) outside, and looping over and handling the queue inside each case

I've thought of it, but that would add as many loops as there are cases; I'm not sure which of the two versions would be faster.

Collaborator

As the function is called 3 times, each time with one entity type, it would check 3 times and loop within each case. That way we also don't need to check the non-affected queues.
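
A rough sketch of that shape, with hypothetical handler names (loadUsers/loadRooms/loadMessages are placeholders, not this PR's code):

// Illustrative only: the switch runs once per call (three calls, one per
// entity type), and each case loops only over its own entity, so the
// other queues are never inspected.
type Entity = 'users' | 'rooms' | 'messages'

async function loadEntity(
  entity: Entity,
  handlers: Record<Entity, () => Promise<void>>
): Promise<void> {
  switch (entity) {
    case 'users':
      await handlers.users()
      break
    case 'rooms':
      await handlers.rooms()
      break
    case 'messages':
      await handlers.messages()
      break
  }
}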

@flying-scorpio flying-scorpio (Author) Jul 30, 2024

In 9fa8e03, 64935fd and 7a7fdec:

I moved the switch outside of the loop. For users and rooms, the loop is handled by the PromisePool, so there is no need for queues anymore. For messages, to maintain the sequence, I kept the two steps of populating the Map and then loading.
Because PromisePool loops over an iterator, I added a wrapper to turn lineByLine into an iterator (its .next() method made me think it was implemented as an iterator, but it isn't).
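
For reference, such a wrapper could look roughly like the sketch below, assuming the lineByLine import comes from the n-readlines package; the generator name is made up:

// Sketch of the wrapper idea: n-readlines' next() returns false | Buffer,
// so a generator turns the file into something PromisePool (or for...of)
// can consume line by line.
import lineByLine from 'n-readlines'

function* readLines(path: string): Generator<string> {
  const rl = new lineByLine(path)
  let line: false | Buffer
  while ((line = rl.next())) {
    yield line.toString()
  }
}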

I feel the message handling can still be improved, maybe by avoiding the first loading step. To take care of #3 (comment), we could cut the messages into batches or something.

Author

I feel the message handling can still be improved, maybe by avoiding the first loading step. To take care of #3 (comment), we could cut the messages into batches or something.

I added batches for the message handling in bee1c2f.
Tested with:

./reset.sh && npm run compile && systemd-run --scope -p MemoryMax=75M -p MemorySwapMax=1K --user node dist/app.js 
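
For context, the batching idea amounts to something like the sketch below; the batch size and helper name are illustrative, not the actual code in bee1c2f:

// Illustrative only: flush messages in fixed-size batches so the whole
// export never has to sit in memory at once.
const BATCH_SIZE = 1000 // placeholder value

async function processInBatches<T>(
  items: Iterable<T>,
  handleBatch: (batch: T[]) => Promise<void>
): Promise<void> {
  let batch: T[] = []
  for (const item of items) {
    batch.push(item)
    if (batch.length >= BATCH_SIZE) {
      await handleBatch(batch)
      batch = [] // drop the processed batch so memory stays bounded
    }
  }
  if (batch.length > 0) {
    await handleBatch(batch) // flush the remainder
  }
}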
