Skip to content

Commit

Permalink
Parallelize add_from_runpathfile
Browse files Browse the repository at this point in the history
Allow running find_files at init of realizations

lint runpathfile
  • Loading branch information
berland committed Apr 16, 2020
1 parent dfd3ded commit e1acf8d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 31 deletions.
63 changes: 44 additions & 19 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ScratchEnsemble(object):
the basename of the Eclipse simulation, relative to RUNPATH.
Fourth column is not used.
runpathfilter (str): If supplied, the only the runpaths in
the runpathfile which contains this string will be included
the runpathfile which contain this string will be included
Use to select only a specific realization f.ex.
autodiscovery (boolean): True by default, means that the class
can try to autodiscover data in the realization. Turn
Expand Down Expand Up @@ -192,9 +192,12 @@ def add_realizations(
Args:
paths (list/str): String or list of strings with wildcards
to file system. Absolute or relative paths.
realidxregexp (str): Passed on to ScratchRealization init,
used to determine index from the path.
autodiscovery (boolean): whether files can be attempted
auto-discovered
batch (list): Batch commands sent to each realization.
batch (list): Batch commands sent to each realization for
immediate execution after initialization.
Returns:
count (int): Number of realizations successfully added.
Expand Down Expand Up @@ -287,24 +290,46 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
):
raise ValueError("runpath dataframe not correct")

for _, row in runpath_df.iterrows():
if runpathfilter and runpathfilter not in row["runpath"]:
continue
logger.info("Adding realization from %s", row["runpath"])
# TODO: Must be concurrent. find_files below must be
# prepended to the batch runs, but can we have realization
# specific arguments to batch jobs?.
realization = ScratchRealization(
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
batch=batch,
if runpathfilter:
runpath_df = runpath_df[runpath_df["runpath"].str.contains(runpathfilter)]

if use_concurrent():
logger.info(
"Loading %s realizations concurrently from runpathfile",
str(len(runpath_df)),
)
# Use the ECLBASE from the runpath file to
# ensure we recognize the correct UNSMRY file
realization.find_files(row["eclbase"] + ".DATA")
realization.find_files(row["eclbase"] + ".UNSMRY")
self.realizations[int(row["index"])] = realization
with ProcessPoolExecutor() as executor:
loaded_reals = [
executor.submit(
ScratchRealization,
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
find_files=[
row["eclbase"] + ".DATA",
row["eclbase"] + ".UNSMRY",
],
batch=batch,
).result()
for _, row in runpath_df.iterrows()
]
else:
logger.info(
"Loading %s realizations sequentially from runpathfile",
str(len(runpath_df)),
)
loaded_reals = [
ScratchRealization(
row["runpath"],
index=int(row["index"]),
autodiscovery=False,
find_files=[row["eclbase"] + ".DATA", row["eclbase"] + ".UNSMRY"],
batch=batch,
)
for _, row in runpath_df.iterrows()
]
for real in loaded_reals:
self.realizations[real.index] = real

return len(self) - prelength

Expand Down
14 changes: 13 additions & 1 deletion src/fmu/ensemble/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,18 @@ class ScratchRealization(object):
should be run at time of initialization. Each element is a
length 1 dictionary with the function name to run as the key
and each keys value should be the function arguments as a dict.
find_files (list of str): Each element in this list will be given
to find_files() before any batch commands are processed.
"""

def __init__(
self, path, realidxregexp=None, index=None, autodiscovery=True, batch=None
self,
path,
realidxregexp=None,
index=None,
autodiscovery=True,
batch=None,
find_files=None,
):
self._origpath = os.path.abspath(path)
self.index = None
Expand Down Expand Up @@ -171,6 +179,10 @@ def __init__(
if os.path.exists(os.path.join(abspath, "parameters.txt")):
self.load_txt("parameters.txt")

if find_files is not None:
for to_find in find_files:
self.find_files(to_find)

if batch:
self.process_batch(batch)

Expand Down
5 changes: 3 additions & 2 deletions tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,6 @@ def test_speedup():
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed))

if really_concurrent:
assert seq_elapsed > conc_elapsed * 4
# Can't enforce this, it depends on physical hardware availability.
# if really_concurrent:
# assert seq_elapsed > conc_elapsed * 4
7 changes: 7 additions & 0 deletions tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,13 @@ def test_ertrunpathfile():
# because ECLBASE is given in the runpathfile
assert sum(["UNSMRY" in x for x in ens.files["BASENAME"].unique()]) == 5

# Run once more to test runpathfilter:
ens = ScratchEnsemble(
"filtensfromrunpath", runpathfile=testdir + "/data/ert-runpath-file",
runpathfilter="realization-3"
)
assert len(ens) == 1
assert ens[3].index == 3
os.chdir(cwd)


Expand Down
17 changes: 8 additions & 9 deletions tests/test_ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ def symlink_iter(origensdir, newitername):
fullpathrealizationdir + "/iter-0", realizationdir + "/" + newitername
)

def rms_vol2df(kwargs):
"""Callback function to be sent to ensemble objects"""
fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
# The supplied callback should not fail too easy.
if os.path.exists(fullpath):
return volumetrics.rmsvolumetrics_txt2df(fullpath)
return pd.DataFrame()

def test_ensembleset_reek001(tmpdir):
"""Test import of a stripped 5 realization ensemble,
Expand Down Expand Up @@ -211,15 +218,7 @@ def test_ensembleset_reek001(tmpdir):
assert len(ensset3.keys()) == predel_len - 1

# Test callback functionality, that we can convert rms
# volumetrics in each realization. First we need a
# wrapper which is able to work on ScratchRealizations.
def rms_vol2df(kwargs):
"""Callback function to be sent to ensemble objects"""
fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
# The supplied callback should not fail too easy.
if os.path.exists(fullpath):
return volumetrics.rmsvolumetrics_txt2df(fullpath)
return pd.DataFrame()
# volumetrics in each realization.

if not SKIP_FMU_TOOLS:
rmsvols_df = ensset3.apply(
Expand Down

0 comments on commit e1acf8d

Please sign in to comment.