Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more testing, fix some bugs, drop mem cache #85

Merged
merged 8 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
- name: Test with pytest
run: |
pytest -vv --cov-config=.coveragerc --cov=sdgx/ tests
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
- name: Install dependencies for building
run: |
pip install build twine hatch
Expand Down
2 changes: 1 addition & 1 deletion docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Quick demo
print(sampled_data)


You can refer our user guides for more details.
We provided user guides with lots of examples for researchers, scientists and developers. Learn more if you are interested!

.. toctree::
:maxdepth: 3
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ classifiers = [
]

[project.optional-dependencies]
test = ["pytest", "pytest-cov"]
test = ["pytest", "pytest-cov", "coverage<7"]
docs = [
"Sphinx",
"pydata-sphinx-theme",
Expand Down
17 changes: 16 additions & 1 deletion sdgx/cachers/disk_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,24 @@ def load(self, offset: int, chunksize: int, data_connector: DataConnector) -> pd
if len(cached_data) >= chunksize:
return cached_data[:chunksize]
return cached_data
data = data_connector.read(offset=offset, limit=max(self.blocksize, chunksize))
limit = max(self.blocksize, chunksize)
data = data_connector.read(offset=offset, limit=limit)
if data is None:
return data
while len(data) < limit:
# When generator size is less than blocksize
# Continue to read until fit the limit
next_data = data_connector.read(offset=offset + len(data), limit=limit - len(data))
if next_data is None or len(next_data) == 0:
break
data = pd.concat(
[
data,
next_data,
],
ignore_index=True,
)

self._refresh(offset, data)
if len(data) < chunksize:
return data
Expand Down
83 changes: 0 additions & 83 deletions sdgx/cachers/memory_cache.py

This file was deleted.

4 changes: 2 additions & 2 deletions sdgx/data_connectors/csv_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _read(self, offset: int = 0, limit: int | None = None) -> pd.DataFrame | Non
self.path,
sep=self.sep,
header=self.header,
skiprows=range(1, offset), # don't skip header
skiprows=range(1, offset + 1), # don't skip header
nrows=limit,
**self.read_csv_kwargs,
)
Expand All @@ -81,7 +81,7 @@ def _iter(self, offset: int = 0, chunksize: int = 1000) -> Generator[pd.DataFram
self.path,
sep=self.sep,
header=self.header,
skiprows=range(1, offset), # don't skip header
skiprows=range(1, offset + 1), # don't skip header
chunksize=chunksize,
**self.read_csv_kwargs,
):
Expand Down
40 changes: 24 additions & 16 deletions sdgx/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas as pd

from sdgx.cachers.base import Cacher, NoCache
from sdgx.cachers.disk_cache import DiskCache
from sdgx.cachers.manager import CacherManager
from sdgx.data_connectors.base import DataConnector
from sdgx.data_connectors.generator_connector import GeneratorConnector
Expand Down Expand Up @@ -86,12 +87,13 @@ def generator() -> Generator[pd.DataFrame, None, None]:

"""

DEFAULT_CACHER = DiskCache

def __init__(
self,
data_connector: DataConnector,
chunksize: int = 10000,
cacher: Cacher | None = None,
cache_mode: str = "DiskCache",
cacher: Cacher | str | type[Cacher] | None = None,
cacher_kwargs: None | dict[str, Any] = None,
) -> None:
self.data_connector = data_connector
Expand All @@ -102,12 +104,17 @@ def __init__(
cacher_kwargs = {}
cacher_kwargs.setdefault("blocksize", self.chunksize)
cacher_kwargs.setdefault("identity", self.data_connector.identity)
self.cacher = cacher or self.cache_manager.init_cacher(cache_mode, **cacher_kwargs)
if isinstance(cacher, Cacher):
self.cacher = cacher
elif isinstance(cacher, str) or isinstance(cacher, type):
self.cacher = self.cache_manager.init_cacher(cacher, **cacher_kwargs)
else:
self.cacher = self.cache_manager.init_cacher(self.DEFAULT_CACHER, **cacher_kwargs)

self.cacher.clear_invalid_cache()

if isinstance(data_connector, GeneratorConnector):
if isinstance(cacher, NoCache):
if isinstance(self.cacher, NoCache):
raise DataLoaderInitError("NoCache can't be used with GeneratorConnector")
# Warmup cache for generator, this allows random access
self.load_all()
Expand Down Expand Up @@ -148,23 +155,24 @@ def finalize(self, clear_cache=False) -> None:
if clear_cache:
self.cacher.clear_cache()

def __getitem__(self, key: list | slice | tuple) -> pd.DataFrame:
def __getitem__(self, key: int | slice | list) -> pd.DataFrame:
"""
Support get data by index and slice.
"""
if isinstance(key, list):
sli = None
rows = key
else:
sli = key
rows = None
if isinstance(key, int):
return self.cacher.load(
offset=(key // self.chunksize) * self.chunksize,
chunksize=self.chunksize,
data_connector=self.data_connector,
)[0]

if not sli:
return pd.concat((d[rows] for d in self.iter()), ignore_index=True)
if isinstance(key, list):
return pd.concat((d[key] for d in self.iter()), ignore_index=True)

start = sli.start or 0
stop = sli.stop or len(self)
step = sli.step or 1
assert isinstance(key, slice)
start = key.start or 0
stop = key.stop or len(self)
step = key.step or 1

offset = (start // self.chunksize) * self.chunksize
n_iter = ((stop - start) // self.chunksize) + 1
Expand Down
Loading