add process monitoring to simulation executor #127

Draft · wants to merge 2 commits into main
Changes from 1 commit
76 changes: 69 additions & 7 deletions folding/miners/folding_miner.py
@@ -6,6 +6,7 @@
from typing import Dict, List, Tuple
from collections import defaultdict
import bittensor as bt
import signal

# import base miner class which takes care of most of the boilerplate
from folding.base.miner import BaseMinerNeuron
@@ -22,6 +23,11 @@
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BASE_DATA_PATH = os.path.join(ROOT_DIR, "miner-data")

import multiprocessing

MANAGER = multiprocessing.Manager()
PROCESS_QUEUE_MANAGER = MANAGER.Queue()
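
For readers unfamiliar with the pattern, a multiprocessing.Manager queue is a proxy object that both the parent and its worker processes can safely put to and get from. Below is a minimal sketch of how such a queue can carry a (job id, process id) pair back to the parent; the names are illustrative and not taken from this PR:

    import multiprocessing
    import os

    def worker(job_id: str, pid_queue) -> None:
        # Report (job_id, pid) so the parent can track or signal this process later.
        pid_queue.put((job_id, os.getpid()))
        # ... long-running simulation work would go here ...

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        pid_queue = manager.Queue()  # proxy queue shared across processes

        p = multiprocessing.Process(target=worker, args=("demo_pdb", pid_queue))
        p.start()
        job_id, pid = pid_queue.get(block=True, timeout=5)
        print(f"{job_id} is running in process {pid}")
        p.join()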


def attach_files(files_to_attach: List, synapse: FoldingSynapse) -> FoldingSynapse:
"""function that parses a list of files and attaches them to the synapse object"""
@@ -221,11 +227,15 @@ def check_and_remove_simulations(self, event: Dict) -> Dict:
"""Check to see if any simulations have finished, and remove them
from the simulation store
"""
sims_to_delete = []
if len(self.simulations) > 0:
sims_to_delete = []

bt.logging.warning(f"current simulatinos in the stack: {self.simulations}")
Contributor suggested change:
-            bt.logging.warning(f"current simulatinos in the stack: {self.simulations}")
+            bt.logging.warning(f"current simulations in the stack: {self.simulations}")

            for pdb_id, simulation in self.simulations.items():
                time_since_last_query = time.time() - simulation["queried_at"]
                bt.logging.info(
                    f"Time since last query for {pdb_id}: {time_since_last_query}"
                )
                bt.logging.warning(f"Simulation info: {simulation['executor']}")
                current_executor_state = simulation["executor"].get_state()

                if current_executor_state == "finished":
@@ -239,11 +249,12 @@ def check_and_remove_simulations(self, event: Dict) -> Dict:
                    )
                    sims_to_delete.append(pdb_id)

            for pdb_id in sims_to_delete:
                del self.simulations[pdb_id]
            bt.logging.warning(f"Sims to delete: {sims_to_delete}")
            for pdb_id in sims_to_delete:
                self.simulations[pdb_id] = None

            event["running_simulations"] = list(self.simulations.keys())
            bt.logging.warning(f"Simulations Running: {list(self.simulations.keys())}")
            event["running_simulations"] = list(self.simulations.keys())
            bt.logging.warning(f"Simulations Running: {list(self.simulations.keys())}")

        return event
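
Two details in the hunk above are worth noting. Collecting pdb_ids into sims_to_delete and removing them after the loop avoids mutating the dict while iterating over it, which raises RuntimeError in Python 3. Also, the new line assigns None instead of deleting the key, so the pdb_id still appears in list(self.simulations.keys()); the deletion variant of the pattern looks like this (values are illustrative):

    simulations = {"1abc": "finished", "2xyz": "running", "3def": "finished"}

    # Collect first, then delete, so the dict is not modified during iteration.
    sims_to_delete = [pdb_id for pdb_id, state in simulations.items() if state == "finished"]
    for pdb_id in sims_to_delete:
        del simulations[pdb_id]

    print(list(simulations.keys()))  # ['2xyz']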

@@ -274,7 +285,48 @@ def forward(self, synapse: FoldingSynapse) -> FoldingSynapse:
        output_dir = os.path.join(self.base_data_path, synapse.pdb_id)

        # check if any of the simulations have finished
        event = self.check_and_remove_simulations(event=event)
        # event = self.check_and_remove_simulations(event=event)

        # THIS IS THE CODE BUT IN FORWARD TO SEE IF THINGS CHANGE

        sims_to_delete = {
            "finished_simulations": [],
            "timed_out_simulations": [],
        }
        if len(self.simulations) > 0:
            bt.logging.warning(f"current simulations in the stack: {self.simulations}")
            for pdb_id, simulation in self.simulations.items():
                time_since_last_query = time.time() - simulation["queried_at"]
                bt.logging.info(
                    f"Time since last query for {pdb_id}: {time_since_last_query}"
                )
                bt.logging.warning(f"Simulation info: {simulation['executor']}")

                current_executor_state = simulation["executor"].get_state()

                if current_executor_state == "finished":
                    bt.logging.warning(
                        f"✅ {pdb_id} finished simulation... Removing from execution stack ✅"
                    )
                    sims_to_delete["finished_simulations"].append(pdb_id)
                elif time_since_last_query > self.config.neuron.query_timeout:
                    bt.logging.warning(
                        f"⏰ Query timeout of {self.config.neuron.query_timeout} reached for {pdb_id}... Removing from execution stack ⏰"
                    )
                    sims_to_delete["timed_out_simulations"].append(pdb_id)

        bt.logging.warning(f"Sims to delete: {sims_to_delete}")
        for key, pdbs_to_delete in sims_to_delete.items():
            if key == "finished_simulations":
                for pdb_id in pdbs_to_delete:
                    del self.simulations[pdb_id]
            elif key == "timed_out_simulations":
                for pdb_id in pdbs_to_delete:
                    os.kill(self.simulations[pdb_id]["process_id"], signal.SIGTERM)
                    del self.simulations[pdb_id]

        event["running_simulations"] = list(self.simulations.keys())
        bt.logging.warning(f"Simulations Running: {list(self.simulations.keys())}")

        # The set of RUNNING simulations.
        if synapse.pdb_id in self.simulations:
Expand Down Expand Up @@ -355,16 +407,22 @@ def forward(self, synapse: FoldingSynapse) -> FoldingSynapse:
        future = self.executor.submit(
            simulation_manager.run,
            synapse.md_inputs,
            PROCESS_QUEUE_MANAGER,
            state_commands,
            self.config.neuron.suppress_cmd_output,
            self.config.mock or self.mock,  # self.mock is inside of MockFoldingMiner
        )

        # Get the process id of the simulation manager from the queue
        pdb_id, process_id = PROCESS_QUEUE_MANAGER.get(block=True, timeout=1)

        self.simulations[synapse.pdb_id]["executor"] = simulation_manager
        self.simulations[pdb_id]["process_id"] = process_id
        self.simulations[synapse.pdb_id]["future"] = future
        self.simulations[synapse.pdb_id]["output_dir"] = simulation_manager.output_dir
        self.simulations[synapse.pdb_id]["queried_at"] = time.time()

        bt.logging.warning(f"Running pdb_id {pdb_id} with process_id {process_id}")
        bt.logging.success(
            f"✅ New pdb_id {synapse.pdb_id} submitted to job executor ✅ "
        )
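
The submit-then-get handshake above can time out: with timeout=1, PROCESS_QUEUE_MANAGER.get raises queue.Empty if nothing arrives within a second, for example when the worker has not started yet. Below is a self-contained sketch of the same handshake, assuming a process-pool style executor; the names are illustrative rather than the miner's actual classes:

    import multiprocessing
    import os
    import queue as queue_module
    from concurrent.futures import ProcessPoolExecutor

    def run_simulation(pdb_id: str, pid_queue) -> str:
        # The worker announces its pid as soon as it starts running.
        pid_queue.put((pdb_id, os.getpid()))
        return f"{pdb_id} done"

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        pid_queue = manager.Queue()

        with ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(run_simulation, "demo_pdb", pid_queue)
            try:
                pdb_id, process_id = pid_queue.get(block=True, timeout=5)
                print(f"{pdb_id} running as pid {process_id}")
            except queue_module.Empty:
                # With a short timeout the worker may not have reported yet.
                print("worker did not report a pid in time")
            print(future.result())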
Expand Down Expand Up @@ -433,6 +491,7 @@ def create_empty_file(self, file_path: str):
    def run(
        self,
        md_inputs: Dict,
        queue: multiprocessing.Queue,
        commands: Dict,
        suppress_cmd_output: bool = True,
        mock: bool = False,
@@ -449,6 +508,9 @@
f"Running simulation for protein: {self.pdb_id} with files {md_inputs.keys()}"
)

process_id = os.getpid()
queue.put((self.pdb_id, process_id))

# Make sure the output directory exists and if not, create it
check_if_directory_exists(output_directory=self.output_dir)
os.chdir(self.output_dir) # TODO: will this be a problem with many processes?
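
On the TODO above: os.chdir changes the working directory of the whole process. If each simulation runs in its own process, the workers get independent working directories, but if they were to run as threads in one process they would share and race on it. A small illustrative check, with placeholder paths:

    import multiprocessing
    import os

    def report_cwd(path: str) -> None:
        os.chdir(path)  # affects only this worker process
        print(os.getpid(), os.getcwd())

    if __name__ == "__main__":
        for path in ("/tmp", "/"):
            proc = multiprocessing.Process(target=report_cwd, args=(path,))
            proc.start()
            proc.join()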