diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 02b730f8..15cfe303 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -73,7 +73,7 @@ class ScratchEnsemble(object): the basename of the Eclipse simulation, relative to RUNPATH. Fourth column is not used. runpathfilter (str): If supplied, the only the runpaths in - the runpathfile which contains this string will be included + the runpathfile which contain this string will be included Use to select only a specific realization f.ex. autodiscovery (boolean): True by default, means that the class can try to autodiscover data in the realization. Turn @@ -192,9 +192,12 @@ def add_realizations( Args: paths (list/str): String or list of strings with wildcards to file system. Absolute or relative paths. + realidxregexp (str): Passed on to ScratchRealization init, + used to determine index from the path. autodiscovery (boolean): whether files can be attempted auto-discovered - batch (list): Batch commands sent to each realization. + batch (list): Batch commands sent to each realization for + immediate execution after initialization. Returns: count (int): Number of realizations successfully added. @@ -287,24 +290,46 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None): ): raise ValueError("runpath dataframe not correct") - for _, row in runpath_df.iterrows(): - if runpathfilter and runpathfilter not in row["runpath"]: - continue - logger.info("Adding realization from %s", row["runpath"]) - # TODO: Must be concurrent. find_files below must be - # prepended to the batch runs, but can we have realization - # specific arguments to batch jobs?. - realization = ScratchRealization( - row["runpath"], - index=int(row["index"]), - autodiscovery=False, - batch=batch, + if runpathfilter: + runpath_df = runpath_df[runpath_df["runpath"].str.contains(runpathfilter)] + + if use_concurrent(): + logger.info( + "Loading %s realizations concurrently from runpathfile", + str(len(runpath_df)), ) - # Use the ECLBASE from the runpath file to - # ensure we recognize the correct UNSMRY file - realization.find_files(row["eclbase"] + ".DATA") - realization.find_files(row["eclbase"] + ".UNSMRY") - self.realizations[int(row["index"])] = realization + with ProcessPoolExecutor() as executor: + loaded_reals = [ + executor.submit( + ScratchRealization, + row["runpath"], + index=int(row["index"]), + autodiscovery=False, + find_files=[ + row["eclbase"] + ".DATA", + row["eclbase"] + ".UNSMRY", + ], + batch=batch, + ).result() + for _, row in runpath_df.iterrows() + ] + else: + logger.info( + "Loading %s realizations sequentially from runpathfile", + str(len(runpath_df)), + ) + loaded_reals = [ + ScratchRealization( + row["runpath"], + index=int(row["index"]), + autodiscovery=False, + find_files=[row["eclbase"] + ".DATA", row["eclbase"] + ".UNSMRY"], + batch=batch, + ) + for _, row in runpath_df.iterrows() + ] + for real in loaded_reals: + self.realizations[real.index] = real return len(self) - prelength diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 5e5e273c..a061af09 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -88,10 +88,18 @@ class ScratchRealization(object): should be run at time of initialization. Each element is a length 1 dictionary with the function name to run as the key and each keys value should be the function arguments as a dict. + find_files (list of str): Each element in this list will be given + to find_files() before any batch commands are processed. """ def __init__( - self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None + self, + path, + realidxregexp=None, + index=None, + autodiscovery=True, + batch=None, + find_files=None, ): self._origpath = os.path.abspath(path) self.index = None @@ -171,6 +179,10 @@ def __init__( if os.path.exists(os.path.join(abspath, "parameters.txt")): self.load_txt("parameters.txt") + if find_files is not None: + for to_find in find_files: + self.find_files(to_find) + if batch: self.process_batch(batch) diff --git a/tests/test_batch.py b/tests/test_batch.py index 5f042661..c2cb5157 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -126,5 +126,6 @@ def test_speedup(): print("FMU_CONCURRENCY: {}".format(use_concurrent())) print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed)) - if really_concurrent: - assert seq_elapsed > conc_elapsed * 4 + # Can't enforce this, it depends on physical hardware availability. + # if really_concurrent: + # assert seq_elapsed > conc_elapsed * 4 diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index 9c635b21..06c62349 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -774,6 +774,13 @@ def test_ertrunpathfile(): # because ECLBASE is given in the runpathfile assert sum(["UNSMRY" in x for x in ens.files["BASENAME"].unique()]) == 5 + # Run once more to test runpathfilter: + ens = ScratchEnsemble( + "filtensfromrunpath", runpathfile=testdir + "/data/ert-runpath-file", + runpathfilter="realization-3" + ) + assert len(ens) == 1 + assert ens[3].index == 3 os.chdir(cwd) diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index f019fc96..6b6b4e46 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -51,6 +51,13 @@ def symlink_iter(origensdir, newitername): fullpathrealizationdir + "/iter-0", realizationdir + "/" + newitername ) +def rms_vol2df(kwargs): + """Callback function to be sent to ensemble objects""" + fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) + # The supplied callback should not fail too easy. + if os.path.exists(fullpath): + return volumetrics.rmsvolumetrics_txt2df(fullpath) + return pd.DataFrame() def test_ensembleset_reek001(tmpdir): """Test import of a stripped 5 realization ensemble, @@ -211,15 +218,7 @@ def test_ensembleset_reek001(tmpdir): assert len(ensset3.keys()) == predel_len - 1 # Test callback functionality, that we can convert rms - # volumetrics in each realization. First we need a - # wrapper which is able to work on ScratchRealizations. - def rms_vol2df(kwargs): - """Callback function to be sent to ensemble objects""" - fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) - # The supplied callback should not fail too easy. - if os.path.exists(fullpath): - return volumetrics.rmsvolumetrics_txt2df(fullpath) - return pd.DataFrame() + # volumetrics in each realization. if not SKIP_FMU_TOOLS: rmsvols_df = ensset3.apply(