
feat: non-blocking metrics reports [MD-144] #9107

Merged: 10 commits merged into main from non-blocking-metrics on Apr 18, 2024

Conversation

azhou-determined
Contributor

@azhou-determined azhou-determined commented Apr 4, 2024

Description

Report metrics in a background thread so that reporting does not block training.

Introduce core.MetricsContext as a centralized place for all metric reporting.
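
For reviewers who want the gist of the approach, here is a minimal sketch of the general queue-plus-background-thread pattern; the names and details below (_Reporter, post_metrics, the sentinel) are illustrative only, not the actual MetricsContext API. The real implementation also propagates background exceptions back to the caller via an error queue, discussed in the review comments below.

import queue
import threading


class _Reporter(threading.Thread):
    """Illustrative background shipper: drains a queue and posts each item."""

    def __init__(self, post_metrics):
        # Named thread for easier debugging, per review feedback below.
        super().__init__(name="MetricsShipperThread")
        self._inbound = queue.Queue()
        self._post_metrics = post_metrics  # e.g. a call into the master's REST API

    def report(self, metrics):
        # Called from the training thread; enqueues and returns immediately.
        self._inbound.put(metrics)

    def run(self):
        while True:
            item = self._inbound.get()
            if item is None:  # sentinel enqueued by close()
                return
            self._post_metrics(item)

    def close(self):
        # Flush everything already queued, then stop the thread.
        self._inbound.put(None)
        self.join()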

Test Plan

This PR makes all metrics reporting happen in the background and affects many parts of the Python code. A few areas that would be helpful to test:

Core API

Submit a Core API experiment that reports a lot of metrics in quick succession.

metrics.py

import logging

import determined as det
from determined import core


def main(core_context):
    for batch in range(100):
        steps_completed = batch + 1
        if steps_completed % 5 == 0:
            core_context.train.report_training_metrics(
                steps_completed=steps_completed, metrics={"x": batch}
            )
        if steps_completed % 10 == 0:
            core_context.train.report_validation_metrics(
                steps_completed=steps_completed, metrics={"x": batch}
            )


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT)
    with core.init() as core_context:
        main(core_context=core_context)
        print(f"Finished 'training' loop")
    print(f"Finished reporting metrics. Exiting.")

metrics.yaml

name: metrics
entrypoint: python3 metrics.py

searcher:
   name: single
   metric: x
   max_length: 1

max_restarts: 0

The trial logs should show the Finished 'training' loop print statement, then take a few seconds while the remaining metrics finish reporting (and the metrics view updates accordingly). The debug logs should not show HTTP POSTs (i.e. urllib3.connectionpool: http://host.docker.internal:8080 "POST /api/v1/trials/197/metrics) immediately/synchronously after each determined.core: report_training_metrics log. Since metrics are now reported in the background, most POST logs should appear between the Finished 'training' loop and Finished reporting metrics. Exiting. print statements.

Exception handling

Modify the above script to throw an exception partway through the loop:

import logging

import determined as det
from determined import core


def main(core_context):
    for batch in range(100):
+       if batch == 10:
+           raise ValueError("test exception!")
        
        steps_completed = batch + 1
        if steps_completed % 5 == 0:
            core_context.train.report_training_metrics(
                steps_completed=steps_completed, metrics={"x": batch}
            )
        if steps_completed % 10 == 0:
            core_context.train.report_validation_metrics(
                steps_completed=steps_completed, metrics={"x": batch}
            )


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT)
    with core.init() as core_context:
        main(core_context=core_context)
        print(f"Finished 'training' loop")
    print(f"Finished reporting metrics. Exiting.")

The logs should show the exception being thrown, and the experiment should terminate with an error rather than hang. The metrics reported before the exception should show up (only 1 in this case).

Detached mode

Run this script locally:

import logging
import os
import random

import determined as det
from determined.experimental import core_v2  # assumed import path for core_v2

os.environ["DET_MASTER"] = "MASTER_URL"

def main():
    core_v2.init(
        defaults=core_v2.DefaultConfig(
            name="unmanaged-1-singleton",
        ),
    )
    for i in range(100):
        core_v2.train.report_training_metrics(
            steps_completed=i, metrics={"loss": random.random()}
        )

        if (i + 1) % 10 == 0:
            core_v2.train.report_validation_metrics(
                steps_completed=i, metrics={"loss": random.random()}
            )
    print("Waiting for metrics reporting...")
    core_v2.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT)
    main()

The training loop should finish almost instantly, and you should see the Waiting for metrics reporting... print statement for a few seconds while the remaining metrics finish reporting (and the metrics view updates accordingly). The trial should then exit successfully.

Exception handling

Modify the above script to throw an exception partway through metrics reporting:

import logging
import os
import random

import determined as det
from determined.experimental import core_v2  # assumed import path for core_v2

os.environ["DET_MASTER"] = "MASTER_URL"

def main():
    core_v2.init(
        defaults=core_v2.DefaultConfig(
            name="unmanaged-1-singleton",
        ),
    )
    for i in range(100):
+       if i == 10:
+           raise ValueError("test exception!")
        core_v2.train.report_training_metrics(
            steps_completed=i, metrics={"loss": random.random()}
        )

        if (i + 1) % 10 == 0:
            core_v2.train.report_validation_metrics(
                steps_completed=i, metrics={"loss": random.random()}
            )
    print("Waiting for metrics reporting...")
    core_v2.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT)
    main()

The logs should show the exception being thrown, and the experiment should terminate with an error rather than hang. The metrics reported before the exception (steps 0-9 in this case) should show up.

Commentary (optional)

Checklist

  • Changes have been manually QA'd
  • User-facing API changes need the "User-facing API Change" label.
  • Release notes should be added as a separate file under docs/release-notes/.
    See Release Note for details.
  • Licenses should be included for new code which was copied and/or modified from any external code.

Ticket


netlify bot commented Apr 4, 2024

Deploy Preview for determined-ui canceled.

Name                    Link
🔨 Latest commit        b229537
🔍 Latest deploy log    https://app.netlify.com/sites/determined-ui/deploys/662014320b88b0000880166c


codecov bot commented Apr 4, 2024

Codecov Report

Attention: Patch coverage is 85.10638% with 21 lines in your changes missing coverage. Please review.

Project coverage is 45.52%. Comparing base (0fc247c) to head (b229537).
Report is 16 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #9107      +/-   ##
==========================================
+ Coverage   45.48%   45.52%   +0.03%     
==========================================
  Files        1197     1199       +2     
  Lines      147556   147646      +90     
  Branches     2438     2437       -1     
==========================================
+ Hits        67121    67209      +88     
- Misses      80203    80205       +2     
  Partials      232      232              
Flag       Coverage Δ
backend    43.68% <ø> (-0.07%) ⬇️
harness    64.17% <85.10%> (+0.15%) ⬆️
web        35.41% <ø> (ø)

Flags with carried forward coverage won't be shown.

Files                                     Coverage Δ
harness/determined/core/__init__.py       100.00% <100.00%> (ø)
harness/tests/core/test_metrics.py        100.00% <100.00%> (ø)
harness/determined/core/_train.py         40.90% <33.33%> (+1.08%) ⬆️
harness/determined/core/_context.py       57.63% <50.00%> (+0.08%) ⬆️
harness/determined/core/_profiler.py      57.56% <14.28%> (+5.08%) ⬆️
harness/determined/core/_metrics.py       88.37% <88.37%> (ø)

... and 6 files with indirect coverage changes

Comment on lines 44 to 47
        # Check for thread exceptions here since we're not polling.
        if not self._error_queue.empty():
            err_msg = self._error_queue.get(block=False)
            logger.error(f"Error reporting metrics: {err_msg}")
            raise err_msg

Contributor

should we also check/flush the error queue on close?

e.g. what if user

  1. logs only one metric
  2. it fails in the background
  3. user logs no more and exits

it'd be good to wait for that report to flush & log error, if any.

Contributor Author

good idea. done.
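
For illustration, a close-time error check along the lines suggested above might look roughly like this. This is only a sketch reusing the _shipper, _error_queue, and logger names from the snippets quoted in this review; the merged code may differ.

    def close(self) -> None:
        # Wait for in-flight reports to finish before checking for errors, so a
        # failure on the very last report is still surfaced to the user.
        self._shipper.stop()
        self._shipper.join()
        if not self._error_queue.empty():
            err = self._error_queue.get(block=False)
            logger.error(f"Error reporting metrics: {err}")
            raise err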

Comment on lines 65 to 108
class _TrialMetrics:
    def __init__(
        self,
        group: str,
        steps_completed: int,
        metrics: Dict[str, Any],
        batch_metrics: Optional[List[Dict[str, Any]]] = None,
    ):
        self.group = group
        self.steps_completed = steps_completed
        self.metrics = metrics
        self.batch_metrics = batch_metrics

Contributor

don't we have something like this class elsewhere?

Contributor Author

We have determined.common.experimental.metrics.TrialMetrics, but it's under the experimental namespace.
The profiler (determined.core._profiler) had a similar object for the same purpose, but I've refactored the profiler to use the MetricsContext now.


    def close(self) -> None:
        self._shipper.stop()
        self._shipper.join()

Contributor

Instead of a plain join, can we do a join with timeout logic similar to https://github.com/determined-ai/determined/blob/main/harness/determined/core/_log_shipper.py#L107?

Way too many times we've had cases where the process is stuck on a background-thread join because of a networking hangup or something, and it's hard to debug every time. So I'd suggest that all non-daemon threads we join must have a timeout and some logging along the lines of "it took too long and we gave up".

Contributor Author

Good point. Done; it's gated behind a check for whether the inbound queue is empty, because if the core context is waiting on metrics to finish reporting, we can't just time it out.

If the queue is not empty and there's a hang, I'd wager it's better to risk the hang than to risk dropping some metric reports.

Contributor

If the core context is waiting on metrics to finish reporting but they are stuck for a long time, and that join takes a while, it'd still be opaque to the user why their script doesn't exit. I'd suggest we also join with a timeout in a loop and print a log message (after 10 seconds) saying that we are waiting for it to finish.

Contributor Author

done
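
For reference, the join-with-timeout-and-log behavior agreed on in this thread can be sketched roughly as follows. This is illustrative only; the 10-second interval comes from the comment above, and the function and logger names are assumptions rather than the merged code.

import logging
import threading

logger = logging.getLogger("determined.core")

_LOG_INTERVAL_SECONDS = 10


def _join_with_logging(thread: threading.Thread) -> None:
    # Join in a loop so a hung shipper thread is visible to the user instead of
    # silently blocking process exit.
    waited = 0
    while thread.is_alive():
        thread.join(timeout=_LOG_INTERVAL_SECONDS)
        if thread.is_alive():
            waited += _LOG_INTERVAL_SECONDS
            logger.warning(f"Still waiting for metrics to finish reporting ({waited}s)...")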

-            stepsCompleted=steps_completed,
-            trialId=self._trial_id,
-            trialRunId=self._run_id,
+        self._metrics.report(

Contributor

General question: do we have sufficient automated testing for this?

  1. Do we have an existing unit test, or is it possible to have a unit test checking the completeness/correctness of metrics sent from the harness?
  2. If that's not possible: do we have an e2e test that would check these?

Contributor Author

Good question. The full end-to-end journey of metrics goes something like:
training code -> trial APIs -> Core API (now with async logic) -> POST APIs to master -> master-side handling -> read APIs

Individual trials have unit tests (test_tf_keras_trial.py, test_pytorch_trial.py) that check the Trial API generates the expected metrics (training code -> trial APIs), but we don't have unit tests for the Core API -> master APIs leg of metrics reporting. I'll write one.

Beyond that, the end-to-end training code -> metrics read APIs path is covered by e2e_tests/.../test_metrics.py.

Contributor Author

added unit tests for the new MetricsContext
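
As a rough illustration of what such a unit test can assert (this is not the contents of harness/tests/core/test_metrics.py; it reuses the hypothetical _Reporter sketch from the Description above):

def test_reports_queued_before_close_are_flushed():
    posted = []

    shipper = _Reporter(post_metrics=posted.append)
    shipper.start()
    for step in range(50):
        shipper.report({"steps_completed": step, "metrics": {"x": step}})
    shipper.close()

    # Everything queued before close() should have been shipped, in order.
    assert posted == [
        {"steps_completed": step, "metrics": {"x": step}} for step in range(50)
    ]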

        self._trial_id = trial_id
        self._run_id = run_id

        super().__init__()

Contributor

lgtm, one last debugging convenience

Suggested change:
-        super().__init__()
+        super().__init__(name="MetricsShipperThread")

@azhou-determined azhou-determined merged commit 0bc13d8 into main Apr 18, 2024
74 of 86 checks passed
@azhou-determined azhou-determined deleted the non-blocking-metrics branch April 18, 2024 20:01
JComins000 pushed a commit that referenced this pull request Apr 22, 2024
- introduce ``_MetricsContext`` into Core API as a centralized place for metrics reporting, which reports metrics in a background thread
- refactor `core.train` and `core.profiler` to report metrics through the new ``_MetricsContext``