Parallelize get_smry()
berland committed Apr 16, 2020
1 parent e1acf8d commit 76ccda8
Showing 4 changed files with 47 additions and 18 deletions.
49 changes: 35 additions & 14 deletions src/fmu/ensemble/ensemble.py
@@ -1567,20 +1567,41 @@ def get_smry(
                 end_date=end_date,
                 include_restart=include_restart,
             )
-        dflist = []
-        # TODO: Concurrent code
-        for index, realization in self.realizations.items():
-            dframe = realization.get_smry(
-                time_index=time_index,
-                column_keys=column_keys,
-                cache_eclsum=cache_eclsum,
-                include_restart=include_restart,
-            )
-            dframe.insert(0, "REAL", index)
-            dframe.index.name = "DATE"
-            dflist.append(dframe)
-        if dflist:
-            return pd.concat(dflist, sort=False).reset_index()
+        dframes = []
+        if use_concurrent():
+            with ProcessPoolExecutor() as executor:
+                real_indices = self.realizations.keys()
+                futures_dframes = [
+                    executor.submit(
+                        realization.get_smry,
+                        time_index=time_index,
+                        column_keys=column_keys,
+                        cache_eclsum=cache_eclsum,
+                        include_restart=include_restart,
+                    )
+                    for realization in self.realizations.values()
+                ]
+                # Reassemble a list of dataframes from the pickled results
+                # of the ProcessPool:
+                for realidx, dframe in zip(
+                    real_indices, [x.result() for x in futures_dframes]
+                ):
+                    dframes.append(dframe.assign(REAL=realidx).rename_axis("DATE"))
+        else:
+            # Sequential version:
+            for realidx, realization in self.realizations.items():
+                dframes.append(
+                    realization.get_smry(
+                        time_index=time_index,
+                        column_keys=column_keys,
+                        cache_eclsum=cache_eclsum,
+                        include_restart=include_restart,
+                    )
+                    .assign(REAL=realidx)
+                    .rename_axis("DATE")
+                )
+        if dframes:
+            return pd.concat(dframes, sort=False).reset_index()
         return pd.DataFrame()
 
     def get_eclgrid(self, props, report=0, agg="mean", active_only=False):
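The concurrent branch above relies on two names that do not appear in this excerpt, use_concurrent() and ProcessPoolExecutor; presumably the gate and the concurrent.futures import are added elsewhere in the module. Below is a minimal, self-contained sketch of the submit-and-reassemble pattern the new code uses, with a hypothetical ToyRealization standing in for a real realization object (none of these names are part of fmu-ensemble). Because self.realizations is a dict, .keys() and .values() iterate in the same order, so zip() pairs each returned dataframe with the correct realization index.

from concurrent.futures import ProcessPoolExecutor

import pandas as pd


class ToyRealization:
    """Stand-in for a realization; must be picklable to cross process boundaries."""

    def __init__(self, seed):
        self.seed = seed

    def get_smry(self, time_index=None, column_keys=None):
        # Pretend to read an UNSMRY file; return a tiny dataframe instead.
        return pd.DataFrame(
            {"FOPT": [self.seed, self.seed + 1.0]},
            index=pd.Index(["2000-01-01", "2000-02-01"]),
        )


def parallel_smry(realizations):
    """Run get_smry() for every realization in a process pool and
    concatenate the results, tagging each row with its realization index."""
    real_indices = realizations.keys()
    with ProcessPoolExecutor() as executor:
        futures = [
            executor.submit(real.get_smry, time_index="monthly", column_keys=["FOPT"])
            for real in realizations.values()
        ]
        # Worker results arrive as pickled copies; pair them with the
        # realization indices and label each dataframe.
        dframes = [
            dframe.assign(REAL=realidx).rename_axis("DATE")
            for realidx, dframe in zip(real_indices, [fut.result() for fut in futures])
        ]
    return pd.concat(dframes, sort=False).reset_index()


if __name__ == "__main__":
    print(parallel_smry({0: ToyRealization(0.0), 3: ToyRealization(10.0)}))

Submitting the bound method realization.get_smry requires the realization object itself to be picklable, which is the design constraint that shapes the rest of this commit.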
12 changes: 9 additions & 3 deletions tests/test_ensemble.py
@@ -311,8 +311,13 @@ def test_noautodiscovery():
     )
     # Default ensemble construction will include auto-discovery, check
     # that we got that:
-    assert not reekensemble.get_smry(column_keys="FOPT").empty
+    assert not reekensemble.load_smry(column_keys="FOPT").empty
     assert "UNSMRY" in reekensemble.files["FILETYPE"].values
+    # (beware that get_smry() behaves differently depending
+    # on whether it is run concurrently or not, sequential
+    # running of get_smry will lead to UNSMRY being discovered,
+    # while in concurrent mode the realization object where it
+    # is discovered is thrown away)
 
     # Now try again, with no autodiscovery
     reekensemble = ScratchEnsemble(
@@ -776,8 +781,9 @@ def test_ertrunpathfile():
 
     # Run once more to test runpathfilter:
     ens = ScratchEnsemble(
-        "filtensfromrunpath", runpathfile=testdir + "/data/ert-runpath-file",
-        runpathfilter="realization-3"
+        "filtensfromrunpath",
+        runpathfile=testdir + "/data/ert-runpath-file",
+        runpathfilter="realization-3",
     )
     assert len(ens) == 1
     assert ens[3].index == 3
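The new comment in test_noautodiscovery points at a real pitfall of process-based concurrency: each worker operates on a pickled copy of the realization object, so any state the worker mutates, such as a newly discovered UNSMRY file, never reaches the parent process; only the returned dataframe does. A minimal sketch of the effect, using a hypothetical Discoverer class that is not part of fmu-ensemble:

from concurrent.futures import ProcessPoolExecutor


class Discoverer:
    """Toy object that records a side effect when its work() method runs."""

    def __init__(self):
        self.discovered_files = []

    def work(self):
        # In fmu-ensemble this is where an UNSMRY file would be discovered.
        self.discovered_files.append("FOO.UNSMRY")
        return len(self.discovered_files)


if __name__ == "__main__":
    obj = Discoverer()
    obj.work()  # sequential call: the side effect is visible afterwards
    print(obj.discovered_files)  # ['FOO.UNSMRY']

    obj = Discoverer()
    with ProcessPoolExecutor() as executor:
        print(executor.submit(obj.work).result())  # 1, computed on the worker's copy
    print(obj.discovered_files)  # []: the parent's object was never touched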
2 changes: 2 additions & 0 deletions tests/test_ensembleset.py
@@ -51,6 +51,7 @@ def symlink_iter(origensdir, newitername):
             fullpathrealizationdir + "/iter-0", realizationdir + "/" + newitername
         )
 
+
 def rms_vol2df(kwargs):
     """Callback function to be sent to ensemble objects"""
     fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"])
@@ -59,6 +60,7 @@ def rms_vol2df(kwargs):
         return volumetrics.rmsvolumetrics_txt2df(fullpath)
     return pd.DataFrame()
 
+
 def test_ensembleset_reek001(tmpdir):
     """Test import of a stripped 5 realization ensemble,
     manually doubled to two identical ensembles
2 changes: 1 addition & 1 deletion tests/test_realization.py
@@ -1183,7 +1183,7 @@ def test_get_df_merge():
 
     # Merge something that is not mergeable
     real.data["randtable"] = pd.DataFrame(
-        columns=["BARF", "ARBF"], data=[[1, 3], [2, 4],]
+        columns=["BARF", "ARBF"], data=[[1, 3], [2, 4]]
     )
     with pytest.raises(TypeError):
         # pylint: disable=pointless-statement
