
Running tasks creates linear memory growth #14329

Closed
desertaxle opened this issue Jun 25, 2024 · 3 comments · Fixed by #14651
Labels: 3.x, bug (Something isn't working)

Comments

@desertaxle
Member

The following script is a minimal reproducible example (MRE) demonstrating linear memory growth when running many tasks in a flow:

from memory_profiler import profile

from prefect import flow, task

try:
    # Prefect 3 exposes cache policies; disable result caching for the task there
    from prefect.cache_policies import NONE

    cache_policy = dict(cache_policy=NONE)
except ImportError:
    # Older Prefect versions don't have cache policies
    cache_policy = {}


@task(**cache_policy)
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@flow
@profile
def some_flow(n: int = 100):
    for i in range(n):
        with_task()
        # without_task()
    print("DONE")


@profile
def main():
    some_flow(n=100)


if __name__ == "__main__":
    main()

Running the flow with 100 tasks produces the following profile:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    365.8 MiB    365.8 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    624.2 MiB  -9381.2 MiB         101       for i in range(n):
    26    624.2 MiB  -9122.8 MiB         100           with_task()
    27                                                 # without_task()
    28    624.3 MiB      0.1 MiB           1       print("DONE")

Running the flow with 1000 tasks produces the following profile:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    344.1 MiB    344.1 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25   3019.1 MiB -570676.8 MiB        1001       for i in range(n):
    26   3019.1 MiB -568007.0 MiB        1000           with_task()
    27                                                 # without_task()
    28   1190.7 MiB  -1828.5 MiB           1       print("DONE")

We'd expect memory usage to stay roughly constant regardless of the number of task runs, since the flow isn't holding onto any return values. The linear growth suggests that we are retaining results somewhere we shouldn't be.
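
For context on where the retained allocations live, diffing tracemalloc snapshots around the task loop can show whether finished task runs still hold memory. The sketch below is illustrative only and is not part of the MRE; allocate and probe_flow are made-up names.

import tracemalloc

from prefect import flow, task


@task
def allocate():
    # Same payload shape as the MRE's with_task
    return [{"abc": "123"} for _ in range(10_000)]


@flow
def probe_flow(n: int = 100):
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    for _ in range(n):
        allocate()
    after = tracemalloc.take_snapshot()
    # If results were released after each run, the top differences should be
    # small; steadily growing list/dict allocations point at task results
    # retained somewhere in the engine.
    for stat in after.compare_to(before, "lineno")[:10]:
        print(stat)


if __name__ == "__main__":
    probe_flow()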

desertaxle added the 3.x label on Jun 25, 2024
@desertaxle
Member Author

If the task engine is updated to explicitly drop in-memory results, memory usage decreases and stays flat across invocations.
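
To clarify what "dropping in-memory results" means: the toy engine below is not the Prefect task engine (all names here are illustrative), it only shows the pattern of clearing a cached result reference once a run finishes so the object becomes eligible for garbage collection.

from typing import Any, Callable, Dict


class ToyTaskEngine:
    """Illustrative stand-in only; this is not Prefect's task engine."""

    def __init__(self) -> None:
        # Results cached per task run; holding these for the lifetime of the
        # flow is what produces linear memory growth.
        self._results: Dict[str, Any] = {}

    def run(self, run_id: str, fn: Callable[[], Any]) -> Any:
        result = fn()
        self._results[run_id] = result  # held while the run is "live"
        return result

    def finalize(self, run_id: str) -> None:
        # Explicitly drop the cached reference once the run completes so the
        # result can be garbage collected unless the caller kept a reference.
        self._results.pop(run_id, None)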

10 task runs:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    360.8 MiB    360.8 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    369.5 MiB    -11.7 MiB          11       for i in range(n):
    26    369.5 MiB     -3.0 MiB          10           with_task()
    27                                                 # without_task()
    28    363.7 MiB     -5.8 MiB           1       print("DONE")

100 task runs:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    22    346.5 MiB    346.5 MiB           1   @flow
    23                                         @profile
    24                                         def some_flow(n: int = 100):
    25    360.4 MiB  -6784.7 MiB         101       for i in range(n):
    26    360.4 MiB  -6770.8 MiB         100           with_task()
    27                                                 # without_task()
    28    329.9 MiB    -30.5 MiB           1       print("DONE")

This suggests that a more sustainable approach to avoiding caching results in memory will resolve this issue.
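
As a rough guard against this regressing (a sketch only, not a test from the Prefect repo; mre_script is a hypothetical module containing the flow above), one could compare peak memory across differently sized batches of task runs with memory_profiler's memory_usage helper:

from memory_profiler import memory_usage

from mre_script import some_flow  # hypothetical module holding the MRE flow


def test_task_runs_do_not_leak_results():
    small = max(memory_usage((some_flow, (), {"n": 100}), interval=0.1))
    large = max(memory_usage((some_flow, (), {"n": 1000}), interval=0.1))
    # With results dropped after each run, 10x more task runs should not need
    # anywhere near 10x more memory; allow generous headroom for overhead.
    assert large < small * 2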

desertaxle added the bug (Something isn't working) label on Jun 25, 2024
desertaxle self-assigned this on Jun 25, 2024
@frankvp11

I am also dealing with memory issues in Prefect, and they seem similar to the one you have described.
You mentioned that telling the task engine to explicitly drop in-memory results fixed the issue - how can I do that?
I'm not entirely sure how to explicitly drop in-memory results, or what that even means. If it means I won't be able to access the return value (or worse, can't return anything at all), are there any other solutions?

desertaxle removed their assignment on Jul 9, 2024
@zhen0
Member

zhen0 commented Jul 11, 2024

Hi @frankvp11 - this issue is for Prefect 3.0. If you're on 3.0, please follow along here. If you're using Prefect 2, you may find #12668 a more relevant issue.
