Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency implementation using batch processor #106

Closed
wants to merge 4 commits into from
Closed

Conversation

berland
Copy link
Collaborator

@berland berland commented Mar 13, 2020

Very much work-in-progress.

@berland
Copy link
Collaborator Author

berland commented Apr 15, 2020

Code runs through test. Using concurrency on the test workloads leads to slow-down, the penalty of sending dataframes back and forth between Python processes outweighs the benefit from concurrency.

@berland
Copy link
Collaborator Author

berland commented Apr 15, 2020

Self-assessment, concurrent code is missing in add_from_runpathfile(), apply() and get_smry() in the ScratchEnsemble object.

There is no concurrent code in EnsembleSet. Paralellizing this has very little potential, assuming the there are always more realizations in an ensemble, than ensembles in an ensembleset.

@asnyv
Copy link
Collaborator

asnyv commented Apr 15, 2020

There is no concurrent code in EnsembleSet. Paralellizing this has very little potential, assuming the there are always more realizations in an ensemble, than ensembles in an ensembleset.

Flownet might be an exception to that assumption @anders-kiaer ?

@anders-kiaer
Copy link
Collaborator

Flownet might be an exception to that assumption @anders-kiaer ?

I actually think @wouterjdb did some test on concurrency earlier, using flownet + fmu-ensemble?

src/fmu/ensemble/__init__.py Outdated Show resolved Hide resolved
@wouterjdb
Copy link

Self-assessment, concurrent code is missing in add_from_runpathfile(), apply() and get_smry() in the ScratchEnsemble object.

There is no concurrent code in EnsembleSet. Paralellizing this has very little potential, assuming the there are always more realizations in an ensemble, than ensembles in an ensembleset.

This holds for FlowNet just as well.

@berland berland force-pushed the havbconc branch 2 times, most recently from 7a484fc to c2da73d Compare April 16, 2020 06:09
@berland
Copy link
Collaborator Author

berland commented Apr 16, 2020

This is now potentially done, except for a Py27 blocker as it turned out I have used a Py3-only feature in a part of the code that is only to be run on Py3.

@berland berland force-pushed the havbconc branch 3 times, most recently from 76ccda8 to 1759358 Compare April 16, 2020 07:15
@berland berland changed the title WIP: Concurrency implementation using batch processor Concurrency implementation using batch processor Apr 16, 2020
@berland berland force-pushed the havbconc branch 4 times, most recently from 4f7cfbb to 56934c4 Compare April 16, 2020 08:22
@berland berland requested review from asnyv and wouterjdb April 16, 2020 09:06
@berland berland requested a review from oysteoh April 17, 2020 07:41
Copy link
Collaborator

@asnyv asnyv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 👍 Left some comments

src/fmu/ensemble/ensemble.py Outdated Show resolved Hide resolved
src/fmu/ensemble/ensemble.py Outdated Show resolved Hide resolved
src/fmu/ensemble/ensemble.py Outdated Show resolved Hide resolved
src/fmu/ensemble/ensemble.py Outdated Show resolved Hide resolved
src/fmu/ensemble/ensemble.py Outdated Show resolved Hide resolved
tests/test_batch.py Outdated Show resolved Hide resolved
tests/test_batch.py Outdated Show resolved Hide resolved
tests/test_batch.py Outdated Show resolved Hide resolved
tests/test_batch.py Outdated Show resolved Hide resolved
tests/test_webviz_subsurface_testdata.py Outdated Show resolved Hide resolved
@berland
Copy link
Collaborator Author

berland commented Apr 17, 2020

get_volumetric_rates() needs parallelization.

@berland berland mentioned this pull request May 19, 2020
@berland berland linked an issue May 19, 2020 that may be closed by this pull request
@berland berland force-pushed the havbconc branch 2 times, most recently from 4555268 to 6ef5556 Compare May 19, 2020 08:17
Comment on lines +215 to +213
loaded_reals = [
executor.submit(
ScratchRealization,
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
).result()
for realdir in globbedpaths
]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really concurrent, as the .result() waits for the subprocess to finish for each iteration. Should rather do something like below (which is how it is done in most of the rest of this PR, so this is probably just something you forgot to fix).

Suggested change
loaded_reals = [
executor.submit(
ScratchRealization,
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
).result()
for realdir in globbedpaths
]
reals_futures = [
executor.submit(
ScratchRealization,
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
for realdir in globbedpaths
]
loaded_reals = [x.result() for x in reals_futures]

Comment on lines 301 to 304
with ProcessPoolExecutor() as executor:
loaded_reals = [
executor.submit(
ScratchRealization,
row.runpath,
index=int(row.index),
autodiscovery=False,
find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY",],
batch=batch,
).result()
for row in runpath_df.itertuples()
]
Copy link
Collaborator

@asnyv asnyv May 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Suggested change
with ProcessPoolExecutor() as executor:
loaded_reals = [
executor.submit(
ScratchRealization,
row.runpath,
index=int(row.index),
autodiscovery=False,
find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY",],
batch=batch,
).result()
for row in runpath_df.itertuples()
]
with ProcessPoolExecutor() as executor:
reals_futures = [
executor.submit(
ScratchRealization,
row.runpath,
index=int(row.index),
autodiscovery=False,
find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY",],
batch=batch,
).result()
for row in runpath_df.itertuples()
]
loaded_reals = [x.result() for x in reals_futures]

Comment on lines +975 to +951
if use_concurrent():
# In concurrent mode, caching is not used as
# we do not pickle the loaded EclSum objects
cache = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is significant. Each call to ScratchRealization.get_eclsum() is quite heavy, and many methods are written with several calls to get_eclsum(), such that the loss of caching might actually increase runtime even with concurrency.

Copy link
Collaborator

@asnyv asnyv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also tested setting lazy_loading=True in the EclSum in get_eclsum to reduce cost of each get_eclsum() call, but that was not a game changer (in some cases slower).

As I see it, all methods that currently run get_eclsum() somewhere in their dependencies are not safe to merge as it is now, without the risk of performance decrease for some users.

@berland berland modified the milestones: 1.0, 2.0 Sep 1, 2020
@berland berland added the enhancement New feature or request label Sep 7, 2020
@ertomatic
Copy link

Can one of the admins verify this patch?

@berland
Copy link
Collaborator Author

berland commented Feb 2, 2021

Rebased on top of #182

* Batch processing after init on ensembles

* Functionality for turning off concurrency

* Concurrent apply()

* Parallelize add_from_runpathfile

* Allow running find_files at init of realizations

* Parallelize get_smry()
str(env_var) == "0"
or str(env_var).lower() == "false"
or str(env_var).lower() == "no"
):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this looks better and reduces the usage of str(..) and lower() ?

env_var = str(os.environ[ENV_NAME]).lower()
if( env_var == "0" or env_var == "false" or env_var == "no"):
    ..
    ..

@berland
Copy link
Collaborator Author

berland commented Mar 17, 2021

Superseded by #206

@berland berland closed this Mar 17, 2021
@berland berland removed a link to an issue Mar 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants