[Export Refactor] Prepare the module to be more general (before including transformers) #1908
@@ -13,20 +13,21 @@
# limitations under the License.

import logging
import os
from pathlib import Path
from typing import Any, List, Optional, Union

from sparseml.export.export_data import export_data_samples
from sparseml.export.helpers import (
    AVAILABLE_DEPLOYMENT_TARGETS,
    ONNX_MODEL_NAME,
    apply_optimizations,
    create_deployment_folder,
    create_export_kwargs,
)
from sparseml.export.validators import validate_correctness as validate_correctness_
from sparseml.export.validators import validate_structure as validate_structure_
from sparseml.pytorch.opset import TORCH_DEFAULT_ONNX_OPSET
from sparseml.pytorch.utils.helpers import default_device
from sparseml.pytorch.utils.helpers import default_device, use_single_gpu
from src.sparseml.integration_helper_functions import (
    IntegrationHelperFunctions,
    resolve_integration,
@@ -44,14 +45,15 @@ def export(
    opset: int = TORCH_DEFAULT_ONNX_OPSET,
    single_graph_file: bool = True,
    num_export_samples: int = 0,
    batch_size: int = 1,
    deployment_directory_name: str = "deployment",
    device: str = "auto",
    graph_optimizations: Union[str, List[str], None] = "all",
    validate_correctness: bool = False,
    validate_structure: bool = True,
    integration: Optional[str] = None,
    sample_data: Optional[Any] = None,
    batch_size: Optional[int] = None,
    task: Optional[str] = None,
    **kwargs,
):
    """

@@ -84,6 +86,8 @@ def export(
        file. Defaults to True.
    :param num_export_samples: The number of samples to create for
        the exported model. Defaults to 0.
    :param batch_size: The batch size to use for exporting the data.
        Defaults to None.
    :param deployment_directory_name: The name of the deployment
        directory to create for the exported model. Thus, the exported
        model will be saved to `target_path/deployment_directory_name`.

@@ -102,7 +106,7 @@ def export(
    :param sample_data: Optional sample data to use for exporting
        the model. If not provided, a dummy input will be created
        for the model. Defaults to None.
    :param batch_size: The batch size to use for exporting the data.
    :param task: Optional task to use for exporting the model.
        Defaults to None.
    """
@@ -112,6 +116,7 @@ def export(

    # choose the appropriate device
    device = default_device() if device == "auto" else device
    device = use_single_gpu(device) if "cuda" in device else device

    # assert the valid deployment target
    if deployment_target not in AVAILABLE_DEPLOYMENT_TARGETS:
@@ -126,69 +131,55 @@ def export(
    _LOGGER.info(f"Starting export for {integration} model...")

    helper_functions: IntegrationHelperFunctions = (
        IntegrationHelperFunctions.load_from_registry(integration)
        IntegrationHelperFunctions.load_from_registry(integration, task=task)
    )

Review comment: Moved the use of the optional `task` argument here.

    _LOGGER.info("Creating model for the export...")
    model, validation_dataloader = helper_functions.create_model(
        source_path, batch_size, device, **kwargs

    # loaded_model_kwargs may include any objects
    # that were created along with the model and are needed
    # for the export
    model, loaded_model_kwargs = helper_functions.create_model(
        source_path, device=device, task=task, batch_size=batch_size, **kwargs
    )

    if validation_dataloader:
        _LOGGER.info("Created validation dataloader for the export")
    else:
        _LOGGER.warning(
            "Failed to create validation dataloader for the export. "
            "Will be using the dummy (or user-provided) data instead "
            "and will be not able to export samples or validate the model "
            "correctness."
    if loaded_model_kwargs:
        _LOGGER.info(
            "Created additional items that will "
            f"be used for the export: {list(loaded_model_kwargs.keys())}"
        )

    sample_data = (
        helper_functions.create_dummy_input(
            validation_dataloader=validation_dataloader, **kwargs
        )
        helper_functions.create_dummy_input(**loaded_model_kwargs, **kwargs)
        if sample_data is None
        else sample_data
    )

    _LOGGER.info(f"Exporting {onnx_model_name} to {target_path}...")

    export_kwargs = create_export_kwargs(loaded_model_kwargs)

    onnx_file_path = helper_functions.export(
        model=model,
        sample_data=sample_data,
        target_path=target_path,
        onnx_model_name=onnx_model_name,
        deployment_target=deployment_target,
        opset=opset,
        **export_kwargs,
    )
    _LOGGER.info(f"Successfully exported {onnx_model_name} to {target_path}...")

    _LOGGER.info(
        f"Applying optimizations: {graph_optimizations} to the exported model..."
    )
    apply_optimizations(
        onnx_file_path=onnx_file_path,
        target_optimizations=graph_optimizations,
        available_optimizations=helper_functions.graph_optimizations,
        single_graph_file=single_graph_file,
    )
    _LOGGER.info(f"Successfully exported {onnx_model_name} to {onnx_file_path}...")

    if num_export_samples:
        _LOGGER.info(f"Exporting {num_export_samples} samples...")
        if not validation_dataloader:
            raise ValueError(
                "To export sample inputs/outputs a data loader is needed. "
                "To return a data loader provide the appropriate, integration-specific "
                "arguments to `create_model` function"
            )
        (
            input_samples,
            output_samples,
            label_samples,
        ) = helper_functions.create_data_samples(
            num_samples=num_export_samples,
            data_loader=validation_dataloader,
            model=model,
            **loaded_model_kwargs,
        )
        export_data_samples(
            input_samples=input_samples,

@@ -207,16 +198,29 @@ def export(
        source_path=source_path,
        target_path=target_path,
        deployment_directory_name=deployment_directory_name,
        deployment_directory_files=helper_functions.deployment_directory_structure,
        deployment_directory_files_mandatory=helper_functions.deployment_directory_files_mandatory,  # noqa: E501
        deployment_directory_files_optional=helper_functions.deployment_directory_files_optional,  # noqa: E501
        onnx_model_name=onnx_model_name,
    )

    _LOGGER.info(
        f"Applying optimizations: {graph_optimizations} to the exported model..."
    )
    if helper_functions.apply_optimizations is not None:
        helper_functions.apply_optimizations(
            exported_file_path=os.path.join(deployment_path, onnx_model_name),
            optimizations=graph_optimizations,
            single_graph_file=single_graph_file,
        )

    if validate_structure:
        _LOGGER.info("Validating model structure...")
        validate_structure_(
            target_path=target_path,
            deployment_directory_name=deployment_directory_name,
            onnx_model_name=onnx_model_name,
            deployment_directory_files=helper_functions.deployment_directory_structure,
            deployment_directory_files_mandatory=helper_functions.deployment_directory_files_mandatory,  # noqa: E501
            deployment_directory_files_optional=helper_functions.deployment_directory_files_optional,  # noqa: E501
        )

    if validate_correctness:
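Taken together, these changes turn `export` into a keyword-driven entry point with an optional `batch_size` and a new `task` argument that is threaded through `load_from_registry` and `create_model`. A minimal sketch of how the refactored signature might be called, assuming `export` is importable from `sparseml.export` and that `"deepsparse"` is among `AVAILABLE_DEPLOYMENT_TARGETS`; the paths and integration name below are placeholders, not part of this diff:

```python
# Hypothetical usage sketch of the refactored export entry point; the import
# path, source/target paths, and integration name are assumptions.
from sparseml.export import export

export(
    source_path="./training_run",        # placeholder checkpoint directory
    target_path="./exported",            # where the deployment folder is created
    deployment_target="deepsparse",      # assumed member of AVAILABLE_DEPLOYMENT_TARGETS
    num_export_samples=4,                # triggers sample input/output export
    batch_size=2,                        # now Optional[int], defaults to None
    device="auto",                       # resolved via default_device()/use_single_gpu
    integration="image-classification",  # placeholder integration name
    task=None,                           # forwarded to load_from_registry and create_model
)
```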
@@ -18,7 +18,7 @@
import tarfile
from enum import Enum
from pathlib import Path
from typing import List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import torch
from tqdm import tqdm

@@ -46,47 +46,11 @@ class InputsNames(Enum):
    filename = "inp"


def create_data_samples(
    data_loader: torch.utils.data.DataLoader,
    model: Optional[torch.nn.Module] = None,
    num_samples: int = 1,
) -> Tuple[List[torch.Tensor], List[torch.Tensor], List[torch.Tensor]]:
    """
    Fetch a batch of samples from the data loader and return the inputs and outputs

    :param data_loader: The data loader to get a batch of inputs/outputs from.
    :param model: The model to run the inputs through to get the outputs.
        If None, the outputs will be an empty list.
    :param num_samples: The number of samples to generate. Defaults to 1
    :return: The inputs and outputs as lists of torch tensors
    """
    inputs, outputs, labels = [], [], []
    if model is None:
        _LOGGER.warning("The model is None. The list of outputs will be empty")
    for batch_num, (inputs_, labels_) in tqdm(enumerate(data_loader)):
        if batch_num == num_samples:
            break
        if model:
            outputs_ = model(inputs_)
            if isinstance(outputs_, tuple):
                # outputs_ contains (logits, softmax)
                outputs_ = outputs_[0]
            outputs.append(outputs_)
        inputs.append(inputs_)
        labels.append(
            torch.IntTensor([labels_])
            if not isinstance(labels_, torch.Tensor)
            else labels_
        )

    return inputs, outputs, labels


def export_data_samples(
    target_path: Union[Path, str],
    input_samples: Optional[List["torch.Tensor"]] = None,  # noqa F821
    output_samples: Optional[List["torch.Tensor"]] = None,  # noqa F821
    label_samples: Optional[List["torch.Tensor"]] = None,  # noqa F821
    input_samples: Optional[List[Any]] = None,
    output_samples: Optional[List[Any]] = None,
    label_samples: Optional[List[Any]] = None,
    as_tar: bool = False,
):
    """
@@ -116,6 +80,7 @@ def export_data_samples(

    :param input_samples: The input samples to save.
    :param output_samples: The output samples to save.
    :param label_samples: The label samples to save.
    :param target_path: The path to save the samples to.
    :param as_tar: Whether to save the samples as tar files.
    """
@@ -124,16 +89,21 @@ def export_data_samples(
        [input_samples, output_samples, label_samples],
        [InputsNames, OutputsNames, LabelNames],
    ):
        if samples is not None:
        if len(samples) > 0:
            _LOGGER.info(f"Exporting {names.basename.value} to {target_path}...")
            export_data_sample(samples, names, target_path, as_tar)
            break_batch = isinstance(samples[0], dict)

            export_data_sample(samples, names, target_path, as_tar, break_batch)
            _LOGGER.info(
                f"Successfully exported {names.basename.value} to {target_path}!"
            )


def export_data_sample(
    samples, names: Enum, target_path: Union[Path, str], as_tar: bool = False
    samples,
    names: Enum,
    target_path: Union[Path, str],
    as_tar: bool = False,
    break_batch=False,
):

    samples = tensors_to_device(samples, "cpu")
@@ -142,9 +112,105 @@ def export_data_sample(
        tensors=samples,
        export_dir=os.path.join(target_path, names.basename.value),
        name_prefix=names.filename.value,
        break_batch=break_batch,
    )
    if as_tar:
        folder_path = os.path.join(target_path, names.basename.value)
        with tarfile.open(folder_path + ".tar.gz", "w:gz") as tar:
            tar.add(folder_path, arcname=os.path.basename(folder_path))
        shutil.rmtree(folder_path)


def create_data_samples(
    data_loader: torch.utils.data.DataLoader,
    model: Optional[torch.nn.Module] = None,
    num_samples: int = 1,
) -> Tuple[List[Any], List[Any], List[Any]]:
    """
    Fetch a batch of samples from the data loader and return the inputs and outputs

    :param data_loader: The data loader to get a batch of inputs/outputs from.
    :param model: The model to run the inputs through to get the outputs.
        If None, the outputs will be an empty list.
    :param num_samples: The number of samples to generate. Defaults to 1
    :return: The inputs and outputs as lists of torch tensors
    """
    inputs, outputs, labels = [], [], []
    if model is None:
        _LOGGER.warning("The model is None. The list of outputs will be empty")

    for batch_num, data in tqdm(enumerate(data_loader)):
        if batch_num == num_samples:
            break
        if isinstance(data, dict):

Review comment: as requested before, we can run inference on the transformer data, but now without the mandatory export from …

            inputs_, labels_, outputs_ = run_inference_with_dict_data(
                data=data, model=model
            )
        elif isinstance(data, (list, tuple)):
            inputs_, labels_, outputs_ = run_inference_with_tuple_or_list_data(
                data=data, model=model
            )
        else:
            raise ValueError(
                f"Data type {type(data)} is not supported. "
                f"Only dict and tuple are supported"
            )

        inputs.append(inputs_)
        if outputs_ is not None:
            outputs.append(outputs_)
        if labels_ is not None:
            labels.append(
                torch.IntTensor([labels_])
                if not isinstance(labels_, torch.Tensor)
                else labels_
            )

    return inputs, outputs, labels


def run_inference_with_dict_data(
    data: Dict[str, Any], model: Optional[torch.nn.Module] = None
) -> Tuple[Dict[str, Any], Any, Optional[Dict[str, Any]]]:
    """
    Run inference on a model by inferring the appropriate
    inputs from the dictionary input data.

    :param data: The data to run inference on
    :param model: The model to run inference on (optional)
    :return: The inputs, labels and outputs
    """
    labels = None
    if model is None:
        output = None
    else:
        inputs = {key: value.to(model.device) for key, value in data.items()}
        output_vals = model(**inputs)
        output = {
            name: torch.squeeze(val).detach().to("cpu")
            for name, val in output_vals.items()
        }
    inputs = {key: value.to("cpu") for key, value in data.items()}
    return inputs, labels, output


def run_inference_with_tuple_or_list_data(
    data: Tuple[Any, Any], model: Optional[torch.nn.Module] = None
) -> Tuple[torch.Tensor, Any, Optional[torch.Tensor]]:
    """
    Run inference on a model by inferring the appropriate
    inputs from the tuple input data.

    :param data: The data to run inference on
    :param model: The model to run inference on (optional)
    :return: The inputs, labels and outputs
    """
    # assume that the data is a tuple of (inputs, labels)
    inputs, labels = data
    outputs = model(inputs) if model else None
    if isinstance(outputs, tuple):
        # outputs_ contains (logits, softmax)
        outputs = outputs[0]
    return inputs, labels, outputs
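To illustrate the new dispatch, here is a small sketch that exercises the tuple/list branch of `create_data_samples`; the dataset shapes and the `model=None` choice are purely illustrative, and the import path mirrors the one used in the first file of this diff. A dict-yielding (transformers-style) loader would instead be routed through `run_inference_with_dict_data`:

```python
# Illustrative only: exercises the tuple/list branch of create_data_samples.
import torch
from torch.utils.data import DataLoader, TensorDataset

from sparseml.export.export_data import create_data_samples

# A vision-style loader that yields (inputs, labels) batches.
dataset = TensorDataset(torch.randn(8, 3, 224, 224), torch.randint(0, 10, (8,)))
loader = DataLoader(dataset, batch_size=2)

# With model=None the outputs list stays empty; inputs and labels hold two batches.
inputs, outputs, labels = create_data_samples(loader, model=None, num_samples=2)

# A loader yielding dicts (e.g. tokenized text batches) would take the
# run_inference_with_dict_data path and return dicts of tensors instead.
```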
Review comment: We want to be in full control over the placement of the model on the device. The user can either specify the `device: str` themselves (`cpu`, `cuda`, `cuda:0`) or keep the default `auto` configuration. If `auto`, we use the `default_device()` helper function, which returns a proper device string.

Question 1: shouldn't we default to `cpu` here?

Question 2: if requested, should we allow the user to place the model over multiple CUDA devices? Added the `use_single_gpu` function.

Reply: for general initialization -> model to multiple GPUs (model parallel).
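To make the discussion concrete, below is a minimal sketch of the resolution flow implemented by the two lines added in this PR. `default_device()` and `use_single_gpu()` live in `sparseml.pytorch.utils.helpers`; since their bodies are not part of this diff, the stand-ins below only encode the assumed behavior (prefer CUDA when available, collapse a multi-GPU string to a single device):

```python
# Sketch of the device-resolution flow; the stand-in helpers are assumptions,
# not the real sparseml implementations.
import torch


def _assumed_default_device() -> str:
    # Stand-in for default_device(): assumed to prefer CUDA when available.
    return "cuda:0" if torch.cuda.is_available() else "cpu"


def _assumed_use_single_gpu(device: str) -> str:
    # Stand-in for use_single_gpu(): assumed to collapse a multi-GPU string
    # such as "cuda:0,1" down to a single device for the export itself.
    return device.split(",")[0]


def resolve_export_device(device: str = "auto") -> str:
    # Mirrors the two lines added in this PR.
    device = _assumed_default_device() if device == "auto" else device
    return _assumed_use_single_gpu(device) if "cuda" in device else device


print(resolve_export_device("auto"))      # "cuda:0" or "cpu", depending on hardware
print(resolve_export_device("cuda:0,1"))  # "cuda:0" under the assumption above
```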