Add more testing, fix some bugs, drop mem cache (#85)
Wh1isper committed Dec 21, 2023
1 parent 97daa04 commit 030fffc
Showing 18 changed files with 156 additions and 654 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-package.yml
@@ -28,6 +28,8 @@ jobs:
     - name: Test with pytest
       run: |
         pytest -vv --cov-config=.coveragerc --cov=sdgx/ tests
+    - name: Upload coverage to Codecov
+      uses: codecov/codecov-action@v3
     - name: Install dependencies for building
       run: |
        pip install build twine hatch
2 changes: 1 addition & 1 deletion docs/source/index.rst
@@ -86,7 +86,7 @@ Quick demo
    print(sampled_data)
 
-You can refer our user guides for more details.
+We provided user guides with lots of examples for researchers, scientists and developers. Learn more if you are interested!
 
 .. toctree::
    :maxdepth: 3
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ classifiers = [
 ]
 
 [project.optional-dependencies]
-test = ["pytest", "pytest-cov"]
+test = ["pytest", "pytest-cov", "coverage<7"]
 docs = [
     "Sphinx",
     "pydata-sphinx-theme",
17 changes: 16 additions & 1 deletion sdgx/cachers/disk_cache.py
@@ -104,9 +104,24 @@ def load(self, offset: int, chunksize: int, data_connector: DataConnector) -> pd
             if len(cached_data) >= chunksize:
                 return cached_data[:chunksize]
             return cached_data
-        data = data_connector.read(offset=offset, limit=max(self.blocksize, chunksize))
+        limit = max(self.blocksize, chunksize)
+        data = data_connector.read(offset=offset, limit=limit)
         if data is None:
             return data
+        while len(data) < limit:
+            # When generator size is less than blocksize
+            # Continue to read until fit the limit
+            next_data = data_connector.read(offset=offset + len(data), limit=limit - len(data))
+            if next_data is None or len(next_data) == 0:
+                break
+            data = pd.concat(
+                [
+                    data,
+                    next_data,
+                ],
+                ignore_index=True,
+            )
+
         self._refresh(offset, data)
         if len(data) < chunksize:
             return data
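Note on the new read loop above: a single data_connector.read() call can return fewer rows than requested (for example, a generator-backed connector that yields small batches), so the cache keeps reading until it either reaches the limit or the source is exhausted. A minimal, self-contained sketch of that accumulate-until-limit pattern, using a plain read callable as a stand-in for the real connector classes:

from __future__ import annotations

import pandas as pd


def read_until_limit(read, offset: int, limit: int) -> pd.DataFrame | None:
    # Keep calling `read` until `limit` rows are collected or the source runs dry.
    data = read(offset=offset, limit=limit)
    if data is None:
        return None
    while len(data) < limit:
        next_data = read(offset=offset + len(data), limit=limit - len(data))
        if next_data is None or len(next_data) == 0:
            break  # source exhausted before reaching the limit
        data = pd.concat([data, next_data], ignore_index=True)
    return data


# Hypothetical source that only ever yields 3 rows per call.
frame = pd.DataFrame({"x": range(10)})


def small_read(offset: int, limit: int) -> pd.DataFrame:
    return frame.iloc[offset : offset + min(limit, 3)].reset_index(drop=True)


print(len(read_until_limit(small_read, offset=0, limit=10)))  # 10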
83 changes: 0 additions & 83 deletions sdgx/cachers/memory_cache.py

This file was deleted.

4 changes: 2 additions & 2 deletions sdgx/data_connectors/csv_connector.py
@@ -57,7 +57,7 @@ def _read(self, offset: int = 0, limit: int | None = None) -> pd.DataFrame | None:
             self.path,
             sep=self.sep,
             header=self.header,
-            skiprows=range(1, offset),  # don't skip header
+            skiprows=range(1, offset + 1),  # don't skip header
             nrows=limit,
             **self.read_csv_kwargs,
         )
@@ -81,7 +81,7 @@ def _iter(self, offset: int = 0, chunksize: int = 1000) -> Generator[pd.DataFrame, None, None]:
             self.path,
             sep=self.sep,
             header=self.header,
-            skiprows=range(1, offset),  # don't skip header
+            skiprows=range(1, offset + 1),  # don't skip header
             chunksize=chunksize,
             **self.read_csv_kwargs,
         ):
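The offset + 1 fix keeps the header row while skipping exactly offset data rows: with the header on line 0, the first offset data rows sit on lines 1 through offset, so the half-open range must end at offset + 1. A small sketch of the difference, using plain pandas.read_csv on an in-memory CSV rather than the connector itself:

import io

import pandas as pd

csv_text = "a,b\n0,x\n1,y\n2,z\n3,w\n"
offset = 2

# Old behaviour: range(1, offset) skips only offset - 1 data rows.
old = pd.read_csv(io.StringIO(csv_text), skiprows=range(1, offset))
# Fixed behaviour: range(1, offset + 1) skips exactly `offset` data rows.
new = pd.read_csv(io.StringIO(csv_text), skiprows=range(1, offset + 1))

print(old["a"].tolist())  # [1, 2, 3] -> only one data row skipped
print(new["a"].tolist())  # [2, 3]    -> the first two data rows skipped, header kept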
40 changes: 24 additions & 16 deletions sdgx/data_loader.py
@@ -6,6 +6,7 @@
 import pandas as pd
 
 from sdgx.cachers.base import Cacher, NoCache
+from sdgx.cachers.disk_cache import DiskCache
 from sdgx.cachers.manager import CacherManager
 from sdgx.data_connectors.base import DataConnector
 from sdgx.data_connectors.generator_connector import GeneratorConnector
@@ -86,12 +87,13 @@ def generator() -> Generator[pd.DataFrame, None, None]:
     """
 
+    DEFAULT_CACHER = DiskCache
+
     def __init__(
         self,
         data_connector: DataConnector,
         chunksize: int = 10000,
-        cacher: Cacher | None = None,
-        cache_mode: str = "DiskCache",
+        cacher: Cacher | str | type[Cacher] | None = None,
         cacher_kwargs: None | dict[str, Any] = None,
     ) -> None:
         self.data_connector = data_connector
@@ -102,12 +104,17 @@ def __init__(
             cacher_kwargs = {}
         cacher_kwargs.setdefault("blocksize", self.chunksize)
         cacher_kwargs.setdefault("identity", self.data_connector.identity)
-        self.cacher = cacher or self.cache_manager.init_cacher(cache_mode, **cacher_kwargs)
+        if isinstance(cacher, Cacher):
+            self.cacher = cacher
+        elif isinstance(cacher, str) or isinstance(cacher, type):
+            self.cacher = self.cache_manager.init_cacher(cacher, **cacher_kwargs)
+        else:
+            self.cacher = self.cache_manager.init_cacher(self.DEFAULT_CACHER, **cacher_kwargs)
 
         self.cacher.clear_invalid_cache()
 
         if isinstance(data_connector, GeneratorConnector):
-            if isinstance(cacher, NoCache):
+            if isinstance(self.cacher, NoCache):
                 raise DataLoaderInitError("NoCache can't be used with GeneratorConnector")
             # Warmup cache for generator, this allows random access
             self.load_all()
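With this change, cacher accepts a ready-made Cacher instance, a registered cacher name, or a Cacher subclass, and falls back to DEFAULT_CACHER (DiskCache) otherwise. A minimal sketch of that resolution pattern, with stand-in classes and a plain dict in place of the real cacher manager (none of these names are the sdgx ones):

from __future__ import annotations


class Cacher:  # stand-in for sdgx.cachers.base.Cacher
    def __init__(self, **kwargs):
        self.kwargs = kwargs


class DiskCache(Cacher):  # stand-in default
    pass


REGISTRY = {"DiskCache": DiskCache}  # stand-in for the cacher manager


def resolve_cacher(cacher: Cacher | str | type[Cacher] | None, **cacher_kwargs) -> Cacher:
    # Accept an instance, a registered name, a class, or None (use the default).
    if isinstance(cacher, Cacher):
        return cacher                               # already constructed: use as-is
    if isinstance(cacher, str):
        return REGISTRY[cacher](**cacher_kwargs)    # look up by registered name
    if isinstance(cacher, type):
        return cacher(**cacher_kwargs)              # construct the given class
    return DiskCache(**cacher_kwargs)               # fall back to the default


print(type(resolve_cacher(None, blocksize=1000)).__name__)         # DiskCache
print(type(resolve_cacher("DiskCache", blocksize=1000)).__name__)  # DiskCache
print(type(resolve_cacher(DiskCache(blocksize=500))).__name__)     # DiskCache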
@@ -148,23 +155,24 @@ def finalize(self, clear_cache=False) -> None:
         if clear_cache:
             self.cacher.clear_cache()
 
-    def __getitem__(self, key: list | slice | tuple) -> pd.DataFrame:
+    def __getitem__(self, key: int | slice | list) -> pd.DataFrame:
         """
         Support get data by index and slice.
         """
-        if isinstance(key, list):
-            sli = None
-            rows = key
-        else:
-            sli = key
-            rows = None
+        if isinstance(key, int):
+            return self.cacher.load(
+                offset=(key // self.chunksize) * self.chunksize,
+                chunksize=self.chunksize,
+                data_connector=self.data_connector,
+            )[0]
 
-        if not sli:
-            return pd.concat((d[rows] for d in self.iter()), ignore_index=True)
+        if isinstance(key, list):
+            return pd.concat((d[key] for d in self.iter()), ignore_index=True)
 
-        start = sli.start or 0
-        stop = sli.stop or len(self)
-        step = sli.step or 1
+        assert isinstance(key, slice)
+        start = key.start or 0
+        stop = key.stop or len(self)
+        step = key.step or 1
 
         offset = (start // self.chunksize) * self.chunksize
         n_iter = ((stop - start) // self.chunksize) + 1
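After this change, an integer key loads the single cached chunk that contains the requested row, a list key is applied to every chunk and the pieces concatenated, and anything else must be a slice whose defaults are filled in before the chunked offset/n_iter computation. A small sketch of the chunk arithmetic the integer and slice paths rely on (plain helper functions, not the DataLoader API):

from __future__ import annotations


def chunk_offset(key: int, chunksize: int) -> int:
    # Start offset of the chunk that contains row `key`.
    return (key // chunksize) * chunksize


def slice_bounds(key: slice, total: int) -> tuple[int, int, int]:
    # Fill in slice defaults the way the loader does: start=0, stop=len(self), step=1.
    return key.start or 0, key.stop or total, key.step or 1


print(chunk_offset(25_000, 10_000))                     # 20000 -> row 25000 lives in the chunk starting at 20000
print(slice_bounds(slice(None, 100), total=1_000_000))  # (0, 100, 1)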
