Support for partition_cols in to_parquet #23321

Merged (19 commits) on Nov 10, 2018

Changes from 3 commits
2 changes: 2 additions & 0 deletions doc/source/io.rst
@@ -4574,6 +4574,8 @@ Several caveats.
* Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype.
* Non supported types include ``Period`` and actual Python object types. These will raise a helpful error message
on an attempt at serialization.
* ``partition_cols`` will be used for partitioning the dataset, where the dataset will be written to multiple
  files in the path specified. Therefore, the path specified must be a directory path.

Contributor:

The rest of the items in this list feel more like limitations of pandas / these engines. Requiring that path be a directory when partition_cols is set doesn't seem to fit here.

I think this is important / different enough to deserve a new small section below "Handling Indexes", with

  1. A description of what partition_cols requires (list of column names, directory for file path)
  2. A description of why you might want to use partition_cols
  3. A small example.

Contributor Author:

done

You can specify an ``engine`` to direct the serialization. This can be one of ``pyarrow``, or ``fastparquet``, or ``auto``.
If the engine is NOT specified, then the ``pd.options.io.parquet.engine`` option is checked; if this is also ``auto``,
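To make the new caveat concrete, here is a minimal sketch of a partitioned write (the directory name and columns are illustrative, and the write assumes an engine that supports partitioned datasets, e.g. pyarrow >= 0.7.0 or fastparquet):

    import pandas as pd

    df = pd.DataFrame({'year': [2017, 2018, 2018],
                       'value': [1.0, 2.0, 3.0]})

    # the path is a directory; one sub-directory is created per partition value
    df.to_parquet('parquet_dir', engine='pyarrow',
                  partition_cols=['year'], compression=None)

    # expected hive-style layout (file names are engine-generated):
    # parquet_dir/year=2017/<...>.parquet
    # parquet_dir/year=2018/<...>.parquet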
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.24.0.txt
@@ -213,6 +213,7 @@ Other Enhancements
- New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
- Compatibility with Matplotlib 3.0 (:issue:`22790`).
- Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
- :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`).
Contributor:

Probably best to mention "with the pyarrow engine (this was previously supported with fastparquet)."

Contributor Author:

done

- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)

.. _whatsnew_0240.api_breaking:
14 changes: 11 additions & 3 deletions pandas/core/frame.py
@@ -1970,7 +1970,7 @@ def to_feather(self, fname):
to_feather(self, fname)

def to_parquet(self, fname, engine='auto', compression='snappy',
index=None, **kwargs):
index=None, partition_cols=None, **kwargs):
"""
Write a DataFrame to the binary parquet format.

@@ -1984,7 +1984,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
Parameters
----------
fname : str
String file path.
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.

Contributor:

Side issue: we use path elsewhere for IO routines. We should change this as well (out of scope here); we would have to deprecate (the name), unfortunately.

Contributor:

We actually use path on the top-level .to_parquet; not sure how this ended up named this way.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
Expand All @@ -1998,6 +1999,12 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
the behavior depends on the chosen engine.

.. versionadded:: 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset
Columns are partitioned in the order they are given
The behaviour applies only to pyarrow >= 0.7.0 and fastparquet
For other versions, this argument will be ignored.
Contributor:

Is it actually ignored for older pyarrows? I would have hoped it would raise when pyarrow gets the unrecognized argument.

Contributor:

Actually it seems like we raise. Could you update this?

Contributor Author:

done

.. versionadded:: 0.24.0

**kwargs
Additional arguments passed to the parquet library. See
@@ -2027,7 +2034,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
"""
from pandas.io.parquet import to_parquet
to_parquet(self, fname, engine,
compression=compression, index=index, **kwargs)
compression=compression, index=index,
partition_cols=partition_cols, **kwargs)

@Substitution(header='Write out the column names. If a list of strings '
'is given, it is assumed to be aliases for the '
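To illustrate the docstring note above that columns are partitioned in the order they are given, a sketch with two partition columns (the frame and column names are illustrative; reading the directory back assumes a pyarrow build that understands partitioned datasets):

    import pandas as pd

    df = pd.DataFrame({'bool': [True, False],
                       'int': [1, 2],
                       'x': [0.1, 0.2]})

    # 'bool' becomes the outer directory level, 'int' the inner one
    df.to_parquet('out_dir', partition_cols=['bool', 'int'], compression=None)
    # out_dir/bool=True/int=1/<...>.parquet
    # out_dir/bool=False/int=2/<...>.parquet

    # round trip: read_parquet accepts the directory path
    result = pd.read_parquet('out_dir')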
41 changes: 30 additions & 11 deletions pandas/io/parquet.py
@@ -104,7 +104,8 @@ def __init__(self):
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', index=None, **kwargs):
coerce_timestamps='ms', index=None, partition_cols=None,
**kwargs):
self.validate_dataframe(df)

# Only validate the index if we're writing it.
@@ -125,9 +126,15 @@ def write(self, df, path, compression='snappy',

else:
table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps,
partition_cols=partition_cols, **kwargs)
else:
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)
@@ -206,12 +213,16 @@ def __init__(self):
)
self.api = fastparquet

def write(self, df, path, compression='snappy', index=None, **kwargs):
def write(self, df, path, compression='snappy', index=None,
partition_cols=None, **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.

if partition_cols is not None:
kwargs['file_scheme'] = 'hive'

if is_s3_url(path):
# path is s3:// so we need to open the s3file in 'wb' mode.
# TODO: Support 'ab'
@@ -224,7 +235,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

with catch_warnings(record=True):
self.api.write(path, df, compression=compression,
write_index=index, **kwargs)
write_index=index, partition_on=partition_cols,
**kwargs)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
@@ -244,15 +256,15 @@ def read(self, path, columns=None, **kwargs):


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
**kwargs):
partition_cols=None, **kwargs):
"""
Write a DataFrame to the parquet format.

Parameters
----------
df : DataFrame
path : string
File path
path : str
File path or Root Directory path. Will be used as Root Directory path
while writing a partitioned dataset.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -266,11 +278,18 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
engine's default behavior will be used.

.. versionadded 0.24.0
partition_cols : list, optional
Column names by which to partition the dataset
Columns are partitioned in the order they are given
The behaviour applies only to pyarrow >= 0.7.0 and fastparquet
For other versions, this argument will be ignored.
.. versionadded:: 0.24.0
kwargs
Additional keyword arguments passed to the engine
"""
impl = get_engine(engine)
return impl.write(df, path, compression=compression, index=index, **kwargs)
return impl.write(df, path, compression=compression, index=index,
partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
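Put differently, the two write paths added above roughly reduce to the following direct engine calls (a sketch; the frame, column names, and output directories are placeholders):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    import fastparquet

    df = pd.DataFrame({'bool': [True, False], 'int': [1, 2], 'x': [0.1, 0.2]})
    table = pa.Table.from_pandas(df)

    # pyarrow: partitioned writes go through write_to_dataset instead of write_table
    pq.write_to_dataset(table, 'pa_dir', compression=None,
                        partition_cols=['bool', 'int'])

    # fastparquet: partition_on together with the hive file scheme
    fastparquet.write('fp_dir', df, compression=None,
                      file_scheme='hive', partition_on=['bool', 'int'])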
33 changes: 33 additions & 0 deletions pandas/tests/io/test_parquet.py
@@ -1,4 +1,5 @@
""" test parquet compat """
import os

import pytest
import datetime
@@ -478,6 +479,26 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
check_round_trip(df_compat, pa,
path='s3://pandas-test/pyarrow.parquet')

def test_partition_cols_supported(self, pa_ge_070, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)
import pyarrow.parquet as pq
dataset = pq.ParquetDataset(path, validate_schema=False)
assert len(dataset.partitions.partition_names) == 2
assert dataset.partitions.partition_names == set(partition_cols)

def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full):
# GH #23283
partition_cols = ['bool', 'int']
pa = pa_lt_070
df = df_full
check_round_trip(df, pa,
write_kwargs={'partition_cols': partition_cols})


class TestParquetFastParquet(Base):

@@ -543,3 +564,15 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
# GH #19134
check_round_trip(df_compat, fp,
path='s3://pandas-test/fastparquet.parquet')

def test_partition_cols_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2
9 changes: 9 additions & 0 deletions pandas/tests/util/test_testing.py
@@ -875,3 +875,12 @@ def test_datapath_missing(datapath, request):
)

assert result == expected


def test_create_temp_directory():
temppath = ''
with tm.ensure_clean_dir() as path:
assert os.path.exists(path)
assert os.path.isdir(path)
temppath = path
assert not os.path.exists(temppath)
anjsudh marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 17 additions & 0 deletions pandas/util/testing.py
@@ -772,6 +772,23 @@ def ensure_clean(filename=None, return_filelike=False):
print("Exception on removing file: {error}".format(error=e))


@contextmanager
def ensure_clean_dir():
"""
Get a temporary directory path and agrees to remove on close.

Yields
----------
Temporary directory path
"""
directory_name = tempfile.mkdtemp(suffix='')
try:
yield directory_name
finally:
import shutil
shutil.rmtree(directory_name)


# -----------------------------------------------------------------------------
# Comparators

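Finally, a short usage sketch of the new ensure_clean_dir helper in a test (the partitioned write inside the block is just one example of what might be written to the temporary directory, and the exact partition directory name is an assumption):

    import os

    import pandas as pd
    import pandas.util.testing as tm

    df = pd.DataFrame({'bool': [True, False], 'x': [1, 2]})

    with tm.ensure_clean_dir() as path:
        df.to_parquet(path, partition_cols=['bool'], compression=None)
        assert os.path.isdir(os.path.join(path, 'bool=True'))
    # on exit the directory and everything written into it is removed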