Support for partition_cols in to_parquet #23321

Merged · 19 commits · Nov 10, 2018
30 changes: 30 additions & 0 deletions doc/source/io.rst
@@ -4668,6 +4668,36 @@ Passing ``index=True`` will *always* write the index, even if that's not the
underlying engine's default behavior.


Partitioning Parquet files
''''''''''''''''''''''''''

.. versionadded:: 0.24.0

Parquet supports partitioning of data based on the values of one or more columns.

.. ipython:: python
:suppress:

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})
df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None)

The ``fname`` specifies the parent directory to which data will be saved.
The ``partition_cols`` are the column names by which the dataset will be partitioned.
Columns are partitioned in the order they are given. The partition splits are
determined by the unique values in the partition columns.
The above example creates a partitioned dataset that may look like:

.. code-block:: text

test
├── a=0
│ ├── 0bac803e32dc42ae83fddfd029cbdebc.parquet
│ └── ...
└── a=1
├── e6ab24a4f45147b49b54a662f0c412a3.parquet
└── ...
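
The resulting dataset can be read back by pointing :func:`read_parquet` at the
parent directory. A minimal sketch (assuming the ``pyarrow`` engine; the
partition column may be reconstructed with a different dtype than it was
written with, e.g. as a categorical):

.. code-block:: python

    result = pd.read_parquet('test', engine='pyarrow')
    # the partition column 'a' is rebuilt from the directory names
    result.dtypes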


.. _io.sql:

SQL Queries
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.24.0.txt
@@ -235,6 +235,7 @@ Other Enhancements
- New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
- Compatibility with Matplotlib 3.0 (:issue:`22790`).
- Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns, when ``engine = 'pyarrow'`` (:issue:`23283`).
- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)

.. _whatsnew_0240.api_breaking:
15 changes: 12 additions & 3 deletions pandas/core/frame.py
@@ -1970,7 +1970,7 @@ def to_feather(self, fname):
to_feather(self, fname)

def to_parquet(self, fname, engine='auto', compression='snappy',
index=None, **kwargs):
index=None, partition_cols=None, **kwargs):
"""
Write a DataFrame to the binary parquet format.

@@ -1984,7 +1984,10 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
Parameters
----------
fname : str
Contributor (review comment): Side issue: we use `path` elsewhere for IO routines. We should change this as well (out of scope here); we would have to deprecate the name, unfortunately.

Contributor (review comment): We actually use `path` on the top-level `.to_parquet`; not sure how this came to be named this way.

String file path.
File path or root directory path. Will be used as the root directory
path when writing a partitioned dataset.

.. versionchanged:: 0.24.0
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -1997,6 +2000,11 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
If ``False``, they will not be written to the file. If ``None``,
the behavior depends on the chosen engine.

.. versionadded:: 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset.
Columns are partitioned in the order they are given.

.. versionadded:: 0.24.0

**kwargs
@@ -2027,7 +2035,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
"""
from pandas.io.parquet import to_parquet
to_parquet(self, fname, engine,
compression=compression, index=index, **kwargs)
compression=compression, index=index,
partition_cols=partition_cols, **kwargs)

@Substitution(header='Write out the column names. If a list of strings '
'is given, it is assumed to be aliases for the '
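
The `DataFrame.to_parquet` change above only threads the new keyword through to
`pandas.io.parquet.to_parquet`. A minimal usage sketch of the keyword with both
engines (the output directory names are placeholders; the relevant engine must
be installed):

```python
import pandas as pd

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': range(4)})

# pyarrow: dispatches to pyarrow.parquet.write_to_dataset (see pandas/io/parquet.py below)
df.to_parquet('out_pyarrow', engine='pyarrow', partition_cols=['a'], compression=None)

# fastparquet: the same keyword is forwarded as partition_on with file_scheme='hive'
df.to_parquet('out_fastparquet', engine='fastparquet', partition_cols=['a'], compression=None)
```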
51 changes: 39 additions & 12 deletions pandas/io/parquet.py
@@ -101,19 +101,25 @@ def __init__(self):
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', index=None, **kwargs):
coerce_timestamps='ms', index=None, partition_cols=None,
**kwargs):
self.validate_dataframe(df)
path, _, _, _ = get_filepath_or_buffer(path, mode='wb')

if index is None:
from_pandas_kwargs = {}
else:
from_pandas_kwargs = {'preserve_index': index}

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps,
partition_cols=partition_cols, **kwargs)
else:
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)
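
For reference, a standalone sketch of what the partitioned branch above calls
into (assuming a pyarrow version that provides `pyarrow.parquet.write_to_dataset`;
the directory name is a placeholder):

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': range(4)})
table = pa.Table.from_pandas(df)

# writes one sub-directory per unique value of 'a', with parquet files inside each
pq.write_to_dataset(table, root_path='dataset_dir', partition_cols=['a'])
```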
@@ -156,12 +162,24 @@ def __init__(self):
)
self.api = fastparquet

def write(self, df, path, compression='snappy', index=None, **kwargs):
def write(self, df, path, compression='snappy', index=None,
partition_cols=None, **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.

if 'partition_on' in kwargs:
if partition_cols is None:
partition_cols = kwargs.pop('partition_on')
else:
raise ValueError("Cannot use both partition_on and "
"partition_cols. Use partition_cols for "
"partitioning data")

if partition_cols is not None:
kwargs['file_scheme'] = 'hive'

if is_s3_url(path):
# path is s3:// so we need to open the s3file in 'wb' mode.
# TODO: Support 'ab'
@@ -174,7 +192,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

with catch_warnings(record=True):
self.api.write(path, df, compression=compression,
write_index=index, **kwargs)
write_index=index, partition_on=partition_cols,
**kwargs)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
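
For comparison, the fastparquet branch above reduces to roughly the following
direct call (a sketch; fastparquet must be installed and the directory name is
a placeholder):

```python
import pandas as pd
import fastparquet

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': range(4)})

# the 'hive' file scheme writes a directory tree partitioned on the given columns
fastparquet.write('dataset_dir', df, file_scheme='hive',
                  partition_on=['a'], compression=None)
```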
@@ -194,15 +213,17 @@ def read(self, path, columns=None, **kwargs):


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
**kwargs):
partition_cols=None, **kwargs):
"""
Write a DataFrame to the parquet format.

Parameters
----------
df : DataFrame
path : string
File path
path : str
File path or root directory path. Will be used as the root directory path
when writing a partitioned dataset.

.. versionchanged:: 0.24.0
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
Expand All @@ -216,11 +237,17 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
engine's default behavior will be used.

.. versionadded:: 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset.
Columns are partitioned in the order they are given.

.. versionadded:: 0.24.0
kwargs
Additional keyword arguments passed to the engine
"""
impl = get_engine(engine)
return impl.write(df, path, compression=compression, index=index, **kwargs)
return impl.write(df, path, compression=compression, index=index,
partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
47 changes: 47 additions & 0 deletions pandas/tests/io/test_parquet.py
@@ -1,4 +1,5 @@
""" test parquet compat """
import os

import pytest
import datetime
@@ -454,6 +455,18 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
check_round_trip(df_compat, pa,
path='s3://pandas-test/pyarrow.parquet')

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)
import pyarrow.parquet as pq
dataset = pq.ParquetDataset(path, validate_schema=False)
assert len(dataset.partitions.partition_names) == 2
assert dataset.partitions.partition_names == set(partition_cols)


class TestParquetFastParquet(Base):

@@ -519,3 +532,37 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
# GH #19134
check_round_trip(df_compat, fp,
path='s3://pandas-test/fastparquet.parquet')

def test_partition_cols_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet",
partition_cols=partition_cols, compression=None)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2

def test_partition_on_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet", compression=None,
partition_on=partition_cols)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2

def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with pytest.raises(ValueError):
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet", compression=None,
partition_on=partition_cols,
partition_cols=partition_cols)
7 changes: 7 additions & 0 deletions pandas/tests/util/test_testing.py
@@ -876,3 +876,10 @@ def test_datapath_missing(datapath, request):
)

assert result == expected


def test_create_temp_directory():
with tm.ensure_clean_dir() as path:
assert os.path.exists(path)
assert os.path.isdir(path)
assert not os.path.exists(path)
20 changes: 20 additions & 0 deletions pandas/util/testing.py
@@ -6,6 +6,7 @@
import locale
import os
import re
from shutil import rmtree
import string
import subprocess
import sys
@@ -759,6 +760,25 @@ def ensure_clean(filename=None, return_filelike=False):
print("Exception on removing file: {error}".format(error=e))


@contextmanager
def ensure_clean_dir():
"""
Get a temporary directory path and ensure it is removed when the context exits.

Yields
------
Temporary directory path
"""
directory_name = tempfile.mkdtemp(suffix='')
try:
yield directory_name
finally:
try:
rmtree(directory_name)
except Exception:
pass


# -----------------------------------------------------------------------------
# Comparators

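
A usage sketch of the new `ensure_clean_dir` helper, mirroring how the tests
above use it (the DataFrame and engine here are just for illustration):

```python
import pandas as pd
import pandas.util.testing as tm

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': range(4)})

with tm.ensure_clean_dir() as path:
    # the temporary directory exists only inside the with-block
    df.to_parquet(path, engine='pyarrow', partition_cols=['a'], compression=None)
# on exit the whole directory tree is removed (best effort; errors are swallowed)
```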