Use websockets to make on-demand worker file previews faster #4096

Merged: 39 commits into master on Dec 2, 2022

Conversation

@epicfaace (Member) commented on May 5, 2022

Use websockets to make on-demand worker file previews faster. File previews (such as loading stdout) of a worker that is currently running a bundle now take ~0.5 seconds.

Fixes #4084. This PR is basically a POC of my comment #4084 (comment) -- I've added a thin websocket layer so that when a user requests to view one of the worker's files:

  • the server pings the worker through the websocket
  • the worker then checks in
  • the worker then sends back the file (a minimal sketch of this flow is included below).

This also gives us the latitude to change the worker's default check-in frequency from 5 seconds to 20 seconds, further decreasing the load on the rest server.
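
Here is a minimal sketch of the relay described above, for context. It uses the websockets library and the /main and /worker/<id> endpoints that appear in this PR's diff; the handler signature assumes the library's legacy asyncio API, and the real module layout in this PR may differ:

import asyncio
import websockets

# worker_id -> websocket connection of the listening worker
worker_connections = {}

async def handler(websocket, path):
    if path == "/main":
        # The rest server asks us to ping a specific worker.
        worker_id = await websocket.recv()
        worker_ws = worker_connections.get(worker_id)
        if worker_ws is not None:
            await worker_ws.send("ping")
    elif path.startswith("/worker/"):
        # A worker registers here and stays connected, waiting for pings.
        worker_id = path[len("/worker/"):]
        worker_connections[worker_id] = websocket
        try:
            await websocket.wait_closed()
        finally:
            worker_connections.pop(worker_id, None)

async def main():
    async with websockets.serve(handler, "0.0.0.0", 2901):
        await asyncio.Future()  # run until cancelled

if __name__ == "__main__":
    asyncio.run(main())

On the rest-server side, serving a file preview then amounts to connecting to /main, sending the target worker's id, and letting the worker's next (now immediate) check-in return the file.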

TODOs:

  • don't hardcode the ws-server URL
  • test this on dev, with ~20 workers
  • make sure worker websocket listening thread code is thread-safe

@epicfaace changed the title from "websocket test" to "Use websockets to make on-demand worker file previews faster" on May 5, 2022
@@ -138,7 +138,7 @@ def parse_args():
'--checkin-frequency-seconds',
help='Number of seconds to wait between worker check-ins',
type=int,
default=5,
default=20,
Collaborator:

I still think it would be nice to update the worker status at least once every 5 seconds...we don't need this to be 20, right?

Member Author:

No -- we can keep it at 5

@epicfaace (Member Author) commented with an attached image.

logging.warn(
f"Got websocket message, got data: {data}, going to check in now."
)
self.checkin()
Member Author:

For concurrency:

  • hold a global lock when running checkin()
  • also hold the lock in the worker loop 1) when running checkin() and 2) when processing bundles (a rough sketch follows below)
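
A rough sketch of that locking scheme. The method names checkin and process_runs come from this PR; the loop structure, the WorkerSketch class, and the placeholder bodies are assumptions for illustration only:

import time
from threading import RLock

class WorkerSketch:
    def __init__(self, checkin_frequency_seconds=5):
        self.checkin_frequency_seconds = checkin_frequency_seconds
        self.terminate = False
        self._lock = RLock()

    def checkin(self):
        print("checking in with the rest server")  # placeholder

    def process_runs(self):
        print("processing bundle runs")  # placeholder

    def on_ws_ping(self):
        # Called from the websocket listening thread when the server pings us.
        with self._lock:
            self.checkin()

    def run_loop(self):
        # Main worker loop: periodic check-ins and run processing happen
        # under the same lock as the on-demand check-ins above.
        while not self.terminate:
            with self._lock:
                self.checkin()
                self.process_runs()
            time.sleep(self.checkin_frequency_seconds)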

print("RSH")
worker_id = await websocket.recv()
logger.warn(f"Got a message from the rest server, to ping worker: {worker_id}.")

Member Author:

document

docs/Server-Setup.md (outdated review comment, resolved)
@@ -154,6 +154,40 @@ containers, and periodically report on the status of the runs.
A worker also has to respond to various commands such as reading files in the
bundle while it's running, killing bundles, etc.

All data transfer between the worker and the server happens through a process known
Member Author:

@AndrewJGaut here's an explanation of my approach here, would love any feedback!

@AndrewJGaut marked this pull request as ready for review on November 9, 2022, 06:35
@AndrewJGaut (Contributor):

@epicfaace Can you please review the changes made since your last change? They are quite minor. Once you give the LGTM, I'll approve it and merge it

@epicfaace (Member Author) left a comment:

lgtm

self._checkin_lock = Lock()
# Lock ensures the listening thread and main thread don't simultaneously
# access the runs dictionary, which would cause race conditions.
self._lock = RLock()
Member Author:

Why use RLock instead of Lock?

Contributor:

It's just for elegance's sake.

RLock allows the lock to be acquired multiple times (provided it is also released precisely as many times as it is acquired). This is desirable in this case because, to avoid race conditions, we must call self.process_runs before the lock is released within the checkin() function. By using RLock, we allow the code to acquire the lock a second time so that we can run process_runs within checkin.

If we didn't do this, we could use a flag to tell process_runs we already acquired the lock and not acquire the lock in that case, but that seemed less elegant to me.
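
A small illustration of that point (simplified, not the actual worker code):

from threading import RLock

lock = RLock()

def process_runs():
    # Also takes the lock, so it is safe to call on its own.
    with lock:
        pass  # update the runs dictionary here

def checkin():
    with lock:
        # ... perform the check-in ...
        # Re-acquiring the same RLock from the owning thread is allowed;
        # with a plain Lock this nested acquire would deadlock.
        process_runs()

checkin()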

codalab/worker/worker.py (review comment resolved)
@@ -1724,7 +1724,7 @@ def test_run(ctx):
# Test that bundle fails when run without sufficient time quota
_run_command([cl, 'uedit', 'codalab', '--time-quota', '2'])
uuid = _run_command([cl, 'run', 'sleep 100000'])
wait_until_state(uuid, State.KILLED, timeout_seconds=60)
wait_until_state(uuid, State.KILLED, timeout_seconds=63)
Member Author:

why from 60 to 63?

def send_json_message(self, socket_id, message, timeout_secs, autoretry=True):
def _ping_worker_ws(self, worker_id):
async def ping_ws():
async with websockets.connect("ws://ws-server:2901/main") as websocket:
Member Author:

We need to ensure this isn't hardcoded.

while not self.terminate:
logging.warn(f"Connecting anew to: ws://ws-server:2901/worker/{self.id}")
async with websockets.connect(
f"ws://ws-server:2901/worker/{self.id}", max_queue=1
Member Author:

We need to ensure this isn't hardcoded.

@epicfaace (Member Author):

@AndrewJGaut my main feedback is that ws-server:2901 shouldn't be hardcoded. We should make use of the ws_port configured in codalab_service.py. Once you add that, feel free to merge!
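
One possible way to plumb that through, as a sketch only; the environment variable name CODALAB_WS_SERVER is an assumption, not something defined in this PR:

import os

def get_ws_server_url(default="ws://ws-server:2901"):
    # The default matches the currently hardcoded value; a deployment would
    # override it, e.g. derived from the ws_port set in codalab_service.py.
    return os.environ.get("CODALAB_WS_SERVER", default)

# Rest server side:
#   websockets.connect(f"{get_ws_server_url()}/main")
# Worker side:
#   websockets.connect(f"{get_ws_server_url()}/worker/{self.id}")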

@AndrewJGaut merged commit 5c59e6a into master on Dec 2, 2022
@AndrewJGaut mentioned this pull request on Dec 6, 2022
Successfully merging this pull request may close these issues: Improve STDOUT for users
3 participants