refactor: separate dataframe and dataset #865

Merged · 3 commits · Sep 8, 2020
4 changes: 2 additions & 2 deletions .github/workflows/pythonpackage.yml
@@ -60,10 +60,10 @@ jobs:
         brew install libomp
     - name: Cache compiled binaries
       id: cache-compiled-binaries
-      uses: actions/cache@v1
+      uses: actions/cache@v2
       with:
         path: packages/vaex-core/build
-        key: ${{ runner.OS }}-${{ matrix.python-version }}-${{ hashFiles('packages/vaex-core/src/*') }}-v4
+        key: ${{ runner.OS }}-${{ matrix.python-version }}-${{ hashFiles('packages/vaex-core/src/*') }}-v1
     - name: Fix cache timestamp
       if: steps.cache-compiled-binaries.outputs.cache-hit == 'true' && matrix.os != 'windows-latest'
       shell: bash
4 changes: 2 additions & 2 deletions packages/vaex-astro/vaex/astro/export.py
@@ -103,7 +103,7 @@ def export_hdf5_v1(dataset, path, column_names=None, byteorder="=", shuffle=Fals
     dataset_output.description = description
     logger.debug("writing meta information")
     dataset_output.write_meta()
-    dataset_output.close_files()
+    dataset_output.close()
     return


@@ -200,5 +200,5 @@ def export_hdf5(dataset, path, column_names=None, byteorder="=", shuffle=False,
     dataset_output.description = description
     logger.debug("writing meta information")
     dataset_output.write_meta()
-    dataset_output.close_files()
+    dataset_output.close()
     return
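Both hunks above are the same rename: `close_files()` becomes `close()` on the output dataset. A minimal sketch of how the renamed method reads from user code, assuming `vaex.open` returns a DataFrame exposing the new `close()` (the path is hypothetical):

```python
import vaex

df = vaex.open('/path/to/data.hdf5')  # hypothetical file
try:
    print(len(df))
finally:
    df.close()  # previously: df.close_files()
```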
3 changes: 2 additions & 1 deletion packages/vaex-core/setup.py
@@ -23,7 +23,8 @@
 install_requires_core = ["numpy>=1.16", "astropy>=2", "aplus", "tabulate>=0.8.3",
                          "future>=0.15.2", "pyyaml", "progressbar2", "psutil>=1.2.1",
                          "requests", "six", "cloudpickle", "pandas", "dask[array]",
-                         "nest-asyncio>=1.3.3", "pyarrow>=1.0"]
+                         "nest-asyncio>=1.3.3", "pyarrow>=1.0", "frozendict",
+                         "blake3"]
 if sys.version_info[0] == 2:
     install_requires_core.append("futures>=2.2.0")
 install_requires_viz = ["matplotlib>=1.3.1", ]
46 changes: 18 additions & 28 deletions packages/vaex-core/vaex/__init__.py
@@ -226,10 +226,10 @@ def open(path, convert=False, shuffle=False, copy_index=False, *args, **kwargs):
         else:
             # with ProcessPoolExecutor() as executor:
             #     executor.submit(read_csv_and_convert, filenames, shuffle=shuffle, **kwargs)
-            DataFrames = []
+            dfs = []
             for filename in filenames:
-                DataFrames.append(open(filename, convert=bool(convert), shuffle=shuffle, **kwargs))
-            ds = vaex.dataframe.DataFrameConcatenated(DataFrames)
+                dfs.append(open(filename, convert=bool(convert), shuffle=shuffle, **kwargs))
+            ds = concat(dfs)
         if convert:
             ds.export_hdf5(filename_hdf5, shuffle=shuffle)
             ds = vaex.file.open(filename_hdf5)
@@ -253,7 +253,7 @@ def open_many(filenames):
         filename = filename.strip()
         if filename and filename[0] != "#":
             dfs.append(open(filename))
-    return vaex.dataframe.DataFrameConcatenated(dfs=dfs)
+    return concat(dfs)


 def from_samp(username=None, password=None):
@@ -270,7 +270,8 @@ def from_samp(username=None, password=None):
 def from_astropy_table(table):
     """Create a vaex DataFrame from an Astropy Table."""
     import vaex.file.other
-    return vaex.file.other.DatasetAstropyTable(table=table)
+    ds = vaex.file.other.DatasetAstropyTable(table=table)
+    return vaex.dataframe.DataFrameLocal(ds)


 def from_dict(data):
@@ -312,11 +313,7 @@ def from_items(*items):
     :rtype: DataFrame

     """
-    import numpy as np
-    df = vaex.dataframe.DataFrameArrays("array")
-    for name, array in items:
-        df.add_column(name, np.asanyarray(array))
-    return df
+    return from_dict(dict(items))


 def from_arrays(**arrays):
@@ -348,15 +345,8 @@ def from_arrays(**arrays):
     """
     import numpy as np
     import six
-    from .column import Column, supported_column_types
-    df = vaex.dataframe.DataFrameArrays("array")
-    for name, array in arrays.items():
-        if isinstance(array, supported_column_types):
-            df.add_column(name, array)
-        else:
-            array = np.asanyarray(array)
-            df.add_column(name, array)
-    return df
+    dataset = vaex.dataset.DatasetArrays(arrays)
+    return vaex.dataframe.DataFrameLocal(dataset)

 def from_arrow_table(table, as_numpy=True):
     """Creates a vaex DataFrame from an arrow Table.
@@ -397,27 +387,27 @@ def from_pandas(df, name="pandas", copy_index=False, index_name="index"):
     import six
     import pandas as pd
     import numpy as np
-    vaex_df = vaex.dataframe.DataFrameArrays(name)
+    columns = {}

     def add(name, column):
         values = column.values
         # the first test is to support (partially) pandas 0.23
         if hasattr(pd.core.arrays, 'integer') and isinstance(values, pd.core.arrays.integer.IntegerArray):
             values = np.ma.array(values._data, mask=values._mask)
         try:
-            vaex_df.add_column(name, values)
+            columns[name] = vaex.dataset.to_supported_array(values)
         except Exception as e:
             print("could not convert column %s, error: %r, will try to convert it to string" % (name, e))
             try:
                 values = values.astype("S")
-                vaex_df.add_column(name, values)
+                columns[name] = vaex.dataset.to_supported_array(values)
             except Exception as e:
                 print("Giving up column %s, error: %r" % (name, e))
     for name in df.columns:
         add(name, df[name])
     if copy_index:
         add(index_name, df.index)
-    return vaex_df
+    return from_dict(columns)


 def from_ascii(path, seperator=None, names=True, skip_lines=0, skip_after=0, **kwargs):
@@ -437,7 +427,7 @@ def from_ascii(path, seperator=None, names=True, skip_lines=0, skip_after=0, **k
     """

     import vaex.ext.readcol as rc
-    ds = vaex.dataframe.DataFrameArrays(path)
+    ds = vaex.dataframe.DataFrameLocal()
     if names not in [True, False]:
         namelist = names
         names = False
@@ -562,13 +552,13 @@ def _from_csv_convert_and_read(filename_or_buffer, copy_index, maybe_convert_pat
     else:
         logger.info('converting %d chunks into single HDF5 file %s' % (len(converted_paths), combined_hdf5))
         dfs = [vaex.file.open(p) for p in converted_paths]
-        df_combined = vaex.dataframe.DataFrameConcatenated(dfs)
+        df_combined = vaex.concat(dfs)
         df_combined.export_hdf5(combined_hdf5, shuffle=False)

         logger.info('deleting %d chunk files' % len(converted_paths))
         for df, df_path in zip(dfs, converted_paths):
             try:
-                df.close_files()
+                df.close()
                 os.remove(df_path)
             except Exception as e:
                 logger.error('Could not close or delete intermediate hdf5 file %s used to convert %s to hdf5: %s' % (
@@ -814,8 +804,8 @@ def concat(dfs):

     :rtype: DataFrame
     '''
-    ds = reduce((lambda x, y: x.concat(y)), dfs)
-    return ds
+    df, *tail = dfs
+    return df.concat(*tail)

 def vrange(start, stop, step=1, dtype='f8'):
     """Creates a virtual column which is the equivalent of numpy.arange, but uses 0 memory"""
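The common thread in this file: the `from_*` constructors no longer mutate a `DataFrameArrays` column by column; they build a `vaex.dataset.DatasetArrays` (directly or via `from_dict`) and wrap it in a `DataFrameLocal`, and all concatenation funnels through `concat()`. A short sketch of the refactored entry points, assuming only the behaviour visible in the hunks above:

```python
import numpy as np
import vaex

# from_arrays now wraps a DatasetArrays in a DataFrameLocal
df1 = vaex.from_arrays(x=np.array([1.0, 2.0]))
df2 = vaex.from_arrays(x=np.array([3.0, 4.0]))

# from_items is now a thin wrapper around from_dict
df3 = vaex.from_items(('x', np.array([5.0])))

# concat peels off the first dataframe and delegates to DataFrame.concat
df = vaex.concat([df1, df2, df3])
assert len(df) == 5
```

The `df, *tail = dfs` rewrite of `concat` also makes the failure mode explicit: an empty list now raises immediately at unpacking rather than inside `reduce`.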
8 changes: 8 additions & 0 deletions packages/vaex-core/vaex/array_types.py
@@ -25,6 +25,7 @@ def filter(ar, boolean_mask):
     else:
         return ar[boolean_mask]

+
 def same_type(type1, type2):
     try:
         return type1 == type2
@@ -40,6 +41,13 @@ def tolist(ar):
     return ar.tolist()


+def data_type(ar):
+    if isinstance(ar, supported_arrow_array_types):
+        return ar.type
+    else:
+        return ar.dtype
+
+
 def to_numpy(x, strict=False):
     import vaex.arrow.convert
     import vaex.column
9 changes: 4 additions & 5 deletions packages/vaex-core/vaex/arrow/dataset.py
@@ -11,11 +11,10 @@


 def from_table(table, as_numpy=True):
-    df = vaex.dataframe.DataFrameLocal(None, None, [])
-    df._length_unfiltered = df._length_original = table.num_rows
-    df._index_end = df._length_original = table.num_rows
-    for col, name in zip(table.columns, table.schema.names):
-        df.add_column(name, col)
+    columns = dict(zip(table.schema.names, table.columns))
+    # TODO: this should be an DatasetArrow and/or DatasetParquet
+    dataset = vaex.dataset.DatasetArrays(columns)
+    df = vaex.dataframe.DataFrameLocal(dataset)
     return df.as_numpy() if as_numpy else df
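`from_table` no longer pokes at private DataFrame state (`_length_unfiltered`, `_index_end`); it zips the schema names with the columns into a dict, wraps that in a `DatasetArrays`, and hands the dataset to `DataFrameLocal`. A sketch of calling it, assuming the module stays importable as `vaex.arrow.dataset`:

```python
import pyarrow as pa
from vaex.arrow.dataset import from_table

table = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
df = from_table(table, as_numpy=False)  # keep arrow-backed columns
print(df)
```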
49 changes: 40 additions & 9 deletions packages/vaex-core/vaex/column.py
@@ -16,6 +16,7 @@
 logger = logging.getLogger("vaex.column")


+
 class Column(object):
     def tolist(self):
         return self.to_numpy().tolist()
@@ -96,12 +97,36 @@ def __getitem__(self, slice):
     def __setitem__(self, slice, value):
         self.ar[slice] = value


+
+class ColumnArrowLazyCast(Column):
+    """Wraps an array like object and cast it lazily"""
+    def __init__(self, ar, type):
+        self.ar = ar  # this should behave like a numpy array
+        self.type = type
+
+    @property
+    def dtype(self):
+        return vaex.array_types.to_numpy_type(self.type)
+
+    def __len__(self):
+        return len(self.ar)
+
+    def trim(self, i1, i2):
+        return type(self)(self.ar[i1:i2], self.type)
+
+    def __getitem__(self, slice):
+        if self.ar.dtype == object and vaex.array_types.is_string_type(self.type):
+            # this seem to be the only way to convert mixed str and nan to include nulls
+            return pa.Array.from_pandas(self.ar[slice], type=self.type)
+        return pa.array(self.ar[slice], type=self.type)
+
+
 class ColumnIndexed(Column):
-    def __init__(self, df, indices, name, masked=False):
-        self.df = df
+    def __init__(self, column, indices, masked=False):
+        self.column = column
         self.indices = indices
-        self.name = name
-        self.dtype = self.df.data_type(name)
+        self.dtype = vaex.array_types.data_type(column)
         self.shape = (len(indices),)
         self.masked = masked
         # this check is too expensive
@@ -110,7 +135,7 @@ def __init__(self, df, indices, name, masked=False):
         # assert max_index < self.df._length_original

     @staticmethod
-    def index(df, column, name, indices, direct_indices_map=None, masked=False):
+    def index(column, indices, direct_indices_map=None, masked=False):
         """Creates a new column indexed by indices which avoids nested indices

         :param df: Dataframe where column comes from
@@ -131,15 +156,15 @@ def index(df, column, name, indices, direct_indices_map=None, masked=False):
                 direct_indices_map[id(column.indices)] = direct_indices
             else:
                 direct_indices = direct_indices_map[id(column.indices)]
-            return ColumnIndexed(column.df, direct_indices, column.name, masked=masked or column.masked)
+            return ColumnIndexed(column.column, direct_indices, masked=masked or column.masked)
         else:
-            return ColumnIndexed(df, indices, name, masked=masked)
+            return ColumnIndexed(column, indices, masked=masked)

     def __len__(self):
         return len(self.indices)

     def trim(self, i1, i2):
-        return ColumnIndexed(self.df, self.indices[i1:i2], self.name, masked=self.masked)
+        return ColumnIndexed(self.column, self.indices[i1:i2], masked=self.masked)

     def __arrow_array__(self, type=None):
         # TODO: without a copy we get a buserror
@@ -150,13 +175,16 @@ def __arrow_array__(self, type=None):
         # else:
         return pa.array(self)

+    def to_numpy(self):
+        return np.array(self[:])
+
     def __getitem__(self, slice):
         start, stop, step = slice.start, slice.stop, slice.step
         start = start or 0
         stop = stop or len(self)
         assert step in [None, 1]
         indices = self.indices[start:stop]
-        ar_unfiltered = self.df.columns[self.name]
+        ar_unfiltered = self.column
         if self.masked:
             mask = indices == -1
             if isinstance(ar_unfiltered, Column):
@@ -297,6 +325,9 @@ def trim(self, i1, i2):

         return ColumnConcatenatedLazy(expressions, self.dtype)

+    def __arrow_array__(self, type=None):
+        return pa.array(self[:], type=type)
+
     def __getitem__(self, slice):
         start, stop, step = slice.start, slice.stop, slice.step
         start = start or 0
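This is the core decoupling of the PR: `ColumnIndexed` now wraps the backing column directly instead of holding a `(df, name)` pair, so a column no longer needs a DataFrame to resolve its data, and the new `ColumnArrowLazyCast` defers casting to an Arrow type until the data is actually sliced. A sketch of the new `ColumnIndexed` constructor, assuming a plain numpy array is an acceptable backing column and that, as in the hunks above, `-1` indices mark masked rows:

```python
import numpy as np
from vaex.column import ColumnIndexed

values = np.arange(10.0)
indices = np.array([3, 0, -1, 7])  # -1 flags a missing row
col = ColumnIndexed(values, indices, masked=True)
print(len(col))   # 4
print(col[0:4])   # masked array; the -1 entry is masked out
```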