diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 15cfe303..c56c2814 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -1567,20 +1567,41 @@ def get_smry( end_date=end_date, include_restart=include_restart, ) - dflist = [] - # TODO: Concurrent code - for index, realization in self.realizations.items(): - dframe = realization.get_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - include_restart=include_restart, - ) - dframe.insert(0, "REAL", index) - dframe.index.name = "DATE" - dflist.append(dframe) - if dflist: - return pd.concat(dflist, sort=False).reset_index() + dframes = [] + if use_concurrent(): + with ProcessPoolExecutor() as executor: + real_indices = self.realizations.keys() + futures_dframes = [ + executor.submit( + realization.get_smry, + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + include_restart=include_restart, + ) + for realization in self.realizations.values() + ] + # Reassemble a list of dataframes from the pickled results + # of the ProcessPool: + for realidx, dframe in zip( + real_indices, [x.result() for x in futures_dframes] + ): + dframes.append(dframe.assign(REAL=realidx).rename_axis("DATE")) + else: + # Sequential version: + for realidx, realization in self.realizations.items(): + dframes.append( + realization.get_smry( + time_index=time_index, + column_keys=column_keys, + cache_eclsum=cache_eclsum, + include_restart=include_restart, + ) + .assign(REAL=realidx) + .rename_axis("DATE") + ) + if dframes: + return pd.concat(dframes, sort=False).reset_index() return pd.DataFrame() def get_eclgrid(self, props, report=0, agg="mean", active_only=False): diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index 06c62349..a55039f8 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -311,8 +311,13 @@ def test_noautodiscovery(): ) # Default ensemble construction will include auto-discovery, check # that we got that: - assert not reekensemble.get_smry(column_keys="FOPT").empty + assert not reekensemble.load_smry(column_keys="FOPT").empty assert "UNSMRY" in reekensemble.files["FILETYPE"].values + # (beware that get_smry() behaves differently depending + # on whether it is run concurrently or not, sequential + # running of get_smry will lead to UNSMRY being discovered, + # while in concurrent mode the realization object where it + # is discovered is thrown away) # Now try again, with no autodiscovery reekensemble = ScratchEnsemble( @@ -776,8 +781,9 @@ def test_ertrunpathfile(): # Run once more to test runpathfilter: ens = ScratchEnsemble( - "filtensfromrunpath", runpathfile=testdir + "/data/ert-runpath-file", - runpathfilter="realization-3" + "filtensfromrunpath", + runpathfile=testdir + "/data/ert-runpath-file", + runpathfilter="realization-3", ) assert len(ens) == 1 assert ens[3].index == 3 diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index 6b6b4e46..2ab83bbb 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -51,6 +51,7 @@ def symlink_iter(origensdir, newitername): fullpathrealizationdir + "/iter-0", realizationdir + "/" + newitername ) + def rms_vol2df(kwargs): """Callback function to be sent to ensemble objects""" fullpath = os.path.join(kwargs["realization"].runpath(), kwargs["filename"]) @@ -59,6 +60,7 @@ def rms_vol2df(kwargs): return volumetrics.rmsvolumetrics_txt2df(fullpath) return pd.DataFrame() + def test_ensembleset_reek001(tmpdir): """Test import of a stripped 5 realization ensemble, manually doubled to two identical ensembles diff --git a/tests/test_realization.py b/tests/test_realization.py index d18ee089..ffdf0f34 100644 --- a/tests/test_realization.py +++ b/tests/test_realization.py @@ -1183,7 +1183,7 @@ def test_get_df_merge(): # Merge something that is not mergeable real.data["randtable"] = pd.DataFrame( - columns=["BARF", "ARBF"], data=[[1, 3], [2, 4],] + columns=["BARF", "ARBF"], data=[[1, 3], [2, 4]] ) with pytest.raises(TypeError): # pylint: disable=pointless-statement