
Init benchmark base code #80

Merged · 14 commits · Dec 20, 2023
3 changes: 3 additions & 0 deletions benchmarks/.gitignore
@@ -0,0 +1,3 @@
dataset/
.ndarry_cache
.sdgx_cache
54 changes: 54 additions & 0 deletions benchmarks/README.md
@@ -0,0 +1,54 @@
# WIP

Please help us to improve our benchmark: https://github.com/hitsz-ids/synthetic-data-generator/issues/82

## Benchmarks

Benchmarks measure the performance of the library:

- Performance: processing time, model training time, sampling rate, ...
- Memory consumption
- Others, such as cache hit rate

For now we provide a simple benchmark that compares our CTGAN implementation with the original one: both are fitted on a large random dataset and their memory consumption is compared.

### Setup

```bash
# Clone and install latest version
# You can also use our latest image: docker pull idsteam/sdgx:latest
git clone https://github.com/hitsz-ids/synthetic-data-generator.git
cd synthetic-data-generator && pip install -e ./
# Setup benchmark
cd benchmarks
pip install -r requirements.txt
```

Generate a dataset with `python generate_dataset.py`; run `python generate_dataset.py --help` to see the available options.
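For a quick sanity check of the generated file before benchmarking, a short pandas snippet is enough (a sketch; the path and the 50-column count assume the generator's default options, and pandas is already pulled in by the library):

```python
# Sketch: peek at the generated dataset (default output path and column counts assumed).
import pandas as pd

df = pd.read_csv("dataset/benchmark.csv", nrows=5)
print(df.shape[1], "columns")  # 50 with defaults: 15 int + 15 float + 10 string + 10 timestamp
print(df.head())
```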

### Benchmark our implementation

We use [memory_profiler](https://github.com/pythonprofilers/memory_profiler) to benchmark our implementation.

```bash
mprof run python ./sdgx_ctgan.py
```

Plot the results with `mprof plot`, or save the figure with `mprof plot --output=sdgx_ctgan.png`.
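If a single peak-memory figure is enough, memory_profiler's `memory_usage` helper can wrap the same fit instead of the `mprof` CLI; a minimal sketch (not part of the benchmark scripts; importing `sdgx_ctgan` runs the fit because the script executes at module level):

```python
# Sketch: report the peak memory of the sdgx CTGAN fit as a single number.
from memory_profiler import memory_usage

def run_fit():
    import sdgx_ctgan  # noqa: F401  # importing the script triggers the fit

# memory_usage samples RSS (in MiB) while the callable runs.
peak_mib = max(memory_usage((run_fit, (), {}), interval=0.5))
print(f"Peak memory: {peak_mib:.1f} MiB")
```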

### Benchmark original implementation

```bash
pip install ctgan
mprof run python ./sdv_ctgan.py
```

Plot the results with `mprof plot`, or save the figure with `mprof plot --output=sdv_ctgan.png`.

## Results

With the default settings, our implementation can fit a 1,000,000 x 50 dataset on a 32 GB machine (with roughly 20 GB usable), while the original implementation needs more than 20 GB of memory and crashes during training.

![succeed-memory-sdgx-ctgan](./img/succeed-memory-sdgx-ctgan.png)

![failed-memory-sdv_ctgan](./img/failed-memory-sdv_ctgan.png)
93 changes: 93 additions & 0 deletions benchmarks/generate_dataset.py
@@ -0,0 +1,93 @@
import datetime
import itertools
import random
import string
from pathlib import Path

import click

_HERE = Path(__file__).parent


def random_string(length):
return "".join(random.choice(string.ascii_lowercase) for i in range(length))


def random_float():
return random.random() * 1000


def random_int():
return random.randint(0, 1000)


def random_timestamp():
current_timestamp = datetime.datetime.now().timestamp()
return current_timestamp + random_int()


def random_datetime():
return datetime.datetime.fromtimestamp(random_timestamp())


@click.option(
"--output_file",
default=(_HERE / "dataset/benchmark.csv").as_posix(),
)
@click.option("--num_rows", default=1_000_000)
@click.option("--int_cols", default=15)
@click.option("--float_cols", default=15)
@click.option("--string_cols", default=10)
@click.option("--string_discrete_nums", default=50)
@click.option("--timestamp_cols", default=10)
@click.option("--datetime_cols", default=0)
@click.command()
def generate_dataset(
output_file,
num_rows,
int_cols,
float_cols,
string_cols,
string_discrete_nums,
timestamp_cols,
datetime_cols,
):
headers = itertools.chain.from_iterable(
[
(f"int_col{i}" for i in range(int_cols)),
(f"float_col{i}" for i in range(float_cols)),
(f"string_col{i}" for i in range(string_cols)),
(f"timestamp_col{i}" for i in range(timestamp_cols)),
(f"datetime_col{i}" for i in range(datetime_cols)),
]
)
output_file = Path(output_file).expanduser().resolve()
output_file.parent.mkdir(parents=True, exist_ok=True)

random_str_list = [random_string(25) for i in range(string_discrete_nums)]

def _generate_one_line():
return ",".join(
map(
str,
itertools.chain(
(random_int() for _ in range(int_cols)),
(random_float() for _ in range(float_cols)),
(random.choice(random_str_list) for _ in range(string_cols)),
(random_timestamp() for _ in range(timestamp_cols)),
(random_datetime() for _ in range(datetime_cols)),
),
)
)

with output_file.open("w") as f:
f.write(",".join(headers) + "\n")

chunk_size = 1000
for i in range(0, num_rows, chunk_size):
f.write("\n".join(_generate_one_line() for _ in range(chunk_size)))
f.write("\n")


if __name__ == "__main__":
    generate_dataset()
Binary file added benchmarks/img/failed-memory-sdv_ctgan.png
Binary file added benchmarks/img/succeed-memory-sdgx-ctgan.png
3 changes: 3 additions & 0 deletions benchmarks/requirements.txt
@@ -0,0 +1,3 @@
click
memory_profiler
psutil
23 changes: 23 additions & 0 deletions benchmarks/sdgx_ctgan.py
@@ -0,0 +1,23 @@
import os

os.environ["SDGX_LOG_LEVEL"] = "DEBUG"


from pathlib import Path

from sdgx.data_connectors.csv_connector import CsvConnector
from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
from sdgx.synthesizer import Synthesizer

_HERE = Path(__file__).parent

dataset_csv = (_HERE / "dataset/benchmark.csv").expanduser().resolve()
data_connector = CsvConnector(path=dataset_csv)
synthesizer = Synthesizer(
model=CTGANSynthesizerModel,
data_connector=data_connector,
model_kwargs={"epochs": 1, "device": "cpu"},
)
synthesizer.fit()
# sampled_data = synthesizer.sample(1000)
# synthesizer.cleanup() # Clean all cache
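The sampling and cleanup calls are left commented out so the memory profile covers fitting only. Enabling them afterwards would look roughly like this (a sketch continuing the script above, based on the two commented lines; writing the sample to CSV is just an example):

```python
# Sketch: sample synthetic rows after fitting, save them, and clear caches.
sampled_data = synthesizer.sample(1000)
sampled_data.to_csv("sampled.csv", index=False)
synthesizer.cleanup()  # clears the synthesizer's cached data
```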
16 changes: 16 additions & 0 deletions benchmarks/sdv_ctgan.py
@@ -0,0 +1,16 @@
from pathlib import Path

import pandas as pd

_HERE = Path(__file__).parent

dataset_csv = (_HERE / "dataset/benchmark.csv").expanduser().resolve()
df = pd.read_csv(dataset_csv)

discrete_columns = [s for s in df.columns if s.startswith("string")]


from ctgan import CTGAN

ctgan = CTGAN(epochs=1, cuda=False)
ctgan.fit(df, discrete_columns)
4 changes: 4 additions & 0 deletions example/1_ctgan_example.py
@@ -1,3 +1,7 @@
import os

os.environ["SDGX_LOG_LEVEL"] = "DEBUG"

from sdgx.data_connectors.csv_connector import CsvConnector
from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
from sdgx.synthesizer import Synthesizer
1 change: 0 additions & 1 deletion sdgx/cachers/disk_cache.py
@@ -85,7 +85,6 @@ def _refresh(self, offset: int, data: pd.DataFrame) -> None:
else:
data.to_parquet(self._get_cache_filename(offset))

@lru_cache(maxsize=64)
def load(self, offset: int, chunksize: int, data_connector: DataConnector) -> pd.DataFrame:
"""
Load data from data_connector or cache
3 changes: 2 additions & 1 deletion sdgx/data_models/metadata.py
Expand Up @@ -11,7 +11,7 @@
from sdgx.data_loader import DataLoader
from sdgx.data_models.inspectors.manager import InspectorManager
from sdgx.exceptions import MetadataInitError
from sdgx.utils import cache
from sdgx.utils import logger

# TODO: Design metadata for relationships...
# class DType(Enum):
@@ -56,6 +56,7 @@ def from_dataloader(
exclude_inspectors: list[str] | None = None,
inspector_init_kwargs: dict[str, Any] | None = None,
) -> "Metadata":
logger.info("Inspecting metadata...")
inspectors = InspectorManager().init_inspcetors(
include_inspectors, exclude_inspectors, **(inspector_init_kwargs or {})
)
4 changes: 1 addition & 3 deletions sdgx/models/components/optimize/ndarray_loader.py
@@ -57,10 +57,8 @@ def load(self, index: int) -> ndarray:
return np.load(self._get_cache_filename(int(index)))

def cleanup(self):
for i in range(self.store_index):
self._get_cache_filename(i).unlink(missing_ok=True)
self.store_index = 0
shutil.rmtree(self.cache_dir, ignore_errors=True)
self.store_index = 0

def iter(self) -> Generator[ndarray, None, None]:
for i in range(self.store_index):
4 changes: 4 additions & 0 deletions sdgx/models/components/optimize/sdv_ctgan/data_transformer.py
@@ -105,15 +105,18 @@ def fit(self, data_loader: DataLoader, discrete_columns=()):
self._column_transform_info_list = []
for column_name in data_loader.columns():
if column_name in discrete_columns:
logger.debug(f"Fitting discrete column {column_name}...")
column_transform_info = self._fit_discrete(data_loader[[column_name]])
else:
logger.debug(f"Fitting continuous column {column_name}...")
column_transform_info = self._fit_continuous(data_loader[[column_name]])

self.output_info_list.append(column_transform_info.output_info)
self.output_dimensions += column_transform_info.output_dimensions
self._column_transform_info_list.append(column_transform_info)

def _transform_continuous(self, column_transform_info, data):
logger.debug(f"Transforming continuous column {column_transform_info.column_name}...")
column_name = data.columns[0]
data[column_name] = data[column_name].to_numpy().flatten()
gm = column_transform_info.transform
@@ -130,6 +133,7 @@ def _transform_continuous(self, column_transform_info, data):
return output

def _transform_discrete(self, column_transform_info, data):
logger.debug(f"Transforming discrete column {column_transform_info.column_name}...")
ohe = column_transform_info.transform
return ohe.transform(data).to_numpy()

21 changes: 10 additions & 11 deletions sdgx/models/ml/single_table/ctgan.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import time
from pathlib import Path

import numpy as np
@@ -151,8 +152,6 @@ class CTGANSynthesizerModel(MLSynthesizerModel, SDVBaseSynthesizer):
log_frequency (boolean):
Whether to use log frequency of categorical levels in conditional
sampling. Defaults to ``True``.
verbose (boolean):
Whether to have print statements for progress results. Defaults to ``False``.
epochs (int):
Number of training epochs. Defaults to 300.
pac (int):
@@ -174,7 +173,6 @@ def __init__(
batch_size=500,
discriminator_steps=1,
log_frequency=True,
verbose=False,
epochs=300,
pac=10,
device="cuda" if torch.cuda.is_available() else "cpu",
@@ -193,11 +191,9 @@ def __init__(
self._batch_size = batch_size
self._discriminator_steps = discriminator_steps
self._log_frequency = log_frequency
self._verbose = verbose
self._epochs = epochs
self.pac = pac

device = device
self._device = torch.device(device)

# Following components are initialized in `_pre_fit`
@@ -218,8 +214,9 @@ def _pre_fit(self, dataloader: DataLoader, discrete_columns: list[str] = None) -
self._validate_discrete_columns(dataloader.columns(), discrete_columns)
# Fit Transformer and DataSampler
self._transformer = DataTransformer()
logger.info("Fitting model's transformer...")
self._transformer.fit(dataloader, discrete_columns)

logger.info("Transforming data...")
self._ndarry_loader = self._transformer.transform(dataloader)

self._data_sampler = DataSampler(
@@ -267,8 +264,10 @@ def _fit(self, data_size: int):
mean = torch.zeros(self._batch_size, self._embedding_dim, device=self._device)
std = mean + 1

logger.info("Starting training, epochs: {}".format(epochs))
steps_per_epoch = max(data_size // self._batch_size, 1)
for i in range(epochs):
start_time = time.time()
for id_ in range(steps_per_epoch):
for n in range(self._discriminator_steps):
fakez = torch.normal(mean=mean, std=std)
@@ -345,11 +344,11 @@ def _fit(self, data_size: int):
loss_g.backward()
optimizerG.step()

if self._verbose:
logger.info(
f"Epoch {i+1}, Loss G: {loss_g.detach().cpu(): .4f}," # noqa: T001
f"Loss D: {loss_d.detach().cpu(): .4f}",
)
logger.debug(
f"Epoch {i+1}, Loss G: {loss_g.detach().cpu(): .4f}," # noqa: T001
f"Loss D: {loss_d.detach().cpu(): .4f},"
f"Time: {time.time() - start_time: .4f}",
)

def sample(self, count: int, *args, **kwargs) -> pd.DataFrame:
return self._sample(count, *args, **kwargs)
10 changes: 9 additions & 1 deletion sdgx/synthesizer.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import time
from pathlib import Path
from typing import Any, Generator

@@ -85,7 +86,7 @@ def __init__(
raise SynthesizerInitError(
"model as instance and model_path cannot be specified at the same time"
)
if isinstance(model, str):
if isinstance(model, str) or isinstance(model, type):
self.model = self.model_manager.init_model(model, **(model_kwargs or {}))
elif isinstance(model, SynthesizerModel):
self.model = model
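This hunk lets `Synthesizer` accept a model class directly, in addition to a registered model name or a model instance; the benchmark script above relies on the class form. A minimal sketch of the two constructor forms (the registry name string is a placeholder; the exact key depends on how the model is registered):

```python
# Sketch: constructing a Synthesizer with a model class (enabled by this change).
from sdgx.data_connectors.csv_connector import CsvConnector
from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
from sdgx.synthesizer import Synthesizer

data_connector = CsvConnector(path="dataset/benchmark.csv")
synthesizer = Synthesizer(
    model=CTGANSynthesizerModel,  # the class is handed to model_manager.init_model
    data_connector=data_connector,
    model_kwargs={"epochs": 1, "device": "cpu"},
)

# A registered name string also works; the key below is a placeholder, not the actual registry name.
# synthesizer = Synthesizer(model="CTGAN", data_connector=data_connector)
```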
@@ -177,6 +178,8 @@ def fit(
inspector_init_kwargs=inspector_init_kwargs,
)
)

logger.info("Fitting data processors...")
for d in self.data_processors:
d.fit(metadata)

@@ -186,11 +189,15 @@ def chunk_generator() -> Generator[pd.DataFrame, None, None]:
chunk = d.convert(chunk)
yield chunk

logger.info("Initializing processed data loader...")
start_time = time.time()
processed_dataloader = DataLoader(
GeneratorConnector(chunk_generator),
**self.processored_data_loaders_kwargs,
)
logger.info(f"Initialized processed data loader in {time.time() - start_time}s")
try:
logger.info("Starting model fit...")
self.model.fit(metadata, processed_dataloader, **(model_fit_kwargs or {}))
finally:
processed_dataloader.finalize(clear_cache=True)
@@ -202,6 +209,7 @@ def sample(
metadata: None | Metadata = None,
model_fit_kwargs: None | dict[str, Any] = None,
) -> pd.DataFrame | Generator[pd.DataFrame, None, None]:
logger.info("Sampling...")
metadata = metadata or self.metadata
if metadata:
for d in self.data_processors: