feat: utilization/memory usage on CPU and GPU (#420)
The metric collector now runs two additional threads to track GPU/CPU utilization and memory usage.
For now, GPU utilization is not specific to the running process.

Additionally, the metric collector was refactored to store samples in a stat_log variable.
Metric names now use a period to separate the metric type and alias.

setup.py has been updated to include gpustat and psutil.
GustavBaumgart committed May 26, 2023
1 parent f14b5b3 commit 53a850e
Showing 5 changed files with 135 additions and 35 deletions.
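Not part of the commit itself, but for orientation: a minimal sketch of how the new collector might be exercised on its own, assuming a Python 3.9+ environment with the listed dependencies installed. Metric keys come out as alias.mtype, e.g. mean.cpu_utilization.

    # Usage sketch (assumption: run inside a single Python process, values illustrative).
    # The collector samples CPU/GPU stats on background threads; get() folds the raw
    # samples into summary statistics keyed as "<alias>.<metric type>".
    import time

    from flame.monitor.metric_collector import MetricCollector

    mc = MetricCollector()   # starts the CPU and GPU sampling threads
    time.sleep(5)            # let a few 1-second samples accumulate

    metrics = mc.get()       # e.g. {"mean.cpu_utilization": 3.1, ...}
    print(metrics)

    mc.clear()               # reset state_dict and the stat_log sample lists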
1 change: 1 addition & 0 deletions lib/python/flame/mode/distributed/trainer.py
@@ -429,6 +429,7 @@ def put(self, tag: str) -> None:
def save_metrics(self):
"""Save metrics in a model registry."""
self.metrics = self.metrics | self.mc.get()
self.mc.clear()
logger.debug(f"saving metrics: {self.metrics}")
if self.metrics:
self.registry_client.save_metrics(self._round, self.metrics)
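A side note on the merge used in save_metrics() above: self.metrics | self.mc.get() is the PEP 584 dict union (Python 3.9+), where keys from the right-hand operand win on conflicts. A tiny illustration with invented values:

    # Illustration only (values made up): dict union keeps existing keys and lets
    # the right-hand operand win on conflicts, so freshly collected metrics
    # overwrite any stale entries of the same name.
    metrics = {"loss": 0.42}
    collected = {"mean.cpu_utilization": 3.1, "max.cpu_memory_rss": 150_000_000}

    metrics = metrics | collected   # PEP 584 merge, requires Python >= 3.9
    assert "mean.cpu_utilization" in metrics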
5 changes: 3 additions & 2 deletions lib/python/flame/mode/horizontal/syncfl/top_aggregator.py
@@ -53,7 +53,7 @@

class TopAggregator(Role, metaclass=ABCMeta):
"""Top level Aggregator implements an ML aggregation role."""

@abstract_attribute
def config(self) -> Config:
"""Abstract attribute for config object."""
@@ -105,7 +105,7 @@ def internal_init(self) -> None:
self.datasampler = datasampler_provider.get(
self.config.datasampler.sort, **self.config.datasampler.kwargs
).aggregator_data_sampler

self._round = 1
self._rounds = 1
self._rounds = self.config.hyperparameters.rounds
@@ -248,6 +248,7 @@ def save_metrics(self):
"""Save metrics in a model registry."""
# update metrics with metrics from metric collector
self.metrics = self.metrics | self.mc.get()
self.mc.clear()
logger.debug(f"saving metrics: {self.metrics}")
if self.metrics:
self.registry_client.save_metrics(self._round - 1, self.metrics)
1 change: 1 addition & 0 deletions lib/python/flame/mode/horizontal/syncfl/trainer.py
@@ -198,6 +198,7 @@ def save_metrics(self):
"""Save metrics in a model registry."""
# update self.metrics with metrics from MetricCollector instance
self.metrics = self.metrics | self.mc.get()
self.mc.clear()
logger.debug(f"saving metrics: {self.metrics}")
if self.metrics:
self.registry_client.save_metrics(self._round - 1, self.metrics)
112 changes: 103 additions & 9 deletions lib/python/flame/monitor/metric_collector.py
@@ -16,31 +16,125 @@
"""Metric Collector."""

import logging
import os
import time
import threading
import psutil
import numpy as np
from collections import defaultdict
from gpustat.nvml import pynvml as N

logger = logging.getLogger(__name__)


class MetricCollector:
def __init__(self):
"""Initialize Metric Collector."""
self.state_dict = dict()

self.stat_log = defaultdict(list)

# CPU monitoring
cpu_thread = threading.Thread(target=self.gather_cpu_stats)
cpu_thread.start()

# GPU monitoring
gpu_thread = threading.Thread(target=self.gather_gpu_stats)
gpu_thread.start()

def gather_gpu_stats(self, interval=1):
pid = os.getpid()

try:
N.nvmlInit()
except:
logger.debug(
"could not start gpustat.nvml.pynvml; no GPU metrics will be collected"
)
return

dcount = N.nvmlDeviceGetCount()
if dcount == 0:
logger.debug("no GPUs detected")
return

while True:
for d in range(dcount):
try:
handle = N.nvmlDeviceGetHandleByIndex(d)
processes = N.nvmlDeviceGetComputeRunningProcesses(handle)
except:
logger.debug(f"failed to retrieve GPU processes for GPU {d}")
continue

curr_pids = [process.pid for process in processes]

# GPU utilization
# TO DO: implement metric gathering for process-specific utilization of the GPUs
try:
if pid in curr_pids:
self.stat_log[f"gpu{d}_utilization"].append(
N.nvmlDeviceGetUtilizationRates(handle).gpu
)
else:
self.stat_log[f"gpu{d}_utilization"].append(0)
except:
logger.debug(f"failed to get GPU utilization of GPU {d}")

# GPU memory usage of process
total_mem = 0
for process in processes:
if pid == process.pid:
total_mem += process.usedGpuMemory

self.stat_log[f"gpu{d}_memory"].append(total_mem)

time.sleep(interval)

def gather_cpu_stats(self, interval=1):
pid = os.getpid()
proc = psutil.Process(pid)

while True:
self.stat_log["cpu_utilization"].append(proc.cpu_percent())

mem_info = proc.memory_info()
self.stat_log["cpu_memory_rss"].append(mem_info.rss)
self.stat_log["cpu_memory_vsz"].append(mem_info.vms)

time.sleep(interval)

def get_key(self, mtype, alias):
return f'{mtype}-{alias}'
return f"{alias}.{mtype}"

def save(self, mtype, alias, value):
"""Saves key-value pair for metric."""
key = self.get_key(mtype, alias)
self.state_dict[key] = value
logger.debug(f"Saving state_dict[{key}] = {value}")

def accumulate(self, mtype, alias, value):
key = self.get_key(mtype, alias)
self.state_dict[key] = value + self.state_dict.get(key, 0)
logger.debug(f"Accumulating metric state_dict[{key}] = {self.state_dict[key]}")


def save_log_statistics(self):
for name in self.stat_log:
values = self.stat_log[name]
if values:
self.save(name, "min", np.min(values))
self.save(name, "max", np.max(values))
self.save(name, "mean", np.mean(values))
self.save(name, "25th_prcntl", np.percentile(values, 25))
self.save(name, "50th_prcntl", np.percentile(values, 50))
self.save(name, "75th_prcntl", np.percentile(values, 75))
self.save(name, "std", np.std(values))

    def get(self):
-        """Returns the current metrics that were collected and clears the saved dictionary."""
-        temp = self.state_dict
-        self.state_dict = dict()
-        return temp
+        """Returns the current metrics that were collected."""
+        self.save_log_statistics()
+        return self.state_dict
+
+    def clear(self):
+        self.state_dict = dict()
+
+        for key in self.stat_log:
+            self.stat_log[key].clear()
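To make the new key naming concrete: save_log_statistics() reduces each raw sample series in stat_log to seven summary entries, keyed alias.mtype by get_key(). A standalone illustration with invented sample values (not part of the commit):

    import numpy as np

    # Hypothetical raw samples as they would sit in stat_log (one per second).
    samples = {"cpu_utilization": [2.0, 3.5, 4.0, 3.0]}

    summary = {}
    for name, values in samples.items():
        # Mirrors save_log_statistics(): keys are "<alias>.<mtype>" via get_key(mtype, alias).
        summary[f"min.{name}"] = np.min(values)
        summary[f"max.{name}"] = np.max(values)
        summary[f"mean.{name}"] = np.mean(values)
        summary[f"25th_prcntl.{name}"] = np.percentile(values, 25)
        summary[f"50th_prcntl.{name}"] = np.percentile(values, 50)
        summary[f"75th_prcntl.{name}"] = np.percentile(values, 75)
        summary[f"std.{name}"] = np.std(values)

    print(summary)  # e.g. {"min.cpu_utilization": 2.0, "max.cpu_utilization": 4.0, ...}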
51 changes: 27 additions & 24 deletions lib/python/setup.py
@@ -18,38 +18,41 @@
from setuptools import find_packages, setup

setup(
name='flame',
version='0.1.0',
author='Flame Maintainers',
author_email='flame-github-owners@cisco.com',
name="flame",
version="0.1.0",
author="Flame Maintainers",
author_email="flame-github-owners@cisco.com",
include_package_data=True,
packages=find_packages(),
data_files=[],
scripts=['scripts/flame-config'],
url='https://github.com/cisco-open/flame/',
license='LICENSE.txt',
scripts=["scripts/flame-config"],
url="https://github.com/cisco-open/flame/",
license="LICENSE.txt",
description="This package is a python library"
" to run ML workloads in the flame system",
long_description=open('README.md').read(),
long_description=open("README.md").read(),
install_requires=[
'aiostream',
'boto3',
'cloudpickle',
'diskcache',
'mlflow==2.0.1',
'paho-mqtt',
'protobuf==3.19.5',
'grpcio==1.51.1',
'pydantic',
"aiostream",
"boto3",
"cloudpickle",
"diskcache",
"mlflow==2.0.1",
"paho-mqtt",
"protobuf==3.19.5",
"grpcio==1.51.1",
"pydantic",
"gpustat",
"psutil",
"numpy",
],
extras_require={
'dev': [
'pre-commit',
'black',
'flake8',
'bandit',
'mypy',
'isort',
"dev": [
"pre-commit",
"black",
"flake8",
"bandit",
"mypy",
"isort",
],
},
)
