diff --git a/pydra/engine/core.py b/pydra/engine/core.py
index 82400a1be..a523c2452 100644
--- a/pydra/engine/core.py
+++ b/pydra/engine/core.py
@@ -9,7 +9,7 @@
 import sys
 from pathlib import Path
 import typing as ty
-from copy import deepcopy
+from copy import deepcopy, copy
 from uuid import uuid4
 from filelock import SoftFileLock
 import shutil
@@ -281,13 +281,15 @@ def checksum_states(self, state_index=None):
         """
         if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING:
-            self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted]
+            self.inputs._graph_checksums = {
+                nd.name: nd.checksum for nd in self.graph_sorted
+            }

         if state_index is not None:
-            inputs_copy = deepcopy(self.inputs)
+            inputs_copy = copy(self.inputs)
             for key, ind in self.state.inputs_ind[state_index].items():
                 val = self._extract_input_el(
-                    inputs=inputs_copy, inp_nm=key.split(".")[1], ind=ind
+                    inputs=self.inputs, inp_nm=key.split(".")[1], ind=ind
                 )
                 setattr(inputs_copy, key.split(".")[1], val)
             # setting files_hash again in case it was cleaned by setting specific element
@@ -462,13 +464,25 @@ def __call__(
         return res

     def _modify_inputs(self):
-        """Update and preserve a Task's original inputs"""
+        """This method modifies the inputs of the task ahead of its execution:
+        - links/copies upstream files and directories into the destination task's
+          working directory as required, selecting the state-array values that
+          correspond to the state index (it will try to leave files where they
+          are unless a copy is required or they are on different file systems)
+        - resolves template values (e.g. output_file_template)
+        - deepcopies all inputs to guard against in-place changes during the
+          task's execution (they will be replaced after the task's execution
+          with the original inputs to ensure the task's checksums are consistent)
+        """
         orig_inputs = {
-            k: deepcopy(v) for k, v in attr.asdict(self.inputs, recurse=False).items()
+            k: v
+            for k, v in attr.asdict(self.inputs, recurse=False).items()
+            if not k.startswith("_")
         }
         map_copyfiles = {}
-        for fld in attr_fields(self.inputs):
-            value = getattr(self.inputs, fld.name)
+        input_fields = attr.fields(type(self.inputs))
+        for name, value in orig_inputs.items():
+            fld = getattr(input_fields, name)
             copy_mode, copy_collation = parse_copyfile(
                 fld, default_collation=self.DEFAULT_COPY_COLLATION
             )
@@ -483,12 +497,22 @@
                 supported_modes=self.SUPPORTED_COPY_MODES,
             )
             if value is not copied_value:
-                map_copyfiles[fld.name] = copied_value
+                map_copyfiles[name] = copied_value
         modified_inputs = template_update(
             self.inputs, self.output_dir, map_copyfiles=map_copyfiles
         )
-        if modified_inputs:
-            self.inputs = attr.evolve(self.inputs, **modified_inputs)
+        assert all(m in orig_inputs for m in modified_inputs), (
+            "Modified inputs contain fields not present in original inputs. "
+            "This is likely a bug."
+        )
+        for name, orig_value in orig_inputs.items():
+            try:
+                value = modified_inputs[name]
+            except KeyError:
+                # Ensure we pass a copy, not the original, just in case inner
+                # attributes are modified during execution
+                value = deepcopy(orig_value)
+            setattr(self.inputs, name, value)
         return orig_inputs

     def _populate_filesystem(self, checksum, output_dir):
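
Reviewer note: the deepcopy-and-restore dance in `_modify_inputs` is easiest to see with a toy illustration (plain Python below, not pydra's real hashing, which lives in pydra/utils/hash.py): hashing a mutable value, mutating it in place, and hashing again yields a different digest, which would break cache lookups keyed on the pre-run checksum.

```python
# Toy sketch (assumes nothing about pydra internals): why tasks get deepcopies
# of mutable inputs while the originals are kept aside for checksum stability.
from copy import deepcopy
import hashlib


def toy_hash(obj) -> str:
    """Stand-in for pydra's hash_function: digest of the object's repr."""
    return hashlib.md5(repr(obj).encode()).hexdigest()


inputs = {"x": [1, 2, 3]}
before = toy_hash(inputs)

work_copy = deepcopy(inputs)  # what the task is allowed to mutate
work_copy["x"].append(4)      # in-place change during "execution"

assert toy_hash(inputs) == before     # originals keep a stable hash
assert toy_hash(work_copy) != before  # the mutated copy would not
```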
@@ -548,13 +572,14 @@ def _run(self, rerun=False, environment=None, **kwargs):
             save(output_dir, result=result, task=self)
             # removing the additional file with the checksum
             (self.cache_dir / f"{self.uid}_info.json").unlink()
-            # # function etc. shouldn't change anyway, so removing
-            orig_inputs = {
-                k: v for k, v in orig_inputs.items() if not k.startswith("_")
-            }
-            self.inputs = attr.evolve(self.inputs, **orig_inputs)
+            # Restore original values to inputs
+            for field_name, field_value in orig_inputs.items():
+                setattr(self.inputs, field_name, field_value)
             os.chdir(cwd)
         self.hooks.post_run(self, result)
+        # Check for any changes to the input hashes that have occurred during
+        # the execution of the task
+        self._check_for_hash_changes()
         return result

     def _collect_outputs(self, output_dir):
@@ -816,8 +841,8 @@ def result(self, state_index=None, return_inputs=False):

         Returns
         -------
-        result :
-
+        result : Result
+            the result of the task
         """
         # TODO: check if result is available in load_result and
         # return a future if not
@@ -884,6 +909,47 @@ def _reset(self):
         for task in self.graph.nodes:
             task._reset()

+    def _check_for_hash_changes(self):
+        hash_changes = self.inputs.hash_changes()
+        details = ""
+        for changed in hash_changes:
+            field = getattr(attr.fields(type(self.inputs)), changed)
+            val = getattr(self.inputs, changed)
+            field_type = type(val)
+            if issubclass(field.type, FileSet):
+                details += (
+                    f"- {changed}: value passed to the {field.type} field is of type "
+                    f"{field_type} ('{val}'). If it is intended to contain output data "
+                    "then the type of the field in the interface class should be changed "
+                    "to `pathlib.Path`. Otherwise, if the field is intended to be an "
+                    "input field but it gets altered by the task in some way, then the "
+                    "'copyfile' flag should be set to 'copy' in the field metadata of "
+                    "the task interface class so copies of the files/directories in it "
+                    "are passed to the task instead.\n"
+                )
+            else:
+                details += (
+                    f"- {changed}: the {field_type} object passed to the {field.type} "
+                    "field appears to have an unstable hash. This could be due to "
+                    "stochastic or non-thread-safe attribute(s) of the object\n\n"
+                    f"The {field.type}.__bytes_repr__() method can be implemented to "
+                    "provide bespoke hashing, based only on the stable attributes of "
+                    f"the `{field_type.__module__}.{field_type.__name__}` type. "
+                    "See pydra/utils/hash.py for examples. "
+                    f"Value: {val}\n"
+                )
+        if hash_changes:
+            raise RuntimeError(
+                f"Input field hashes have changed during the execution of the "
+                f"'{self.name}' {type(self).__name__}.\n\n{details}"
+            )
+        logger.debug(
+            "Input values and hashes for '%s' %s node:\n%s\n%s",
+            self.name,
+            type(self).__name__,
+            self.inputs,
+            self.inputs._hashes,
+        )
+
     SUPPORTED_COPY_MODES = FileSet.CopyMode.any
     DEFAULT_COPY_COLLATION = FileSet.CopyCollation.any
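
The `__bytes_repr__` route recommended by this error message looks roughly like the sketch below. `Seed` and its attributes are hypothetical; only the dunder protocol itself is the one `pydra/utils/hash.py` dispatches to:

```python
# Hypothetical custom input type with a stable __bytes_repr__: only
# deterministic attributes are fed into the hash, in a fixed order.
import typing as ty


class Seed:  # hypothetical user type
    def __init__(self, value: int, label: str):
        self.value = value
        self.label = label
        self._scratch = object()  # unstable attribute, excluded from the hash

    def __bytes_repr__(self, cache) -> ty.Iterator[bytes]:
        yield self.value.to_bytes(8, "little", signed=True)
        yield self.label.encode()
```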
@@ -1076,7 +1142,9 @@ def checksum(self):
         """
         # if checksum is called before run the _graph_checksums is not ready
         if is_workflow(self) and self.inputs._graph_checksums is attr.NOTHING:
-            self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted]
+            self.inputs._graph_checksums = {
+                nd.name: nd.checksum for nd in self.graph_sorted
+            }
         input_hash = self.inputs.hash
         if not self.state:
@@ -1256,8 +1324,9 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
             (self.cache_dir / f"{self.uid}_info.json").unlink()
             os.chdir(cwd)
         self.hooks.post_run(self, result)
-        if result is None:
-            raise Exception("This should never happen, please open new issue")
+        # Check for any changes to the input hashes that have occurred during
+        # the execution of the task
+        self._check_for_hash_changes()
         return result

     async def _run_task(self, submitter, rerun=False):
diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py
index 4e0e66e19..16cd925ce 100644
--- a/pydra/engine/specs.py
+++ b/pydra/engine/specs.py
@@ -15,7 +15,7 @@
 )
 import pydra
 from .helpers_file import template_update_single
-from ..utils.hash import hash_function
+from ..utils.hash import hash_function, Cache

 # from ..utils.misc import add_exc_note

@@ -73,21 +73,22 @@ class SpecInfo:
 class BaseSpec:
     """The base dataclass specs for all inputs and outputs."""

-    # def __attrs_post_init__(self):
-    #     self.files_hash = {
-    #         field.name: {}
-    #         for field in attr_fields(
-    #             self, exclude_names=("_graph_checksums", "bindings", "files_hash")
-    #         )
-    #         if field.metadata.get("output_file_template") is None
-    #     }
-
     def collect_additional_outputs(self, inputs, output_dir, outputs):
         """Get additional outputs."""
         return {}

     @property
     def hash(self):
+        hsh, self._hashes = self._compute_hashes()
+        return hsh
+
+    def hash_changes(self):
+        """Detects any changes in the hashed values between the current inputs and the
+        previously calculated values"""
+        _, new_hashes = self._compute_hashes()
+        return [k for k, v in new_hashes.items() if v != self._hashes[k]]
+
+    def _compute_hashes(self) -> ty.Tuple[bytes, ty.Dict[str, bytes]]:
         """Compute a basic hash for any given set of fields."""
         inp_dict = {}
         for field in attr_fields(
@@ -101,10 +102,13 @@ def hash(self):
             if "container_path" in field.metadata:
                 continue
             inp_dict[field.name] = getattr(self, field.name)
-        inp_hash = hash_function(inp_dict)
+        hash_cache = Cache({})
+        field_hashes = {
+            k: hash_function(v, cache=hash_cache) for k, v in inp_dict.items()
+        }
         if hasattr(self, "_graph_checksums"):
-            inp_hash = hash_function((inp_hash, self._graph_checksums))
-        return inp_hash
+            field_hashes["_graph_checksums"] = self._graph_checksums
+        return hash_function(sorted(field_hashes.items())), field_hashes

     def retrieve_values(self, wf, state_index: ty.Optional[int] = None):
         """Get values contained by this spec."""
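
Why `_compute_hashes` returns a digest per field rather than one combined hash: keeping per-field digests is what lets `hash_changes()` name exactly which input drifted. A minimal stand-in (toy repr-based hashing, not pydra's byte serialization):

```python
import hashlib
from typing import Dict


def field_hashes(values: Dict[str, object]) -> Dict[str, str]:
    # One digest per field, mirroring the shape _compute_hashes returns
    return {k: hashlib.md5(repr(v).encode()).hexdigest() for k, v in values.items()}


before = field_hashes({"in_file": "/data/a.txt", "n_iter": 3})
after = field_hashes({"in_file": "/data/a.txt", "n_iter": 4})

changed = [k for k, v in after.items() if v != before[k]]
assert changed == ["n_iter"]  # pinpoints the drifting field
```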
"\n".join(str(p) for p in set(node.cache_locations)) + + f"\n\nThis is likely due to hash changes in '{self.name}' node inputs. " + f"Current values and hashes: {self.inputs}, " + f"{self.inputs._hashes}\n\n" + "Set loglevel to 'debug' in order to track hash changes " + "throughout the execution of the workflow.\n\n " + "These issues may have been caused by `bytes_repr()` methods " + "that don't return stable hash values for specific object " + "types across multiple processes (see bytes_repr() " + '"singledispatch "function in pydra/utils/hash.py).' + "You may need to implement a specific `bytes_repr()` " + '"singledispatch overload"s or `__bytes_repr__()` ' + "dunder methods to handle one or more types in " + "your interface inputs." ) _, split_depth = TypeParser.strip_splits(self.type) diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 23c8f50b0..e2610c9bd 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -183,24 +183,55 @@ async def expand_workflow(self, wf, rerun=False): # don't block the event loop! await asyncio.sleep(1) if ii > 60: - blocked = _list_blocked_tasks(graph_copy) - # get_runnable_tasks(graph_copy) # Uncomment to debug `get_runnable_tasks` - raise Exception( - "graph is not empty, but not able to get more tasks " - "- something may have gone wrong when retrieving the results " - "of predecessor tasks. This could be caused by a file-system " - "error or a bug in the internal workflow logic, but is likely " - "to be caused by the hash of an upstream node being unstable." - " \n\nHash instability can be caused by an input of the node being " - "modified in place, or by psuedo-random ordering of `set` or " - "`frozenset` inputs (or nested attributes of inputs) in the hash " - "calculation. To ensure that sets are hashed consistently you can " - "you can try set the environment variable PYTHONHASHSEED=0 for " - "all processes, but it is best to try to identify where the set " - "objects are occurring and manually hash their sorted elements. " - "(or use list objects instead)" - "\n\nBlocked tasks\n-------------\n" + "\n".join(blocked) + msg = ( + f"Graph of '{wf}' workflow is not empty, but not able to get " + "more tasks - something has gone wrong when retrieving the " + "results predecessors:\n\n" ) + # Get blocked tasks and the predecessors they are waiting on + outstanding = { + t: [ + p for p in graph_copy.predecessors[t.name] if not p.done + ] + for t in graph_copy.sorted_nodes + } + + hashes_have_changed = False + for task, waiting_on in outstanding.items(): + if not waiting_on: + continue + msg += f"- '{task.name}' node blocked due to\n" + for pred in waiting_on: + if ( + pred.checksum + != wf.inputs._graph_checksums[pred.name] + ): + msg += ( + f" - hash changes in '{pred.name}' node inputs. " + f"Current values and hashes: {pred.inputs}, " + f"{pred.inputs._hashes}\n" + ) + hashes_have_changed = True + elif pred not in outstanding: + msg += ( + f" - undiagnosed issues in '{pred.name}' node, " + "potentially related to file-system access issues " + ) + msg += "\n" + if hashes_have_changed: + msg += ( + "Set loglevel to 'debug' in order to track hash changes " + "throughout the execution of the workflow.\n\n " + "These issues may have been caused by `bytes_repr()` methods " + "that don't return stable hash values for specific object " + "types across multiple processes (see bytes_repr() " + '"singledispatch "function in pydra/utils/hash.py).' 
+ "You may need to implement a specific `bytes_repr()` " + '"singledispatch overload"s or `__bytes_repr__()` ' + "dunder methods to handle one or more types in " + "your interface inputs." + ) + raise RuntimeError(msg) for task in tasks: # grab inputs if needed logger.debug(f"Retrieving inputs for {task}") diff --git a/pydra/engine/task.py b/pydra/engine/task.py index cfe12af3b..cb55d9e39 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -337,8 +337,8 @@ def command_args(self, root=None): raise NotImplementedError modified_inputs = template_update(self.inputs, output_dir=self.output_dir) - if modified_inputs is not None: - self.inputs = attr.evolve(self.inputs, **modified_inputs) + for field_name, field_value in modified_inputs.items(): + setattr(self.inputs, field_name, field_value) pos_args = [] # list for (position, command arg) self._positions_provided = [] diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index 0370d92a5..f7edd0f57 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -25,7 +25,7 @@ def test_basespec(): spec = BaseSpec() - assert spec.hash == "06fe829a5dca34cc5f0710b454c24808" + assert spec.hash == "0b1d98df22ecd1733562711c205abca2" def test_runtime(): @@ -132,14 +132,14 @@ def test_input_file_hash_1(tmp_path): fields = [("in_file", ty.Any)] input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) inputs = make_klass(input_spec) - assert inputs(in_file=outfile).hash == "02e248cb7ca3628af6b97aa27723b623" + assert inputs(in_file=outfile).hash == "9a106eb2830850834d9b5bf098d5fa85" with open(outfile, "w") as fp: fp.write("test") fields = [("in_file", File)] input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) inputs = make_klass(input_spec) - assert inputs(in_file=outfile).hash == "c1156e9576b0266f23c30771bf59482a" + assert inputs(in_file=outfile).hash == "0e9306e5cae1de1b4dff1f27cca03bce" def test_input_file_hash_2(tmp_path): @@ -153,7 +153,7 @@ def test_input_file_hash_2(tmp_path): # checking specific hash value hash1 = inputs(in_file=file).hash - assert hash1 == "73745b60b45052d6020918fce5801581" + assert hash1 == "17e4e2b4d8ce8f36bf3fd65804958dbb" # checking if different name doesn't affect the hash file_diffname = tmp_path / "in_file_2.txt" @@ -183,7 +183,7 @@ def test_input_file_hash_2a(tmp_path): # checking specific hash value hash1 = inputs(in_file=file).hash - assert hash1 == "73745b60b45052d6020918fce5801581" + assert hash1 == "17e4e2b4d8ce8f36bf3fd65804958dbb" # checking if different name doesn't affect the hash file_diffname = tmp_path / "in_file_2.txt" @@ -201,7 +201,7 @@ def test_input_file_hash_2a(tmp_path): # checking if string is also accepted hash4 = inputs(in_file=str(file)).hash - assert hash4 == "aaee75d79f1bc492619fabfa68cb3c69" + assert hash4 == "aee7c7ae25509fb4c92a081d58d17a67" def test_input_file_hash_3(tmp_path): @@ -274,7 +274,7 @@ def test_input_file_hash_4(tmp_path): # checking specific hash value hash1 = inputs(in_file=[[file, 3]]).hash - assert hash1 == "b8d8255b923b7bb8817da16e6ec57fae" + assert hash1 == "11b7e9c90bc8d9dc5ccfc8d4526ba091" # the same file, but int field changes hash1a = inputs(in_file=[[file, 5]]).hash @@ -310,7 +310,7 @@ def test_input_file_hash_5(tmp_path): # checking specific hash value hash1 = inputs(in_file=[{"file": file, "int": 3}]).hash - assert hash1 == "dedaf3899cce99d19238c2efb1b19a89" + assert hash1 == "5fd53b79e55bbf62a4bb3027eb753a2c" # the same file, but int field changes hash1a = 
inputs(in_file=[{"file": file, "int": 5}]).hash diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index a3219521a..298e7e74b 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -1,12 +1,15 @@ from dateutil import parser +import secrets import re import subprocess as sp import time +import attrs +import typing as ty +from random import randint import os from unittest.mock import patch - import pytest - +from fileformats.generic import Directory from .utils import ( need_sge, need_slurm, @@ -576,40 +579,95 @@ def test_sge_no_limit_maxthreads(tmpdir): assert job_1_endtime > job_2_starttime -# @pytest.mark.xfail(reason="Not sure") -def test_wf_with_blocked_tasks(tmpdir): - wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"]) - wf.add(identity(name="taska", x=wf.lzin.x)) - wf.add(alter_input(name="taskb", x=wf.taska.lzout.out)) - wf.add(to_tuple(name="taskc", x=wf.taska.lzout.out, y=wf.taskb.lzout.out)) - wf.set_output([("out", wf.taskb.lzout.out)]) +def test_hash_changes_in_task_inputs_file(tmp_path): + @mark.task + def output_dir_as_input(out_dir: Directory) -> Directory: + (out_dir.fspath / "new-file.txt").touch() + return out_dir - wf.inputs.x = A(1) + task = output_dir_as_input(out_dir=tmp_path) + with pytest.raises(RuntimeError, match="Input field hashes have changed"): + task() - wf.cache_dir = tmpdir - # with pytest.raises(Exception, match="graph is not empty,"): - with Submitter("serial") as sub: - sub(wf) +def test_hash_changes_in_task_inputs_unstable(tmp_path): + @attrs.define + class Unstable: + value: int # type: ignore + def __bytes_repr__(self, cache) -> ty.Iterator[bytes]: + """Random 128-bit bytestring""" + yield secrets.token_bytes(16) -class A: - def __init__(self, a): - self.a = a + @mark.task + def unstable_input(unstable: Unstable) -> int: + return unstable.value - def __bytes_repr__(self, cache): - yield bytes(self.a) + task = unstable_input(unstable=Unstable(1)) + with pytest.raises(RuntimeError, match="Input field hashes have changed"): + task() -@mark.task -def identity(x): - return x +def test_hash_changes_in_workflow_inputs(tmp_path): + @mark.task + def output_dir_as_output(out_dir: Path) -> Directory: + (out_dir / "new-file.txt").touch() + return out_dir + wf = Workflow( + name="test_hash_change", input_spec={"in_dir": Directory}, in_dir=tmp_path + ) + wf.add(output_dir_as_output(out_dir=wf.lzin.in_dir, name="task")) + wf.set_output(("out_dir", wf.task.lzout.out)) + with pytest.raises(RuntimeError, match="Input field hashes have changed.*Workflow"): + wf() -@mark.task -def alter_input(x): - x.a = 2 - return x + +def test_hash_changes_in_workflow_graph(tmpdir): + class X: + """Dummy class with unstable hash (i.e. which isn't altered in a node in which + it is an input)""" + + value = 1 + + def __bytes_repr__(self, cache): + """Bytes representation from class attribute, which will be changed be + 'alter_x" node. 
diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py
index 8ff3ce6d4..0d666574e 100644
--- a/pydra/engine/tests/test_task.py
+++ b/pydra/engine/tests/test_task.py
@@ -5,6 +5,7 @@
 import cloudpickle as cp
 from pathlib import Path
 import json
+import glob
 from ... import mark
 from ..core import Workflow
 from ..task import AuditFlag, ShellCommandTask
diff --git a/pydra/utils/hash.py b/pydra/utils/hash.py
index 0c5f5f870..74d3b3a44 100644
--- a/pydra/utils/hash.py
+++ b/pydra/utils/hash.py
@@ -59,12 +59,12 @@ class UnhashableError(ValueError):
     """Error for objects that cannot be hashed"""


-def hash_function(obj):
+def hash_function(obj, cache=None):
     """Generate hash of object."""
-    return hash_object(obj).hex()
+    return hash_object(obj, cache=cache).hex()


-def hash_object(obj: object) -> Hash:
+def hash_object(obj: object, cache: ty.Optional[Cache] = None) -> Hash:
     """Hash an object

     Constructs a byte string that uniquely identifies the object,
@@ -73,8 +73,10 @@ def hash_object(obj: object) -> Hash:
     Base Python types are implemented, including recursive lists and dicts.
     Custom types can be registered with :func:`register_serializer`.
     """
+    if cache is None:
+        cache = Cache({})
     try:
-        return hash_single(obj, Cache({}))
+        return hash_single(obj, cache)
     except Exception as e:
         raise UnhashableError(f"Cannot hash object {obj!r}") from e

@@ -105,6 +107,22 @@ def __bytes_repr__(self, cache: Cache) -> Iterator[bytes]:

 @singledispatch
 def bytes_repr(obj: object, cache: Cache) -> Iterator[bytes]:
+    """Default implementation of hashing for generic objects. Single dispatch is used
+    to provide hooks for class-specific implementations.
+
+    Parameters
+    ----------
+    obj : object
+        the object to hash
+    cache : Cache
+        a dictionary object used to store a cache of previously hashed objects,
+        in order to handle circular object references
+
+    Yields
+    ------
+    bytes
+        unique representation of the object in a series of bytes
+    """
     cls = obj.__class__
     yield f"{cls.__module__}.{cls.__name__}:{{".encode()
     dct: Dict[str, ty.Any]
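
Finally, a usage sketch for the `cache` parameter threaded through `hash_function`/`hash_object`: sharing one `Cache` across calls (as `_compute_hashes` now does per field) avoids re-serializing objects referenced by several fields and keeps circular references safe.

```python
from pydra.utils.hash import Cache, hash_function

shared = ["some", "large", "nested", "structure"]
fields = {"a": shared, "b": shared, "c": 42}

cache = Cache({})  # one cache reused across all per-field calls
field_hashes = {
    name: hash_function(value, cache=cache) for name, value in fields.items()
}

assert field_hashes["a"] == field_hashes["b"]  # identical values, identical hashes
```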