Init benchmark base code (#80)
Wh1isper committed Dec 20, 2023
1 parent 788284a commit c498349
Showing 15 changed files with 222 additions and 17 deletions.
3 changes: 3 additions & 0 deletions benchmarks/.gitignore
@@ -0,0 +1,3 @@
dataset/
.ndarry_cache
.sdgx_cache
54 changes: 54 additions & 0 deletions benchmarks/README.md
@@ -0,0 +1,54 @@
# WIP

Please help us improve our benchmark: https://github.com/hitsz-ids/synthetic-data-generator/issues/82

## Benchmarks

Benchmarks aim to measure the performance of the library.

- Performance: processing time, model training time, sampling rate, etc.
- Memory consumption
- Others, such as cache hit rate

For now, we provide a simple benchmark of our CTGAN implementation against the original one: fit both on a large random dataset and compare their memory consumption.

### Setup

```bash
# Clone and install latest version
# You can also use our latest image: docker pull idsteam/sdgx:latest
git clone https://github.com/hitsz-ids/synthetic-data-generator.git
cd synthetic-data-generator && pip install -e ./
# Setup benchmark
cd benchmarks
pip install -r requirements.txt
```

Generate a dataset with `python generate_dataset.py`; run `python generate_dataset.py --help` to see the available options.

### Benchmark our implementation

We use [memory_profiler](https://github.com/pythonprofilers/memory_profiler) to benchmark our implementation.

```bash
mprof run python ./sdgx_ctgan.py
```

Plot the results with `mprof plot`, or save the plot to a file with `mprof plot --output=sdgx_ctgan.png`.

### Benchmark the original implementation

```bash
pip install ctgan
mprof run python ./sdv_ctgan.py
```

Plot the results with `mprof plot`, or save the plot to a file with `mprof plot --output=sdv_ctgan.png`.
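
As an alternative to the `mprof` CLI, peak memory can also be captured in-process through `memory_profiler`'s Python API. A minimal sketch, where the `run_fit` body is a placeholder for either script's fit call:

```python
from memory_profiler import memory_usage


def run_fit():
    # Placeholder: call synthesizer.fit() (sdgx) or ctgan.fit(df, discrete_columns) (sdv) here
    ...


# memory_usage samples the process's memory while run_fit executes
peak = max(memory_usage((run_fit, (), {}), interval=0.5))
print(f"Peak memory: {peak:.1f} MiB")
```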

## Results

With default settings, our implementation can fit a 1,000,000 x 50 dataset on a machine with 32GB of memory (of which roughly 20GB was usable), while the original implementation needs more than 20GB of memory and crashed during training.

![succeed-memory-sdgx-ctgan](./img/succeed-memory-sdgx-ctgan.png)

![failed-memory-sdv_ctgan](./img/failed-memory-sdv_ctgan.png)
93 changes: 93 additions & 0 deletions benchmarks/generate_dataset.py
@@ -0,0 +1,93 @@
import datetime
import itertools
import random
import string
from pathlib import Path

import click

_HERE = Path(__file__).parent


def random_string(length):
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


def random_float():
    return random.random() * 1000


def random_int():
    return random.randint(0, 1000)


def random_timestamp():
    current_timestamp = datetime.datetime.now().timestamp()
    return current_timestamp + random_int()


def random_datetime():
    return datetime.datetime.fromtimestamp(random_timestamp())


@click.option(
"--output_file",
default=(_HERE / "dataset/benchmark.csv").as_posix(),
)
@click.option("--num_rows", default=1_000_000)
@click.option("--int_cols", default=15)
@click.option("--float_cols", default=15)
@click.option("--string_cols", default=10)
@click.option("--string_discrete_nums", default=50)
@click.option("--timestamp_cols", default=10)
@click.option("--datetime_cols", default=0)
@click.command()
def generate_dataset(
    output_file,
    num_rows,
    int_cols,
    float_cols,
    string_cols,
    string_discrete_nums,
    timestamp_cols,
    datetime_cols,
):
    headers = itertools.chain.from_iterable(
        [
            (f"int_col{i}" for i in range(int_cols)),
            (f"float_col{i}" for i in range(float_cols)),
            (f"string_col{i}" for i in range(string_cols)),
            (f"timestamp_col{i}" for i in range(timestamp_cols)),
            (f"datetime_col{i}" for i in range(datetime_cols)),
        ]
    )
    output_file = Path(output_file).expanduser().resolve()
    output_file.parent.mkdir(parents=True, exist_ok=True)

    random_str_list = [random_string(25) for _ in range(string_discrete_nums)]

    def _generate_one_line():
        return ",".join(
            map(
                str,
                itertools.chain(
                    (random_int() for _ in range(int_cols)),
                    (random_float() for _ in range(float_cols)),
                    (random.choice(random_str_list) for _ in range(string_cols)),
                    (random_timestamp() for _ in range(timestamp_cols)),
                    (random_datetime() for _ in range(datetime_cols)),
                ),
            )
        )

    with output_file.open("w") as f:
        f.write(",".join(headers) + "\n")

        chunk_size = 1000
        for i in range(0, num_rows, chunk_size):
            # Write at most the remaining rows so the file has exactly num_rows lines
            rows = min(chunk_size, num_rows - i)
            f.write("\n".join(_generate_one_line() for _ in range(rows)))
            f.write("\n")


if __name__ == "__main__":
    generate_dataset()
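
For reference, the command can also be exercised in-process through click's test runner. A sketch, assuming the benchmarks directory is on the import path; option names mirror the click options above:

```python
from click.testing import CliRunner

from generate_dataset import generate_dataset

# Generate a small 10,000-row dataset without shelling out
runner = CliRunner()
result = runner.invoke(generate_dataset, ["--num_rows", "10000", "--string_discrete_nums", "20"])
assert result.exit_code == 0
```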
Binary file added benchmarks/img/failed-memory-sdv_ctgan.png
Binary file added benchmarks/img/succeed-memory-sdgx-ctgan.png
3 changes: 3 additions & 0 deletions benchmarks/requirements.txt
@@ -0,0 +1,3 @@
click
memory_profiler
psutil
23 changes: 23 additions & 0 deletions benchmarks/sdgx_ctgan.py
@@ -0,0 +1,23 @@
import os

os.environ["SDGX_LOG_LEVEL"] = "DEBUG"


from pathlib import Path

from sdgx.data_connectors.csv_connector import CsvConnector
from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
from sdgx.synthesizer import Synthesizer

_HERE = Path(__file__).parent

dataset_csv = (_HERE / "dataset/benchmark.csv").expanduser().resolve()
data_connector = CsvConnector(path=dataset_csv)
synthesizer = Synthesizer(
    model=CTGANSynthesizerModel,
    data_connector=data_connector,
    model_kwargs={"epochs": 1, "device": "cpu"},
)
synthesizer.fit()
# sampled_data = synthesizer.sample(1000)
# synthesizer.cleanup() # Clean all cache
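
Continuing the script above, the commented-out lines expand into a short post-fit sketch (the sample size is arbitrary):

```python
# Draw 1,000 synthetic rows from the fitted model and inspect a few
sampled_data = synthesizer.sample(1000)
print(sampled_data.head())

# Remove all cached intermediate data once done
synthesizer.cleanup()
```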
16 changes: 16 additions & 0 deletions benchmarks/sdv_ctgan.py
@@ -0,0 +1,16 @@
from pathlib import Path

import pandas as pd

_HERE = Path(__file__).parent

dataset_csv = (_HERE / "dataset/benchmark.csv").expanduser().resolve()
df = pd.read_csv(dataset_csv)

discrete_columns = [s for s in df.columns if s.startswith("string")]


from ctgan import CTGAN

ctgan = CTGAN(epochs=1, cuda=False)
ctgan.fit(df, discrete_columns)
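
As with the sdgx script, a quick post-fit sanity check can draw samples from the fitted model (continuing the script above; not part of the memory measurement):

```python
# CTGAN.sample(n) returns a DataFrame of n synthetic rows
samples = ctgan.sample(1000)
print(samples.head())
```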
4 changes: 4 additions & 0 deletions example/1_ctgan_example.py
@@ -1,3 +1,7 @@
+import os
+
+os.environ["SDGX_LOG_LEVEL"] = "DEBUG"
+
 from sdgx.data_connectors.csv_connector import CsvConnector
 from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
 from sdgx.synthesizer import Synthesizer
1 change: 0 additions & 1 deletion sdgx/cachers/disk_cache.py
@@ -85,7 +85,6 @@ def _refresh(self, offset: int, data: pd.DataFrame) -> None:
         else:
             data.to_parquet(self._get_cache_filename(offset))
 
-    @lru_cache(maxsize=64)
     def load(self, offset: int, chunksize: int, data_connector: DataConnector) -> pd.DataFrame:
         """
         Load data from data_connector or cache
3 changes: 2 additions & 1 deletion sdgx/data_models/metadata.py
@@ -11,7 +11,7 @@
 from sdgx.data_loader import DataLoader
 from sdgx.data_models.inspectors.manager import InspectorManager
 from sdgx.exceptions import MetadataInitError
-from sdgx.utils import cache
+from sdgx.utils import logger
 
 # TODO: Design metadata for relationships...
 # class DType(Enum):
@@ -56,6 +56,7 @@ def from_dataloader(
         exclude_inspectors: list[str] | None = None,
         inspector_init_kwargs: dict[str, Any] | None = None,
     ) -> "Metadata":
+        logger.info("Inspecting metadata...")
         inspectors = InspectorManager().init_inspcetors(
             include_inspectors, exclude_inspectors, **(inspector_init_kwargs or {})
         )
4 changes: 1 addition & 3 deletions sdgx/models/components/optimize/ndarray_loader.py
@@ -57,10 +57,8 @@ def load(self, index: int) -> ndarray:
         return np.load(self._get_cache_filename(int(index)))
 
     def cleanup(self):
-        for i in range(self.store_index):
-            self._get_cache_filename(i).unlink(missing_ok=True)
-        self.store_index = 0
+        shutil.rmtree(self.cache_dir, ignore_errors=True)
+        self.store_index = 0
 
     def iter(self) -> Generator[ndarray, None, None]:
         for i in range(self.store_index):
4 changes: 4 additions & 0 deletions sdgx/models/components/optimize/sdv_ctgan/data_transformer.py
@@ -105,15 +105,18 @@ def fit(self, data_loader: DataLoader, discrete_columns=()):
         self._column_transform_info_list = []
         for column_name in data_loader.columns():
             if column_name in discrete_columns:
+                logger.debug(f"Fitting discrete column {column_name}...")
                 column_transform_info = self._fit_discrete(data_loader[[column_name]])
             else:
+                logger.debug(f"Fitting continuous column {column_name}...")
                 column_transform_info = self._fit_continuous(data_loader[[column_name]])
 
             self.output_info_list.append(column_transform_info.output_info)
             self.output_dimensions += column_transform_info.output_dimensions
             self._column_transform_info_list.append(column_transform_info)
 
     def _transform_continuous(self, column_transform_info, data):
+        logger.debug(f"Transforming continuous column {column_transform_info.column_name}...")
         column_name = data.columns[0]
         data[column_name] = data[column_name].to_numpy().flatten()
         gm = column_transform_info.transform
@@ -130,6 +133,7 @@ def _transform_continuous(self, column_transform_info, data):
         return output
 
     def _transform_discrete(self, column_transform_info, data):
+        logger.debug(f"Transforming discrete column {column_transform_info.column_name}...")
         ohe = column_transform_info.transform
         return ohe.transform(data).to_numpy()

21 changes: 10 additions & 11 deletions sdgx/models/ml/single_table/ctgan.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import time
 from pathlib import Path
 
 import numpy as np
@@ -151,8 +152,6 @@ class CTGANSynthesizerModel(MLSynthesizerModel, SDVBaseSynthesizer):
         log_frequency (boolean):
             Whether to use log frequency of categorical levels in conditional
             sampling. Defaults to ``True``.
-        verbose (boolean):
-            Whether to have print statements for progress results. Defaults to ``False``.
         epochs (int):
             Number of training epochs. Defaults to 300.
         pac (int):
@@ -174,7 +173,6 @@ def __init__(
         batch_size=500,
         discriminator_steps=1,
         log_frequency=True,
-        verbose=False,
         epochs=300,
         pac=10,
         device="cuda" if torch.cuda.is_available() else "cpu",
@@ -193,11 +191,9 @@ def __init__(
         self._batch_size = batch_size
         self._discriminator_steps = discriminator_steps
         self._log_frequency = log_frequency
-        self._verbose = verbose
         self._epochs = epochs
         self.pac = pac
 
-        device = device
         self._device = torch.device(device)
 
         # Following components are initialized in `_pre_fit`
@@ -218,8 +214,9 @@ def _pre_fit(self, dataloader: DataLoader, discrete_columns: list[str] = None) -
         self._validate_discrete_columns(dataloader.columns(), discrete_columns)
         # Fit Transformer and DataSampler
         self._transformer = DataTransformer()
+        logger.info("Fitting model's transformer...")
         self._transformer.fit(dataloader, discrete_columns)
-
+        logger.info("Transforming data...")
         self._ndarry_loader = self._transformer.transform(dataloader)
 
         self._data_sampler = DataSampler(
@@ -267,8 +264,10 @@ def _fit(self, data_size: int):
         mean = torch.zeros(self._batch_size, self._embedding_dim, device=self._device)
         std = mean + 1
 
+        logger.info("Starting training, epochs: {}".format(epochs))
         steps_per_epoch = max(data_size // self._batch_size, 1)
         for i in range(epochs):
+            start_time = time.time()
             for id_ in range(steps_per_epoch):
                 for n in range(self._discriminator_steps):
                     fakez = torch.normal(mean=mean, std=std)
@@ -345,11 +344,11 @@ def _fit(self, data_size: int):
                 loss_g.backward()
                 optimizerG.step()
 
-            if self._verbose:
-                logger.info(
-                    f"Epoch {i+1}, Loss G: {loss_g.detach().cpu(): .4f},"  # noqa: T001
-                    f"Loss D: {loss_d.detach().cpu(): .4f}",
-                )
+            logger.debug(
+                f"Epoch {i+1}, Loss G: {loss_g.detach().cpu(): .4f},"  # noqa: T001
+                f"Loss D: {loss_d.detach().cpu(): .4f},"
+                f"Time: {time.time() - start_time: .4f}",
+            )
 
     def sample(self, count: int, *args, **kwargs) -> pd.DataFrame:
         return self._sample(count, *args, **kwargs)
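
With the `verbose` flag removed, per-epoch losses and timings now surface through the logger at DEBUG level. A sketch of how a caller might enable them, mirroring the benchmark scripts (which set the variable before importing sdgx):

```python
import os

# Set before importing sdgx, as benchmarks/sdgx_ctgan.py does
os.environ["SDGX_LOG_LEVEL"] = "DEBUG"

from sdgx.synthesizer import Synthesizer  # noqa: E402
```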
10 changes: 9 additions & 1 deletion sdgx/synthesizer.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import time
 from pathlib import Path
 from typing import Any, Generator
 
@@ -85,7 +86,7 @@ def __init__(
             raise SynthesizerInitError(
                 "model as instance and model_path cannot be specified at the same time"
             )
-        if isinstance(model, str):
+        if isinstance(model, str) or isinstance(model, type):
             self.model = self.model_manager.init_model(model, **(model_kwargs or {}))
         elif isinstance(model, SynthesizerModel):
             self.model = model
@@ -177,6 +178,8 @@ def fit(
                 inspector_init_kwargs=inspector_init_kwargs,
             )
         )
+
+        logger.info("Fitting data processors...")
         for d in self.data_processors:
             d.fit(metadata)
 
@@ -186,11 +189,15 @@ def chunk_generator() -> Generator[pd.DataFrame, None, None]:
                     chunk = d.convert(chunk)
                 yield chunk
 
+        logger.info("Initializing processed data loader...")
+        start_time = time.time()
         processed_dataloader = DataLoader(
             GeneratorConnector(chunk_generator),
             **self.processored_data_loaders_kwargs,
         )
+        logger.info(f"Initialized processed data loader in {time.time() - start_time}s")
         try:
+            logger.info("Starting model fit...")
             self.model.fit(metadata, processed_dataloader, **(model_fit_kwargs or {}))
         finally:
             processed_dataloader.finalize(clear_cache=True)
@@ -202,6 +209,7 @@ def sample(
         metadata: None | Metadata = None,
         model_fit_kwargs: None | dict[str, Any] = None,
     ) -> pd.DataFrame | Generator[pd.DataFrame, None, None]:
+        logger.info("Sampling...")
         metadata = metadata or self.metadata
         if metadata:
             for d in self.data_processors:
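
The widened `isinstance` check is what lets callers hand `Synthesizer` a model class rather than an instance, as `benchmarks/sdgx_ctgan.py` does. A sketch based on the scripts in this commit; both forms should be accepted under this change:

```python
from sdgx.data_connectors.csv_connector import CsvConnector
from sdgx.models.ml.single_table.ctgan import CTGANSynthesizerModel
from sdgx.synthesizer import Synthesizer

data_connector = CsvConnector(path="dataset/benchmark.csv")

# Pass the model class and let Synthesizer construct it via model_kwargs...
synthesizer = Synthesizer(
    model=CTGANSynthesizerModel,
    data_connector=data_connector,
    model_kwargs={"epochs": 1, "device": "cpu"},
)

# ...or pass an already-constructed instance
synthesizer = Synthesizer(
    model=CTGANSynthesizerModel(epochs=1, device="cpu"),
    data_connector=data_connector,
)
```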
