FEAT-#6492: Add from_map feature to create dataframe #7215

Merged
merged 6 commits into from
Apr 30, 2024

Conversation

YarShev
Collaborator

@YarShev YarShev commented Apr 24, 2024

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Equivalent of dask.dataframe.from_map? #6492
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
@@ -258,3 +261,66 @@ def func(df, **kw): # pragma: no cover
        UnidistWrapper.materialize(
            [part.list_of_blocks[0] for row in result for part in row]
        )

    @classmethod
    def from_map(cls, func, iterable, *args, **kwargs):
Collaborator

Is it possible to use already implemented functions with num_splits=1?

Collaborator Author

I don't quite get what you would like to use instead. Please elaborate. We are adding a new from_map by analogy with other io functions.

Collaborator Author

I suppose we can't use anything from existing functionality as every method of a Modin Dataframe assumes there is a dataframe with partitions to apply a function to.

@@ -1109,6 +1109,36 @@ def from_dask(dask_obj) -> DataFrame:
    return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_dask(dask_obj))


def from_map(func, iterable, *args, **kwargs) -> DataFrame:
Collaborator

Documentation needs to be updated I suppose.

Collaborator Author

We don't have docs for such methods as from_pandas, from_ray, from_dask, etc. Do you think we should update docs on this matter in a separate issue in one go?

Collaborator

OK

Collaborator

@YarShev are you going to do this before release?

Collaborator Author

Yes, that would be great - #7256.

[
    [
        cls.frame_partition_cls(
            deploy_map_func.remote(func, obj, *args, **kwargs)
Collaborator

I suggest using RayWrapper.deploy here.

Collaborator

And corresponding wrappers for other engines.

Collaborator Author

RayWrapper.deploy deploys a function that can return any object but here we intentionally wrap a result in a pandas DataFrame if the user hasn't done so. I would leave the changes as is. What do you think?

Collaborator

To reduce the likelihood of error, we need to either have all launch options in one place, or use only one method. Launching functions tends to get more complicated as parameters are added. A good example is resources=RayTaskCustomResources.get(), which is currently not taken into account here.

We can move this function to engine_wrapper.py and call it inside RayWrapper.deploy using an additional parameter.

Collaborator Author

Re-used *.deploy.
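
Editorial sketch of the approach agreed on in this thread: a single deploy entry point that takes an extra flag and wraps the result only when the user's function has not already done so. Everything here is an assumption made for illustration: ToyWrapper is a hypothetical stand-in for an engine wrapper, Frame is a hypothetical stand-in for pandas.DataFrame so the sketch runs on the standard library alone, and a thread pool replaces the real engine. Only the parameter names f_args, f_kwargs and return_pandas_df are taken from the reviewed snippet; this is not Modin's actual implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass
class Frame:
    """Hypothetical stand-in for pandas.DataFrame (keeps the sketch stdlib-only)."""

    data: Any


class ToyWrapper:
    """Hypothetical engine wrapper; not Modin's RayWrapper or DaskWrapper."""

    _pool = ThreadPoolExecutor(max_workers=4)

    @classmethod
    def deploy(cls, func, f_args=None, f_kwargs=None, return_pandas_df=False) -> Future:
        args = () if f_args is None else tuple(f_args)
        kwargs = {} if f_kwargs is None else dict(f_kwargs)

        def task():
            result = func(*args, **kwargs)
            # Wrap only if the user's function did not already return a Frame.
            if return_pandas_df and not isinstance(result, Frame):
                result = Frame(result)
            return result

        # Single submission point: engine-wide launch options would be applied here.
        return cls._pool.submit(task)


# Without the flag, the result is returned untouched.
raw = ToyWrapper.deploy(
    lambda x, scale=1: x * scale, f_args=(3,), f_kwargs={"scale": 2}
).result()
# With the flag, a plain result gets wrapped and an existing Frame is left alone.
wrapped = ToyWrapper.deploy(lambda x: [x, x], f_args=(3,), return_pandas_df=True).result()
already = ToyWrapper.deploy(lambda x: Frame(x), f_args=(3,), return_pandas_df=True).result()
```

Because every task goes through the one deploy method, an option added later (such as the custom resources mentioned above) would only need to be handled in one place.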

Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
Comment on lines +219 to +233
partitions = np.array(
    [
        [
            cls.frame_partition_cls(
                DaskWrapper.deploy(
                    func,
                    f_args=(obj,) + args,
                    f_kwargs=kwargs,
                    return_pandas_df=True,
                )
            )
        ]
        for obj in iterable
    ]
)
Collaborator

Based on the information required to perform this task, it seems that a more appropriate level at which to define the function would be a partition manager, for example somewhere around:

def create_partition_from_metadata(cls, **metadata):

Collaborator Author

I would leave it here. Imagine a case when iterable is a list of files.

Collaborator

Imagine a case when iterable is a list of files.

We'll be abstracting from the parameters just like we're doing now, so I don't see any difference.
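
Editorial sketch of the partition layout discussed in this thread: the reviewed snippet builds one single-partition row per element of iterable, that is, an N x 1 grid. The version below is an illustration under stated assumptions, not the code from the PR: build_partition_grid is a hypothetical name, a thread pool stands in for the engine, plain nested lists replace np.array, the cls.frame_partition_cls wrapper is omitted, and the results are materialized right away, whereas the real partitions would hold futures.

```python
from concurrent.futures import ThreadPoolExecutor


def build_partition_grid(func, iterable, *args, **kwargs):
    """Return an N x 1 nested list of results, one row per element of iterable."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each element becomes the first positional argument of func,
        # mirroring f_args=(obj,) + args in the reviewed snippet.
        futures = [[pool.submit(func, obj, *args, **kwargs)] for obj in iterable]
        # Materialize immediately to keep the sketch simple.
        return [[fut.result() for fut in row] for row in futures]


# Three input elements give three rows with one "partition" each.
grid = build_partition_grid(lambda n, step: list(range(0, n, step)), [2, 4, 6], 2)
```

The element type of iterable does not affect the layout: a list of file paths would produce the same N x 1 shape, with func responsible for reading each file.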



@pytest.mark.skipif(
    condition=Engine.get() not in ("Ray", "Dask", "Unidist"),
Collaborator

Would it be more correct to limit it not by engines, but by storage format: pandas?

Collaborator Author

PandasOnPython wouldn't work. Let's leave as is.

Collaborator

PandasOnPython wouldn't work.

As far as I can see, there are no restrictions on its operation. We just need to add essentially the same code as for the other engines.

@anmyachev anmyachev merged commit 9fa326f into modin-project:main Apr 30, 2024
54 checks passed
Development

Successfully merging this pull request may close these issues.

Equivalent of dask.dataframe.from_map?
2 participants