Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pipeline Refactor][server][OpenAI] Enable OpenAI to use new text gen pipeline #1477

Merged
merged 11 commits into from
Dec 14, 2023
1 change: 1 addition & 0 deletions src/deepsparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
cpu_vnni_compatible,
)
from .engine import *
from .pipeline_config import *
from .tasks import *
from .pipeline import *
from .loggers import *
Expand Down
68 changes: 3 additions & 65 deletions src/deepsparse/legacy/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, Union

import numpy
from pydantic import BaseModel, Field
from pydantic import BaseModel

from deepsparse import Context, Engine, MultiModelEngine, Scheduler
from deepsparse.benchmark import ORTEngine, TorchScriptEngine
Expand All @@ -36,6 +36,7 @@
)
from deepsparse.loggers.base_logger import BaseLogger
from deepsparse.loggers.constants import MetricCategories, SystemGroups
from deepsparse.pipeline_config import PipelineConfig
from deepsparse.utils import (
InferenceStages,
StagedTimer,
Expand All @@ -54,7 +55,6 @@
"BasePipeline",
"SupportedTasks",
"_REGISTERED_PIPELINES",
"PipelineConfig",
"question_answering_pipeline",
"text_classification_pipeline",
"zero_shot_text_classification_pipeline",
Expand Down Expand Up @@ -306,6 +306,7 @@ def __call__(self, *args, **kwargs) -> BaseModel:

return pipeline_outputs

# TODO: remove to just use the new pipeline function
@classmethod
def from_config(
cls,
Expand Down Expand Up @@ -562,69 +563,6 @@ def __str__(self):
)


class PipelineConfig(BaseModel):
"""
Configuration for creating a Pipeline object

Can be used to create a Pipeline from a config object or file with
Pipeline.from_config(), or used as a building block for other configs
such as for deepsparse.server
"""

task: str = Field(
description="name of task to create a pipeline for",
)
model_path: str = Field(
default=None,
description="path on local system or SparseZoo stub to load the model from",
)
engine_type: str = Field(
default=DEEPSPARSE_ENGINE,
description=(
"inference engine to use. Currently supported values include "
"'deepsparse' and 'onnxruntime'. Default is 'deepsparse'"
),
)
batch_size: Optional[int] = Field(
default=1,
description=("static batch size to use for inference. Default is 1"),
)
num_cores: int = Field(
default=None,
description=(
"number of CPU cores to allocate for inference engine. None"
"specifies all available cores. Default is None"
),
)
scheduler: Optional[str] = Field(
default="async",
description=(
"(deepsparse only) kind of scheduler to execute with. Defaults to async"
),
)
input_shapes: List[List[int]] = Field(
default=None,
description=(
"list of shapes to set ONNX the inputs to. Pass None to use model as-is. "
"Default is None"
),
)
alias: str = Field(
default=None,
description=(
"optional name to give this pipeline instance, useful when inferencing "
"with multiple models. Default is None"
),
)
kwargs: Dict[str, Any] = Field(
default={},
description=(
"Additional arguments for inference with the model that will be passed "
"into the pipeline as kwargs"
),
)


class BucketingPipeline(object):
"""
A Proxy class that adds Bucketing functionality to Pipelines
Expand Down
22 changes: 10 additions & 12 deletions src/deepsparse/operators/engine_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

from pydantic import BaseModel, Field

from deepsparse import Context as EngineContext
from deepsparse import Engine, MultiModelEngine, Scheduler
from deepsparse.benchmark import ORTEngine
from deepsparse.engine import Context as EngineContext
from deepsparse.engine import Engine, MultiModelEngine, Scheduler
from deepsparse.operators import Operator
from deepsparse.utils import join_engine_outputs, model_to_path, split_engine_inputs

Expand Down Expand Up @@ -100,18 +100,18 @@ def __init__(
num_streams: int = None,
scheduler: Scheduler = None,
input_shapes: List[List[int]] = None,
engine_context: Optional[EngineContext] = None,
context: Optional[EngineContext] = None,
engine_kwargs: Dict = None,
):
self.model_path = model_to_path(model_path)
self.engine_context = engine_context
self.context = context
self._batch_size = batch_size

if self.engine_context is not None:
num_cores = num_cores or self.engine_context.num_cores
if self.engine_context.num_cores != num_cores:
if self.context is not None:
num_cores = num_cores or self.context.num_cores
if self.context.num_cores != num_cores:
raise ValueError(
f"num_cores mismatch. Expected {self.engine_context.num_cores} "
f"num_cores mismatch. Expected {self.context.num_cores} "
f"from passed context, but got {num_cores} while "
f"instantiating Pipeline"
)
Expand Down Expand Up @@ -159,13 +159,11 @@ def create_engine(
engine_type = self._engine_type.lower()

if engine_type == DEEPSPARSE_ENGINE:
if self.engine_context is not None and isinstance(
self.engine_context, EngineContext
):
if self.context is not None and isinstance(self.context, EngineContext):
engine_args.pop("num_cores", None)
engine_args.pop("scheduler", None)
engine_args.pop("num_streams", None)
engine_args["context"] = self.engine_context
engine_args["context"] = self.context
return MultiModelEngine(
model=onnx_file_path,
**engine_args,
Expand Down
56 changes: 50 additions & 6 deletions src/deepsparse/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import copy
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from deepsparse.operators import EngineOperator, Operator
from deepsparse.pipeline_config import PipelineConfig
from deepsparse.routers import Router
from deepsparse.schedulers import (
ContinuousBatchingScheduler,
Expand Down Expand Up @@ -44,6 +47,9 @@
"yolo_pipeline",
]

_LOGGER = logging.getLogger(__name__)
V2_NOT_SUPPORTED = ["alias", "logger", "executor"]


class Pipeline(Operator):
"""
Expand Down Expand Up @@ -218,12 +224,11 @@ async def run_async(self, *args, inference_state: InferenceState, **kwargs):
operator_output = outputs.result()

if isinstance(operator_output, tuple):
state_update = operator_output[-1]
operator_output = operator_output[0]
operator_output, state_update = operator_output[0], operator_output[-1]
inference_state.update_state(state_update)

next_step = self.router.next(next_step, self.ops, operator_output)
if state_update:
inference_state.update_state(state_update)

return operator_output

async def _apply_split(
Expand Down Expand Up @@ -259,8 +264,15 @@ def create(cls, task: str, **kwargs) -> "Pipeline":
:param kwargs: extra task specific kwargs to be passed to the Pipeline
:return: pipeline object initialized for the given task
"""
new_kwargs = {}
for k in kwargs:
if k in V2_NOT_SUPPORTED:
_LOGGER.warning(f"{k} is not yet supported in the v2 pipeline.")
else:
new_kwargs[k] = kwargs.get(k)

try:
pipeline = Operator.create(task=task, **kwargs)
pipeline = Operator.create(task=task, **new_kwargs)
if not isinstance(pipeline, cls):
raise RuntimeError(
"Pipeline was not created for the given task. The "
Expand All @@ -272,6 +284,38 @@ def create(cls, task: str, **kwargs) -> "Pipeline":
pipeline = Pipeline.create(task=task, **kwargs)
return pipeline

@classmethod
def from_config(
cls, config: Union["PipelineConfig", str, Path], **kwargs
) -> "Pipeline":
"""
:param config: PipelineConfig object, filepath to a json serialized
PipelineConfig, or raw string of a json serialized PipelineConfig.
Optionally, pipeline arguments not defined in the PipelineConfig may be
passed as key-word arguments to this function.
"""
if isinstance(config, Path) or (
isinstance(config, str) and os.path.exists(config)
):
if isinstance(config, str):
config = Path(config)
config = PipelineConfig.parse_file(config)
if isinstance(config, str):
config = PipelineConfig.parse_raw(config)

kwargs.update(config.kwargs)
return cls.create(
task=config.task,
model_path=config.model_path,
engine_type=config.engine_type,
batch_size=config.batch_size,
num_cores=config.num_cores,
scheduler=config.scheduler,
input_shapes=config.input_shapes,
alias=config.alias,
**kwargs,
)

def run(
self,
*args,
Expand Down
82 changes: 82 additions & 0 deletions src/deepsparse/pipeline_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

from deepsparse.operators.engine_operator import DEEPSPARSE_ENGINE


class PipelineConfig(BaseModel):
"""
Configuration for creating a Pipeline object

Can be used to create a Pipeline from a config object or file with
Pipeline.from_config(), or used as a building block for other configs
such as for deepsparse.server
"""

task: str = Field(
description="name of task to create a pipeline for",
)
model_path: str = Field(
default=None,
description="path on local system or SparseZoo stub to load the model from",
)
engine_type: Optional[str] = Field(
default=DEEPSPARSE_ENGINE,
description=(
"inference engine to use. Currently supported values include "
"'deepsparse' and 'onnxruntime'. Default is 'deepsparse'"
),
)
batch_size: Optional[int] = Field(
default=1,
description=("static batch size to use for inference. Default is 1"),
)
num_cores: Optional[int] = Field(
default=None,
description=(
"number of CPU cores to allocate for inference engine. None"
"specifies all available cores. Default is None"
),
)
scheduler: Optional[str] = Field(
default="async",
description=(
"(deepsparse only) kind of scheduler to execute with. Defaults to async"
),
)
input_shapes: Optional[List[List[int]]] = Field(
default=None,
description=(
"list of shapes to set ONNX the inputs to. Pass None to use model as-is. "
"Default is None"
),
)
alias: Optional[str] = Field(
default=None,
description=(
"optional name to give this pipeline instance, useful when inferencing "
"with multiple models. Default is None"
),
)
kwargs: Optional[Dict[str, Any]] = Field(
default={},
description=(
"Additional arguments for inference with the model that will be passed "
"into the pipeline as kwargs"
),
)
3 changes: 2 additions & 1 deletion src/deepsparse/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

from pydantic import BaseModel, Field, validator

from deepsparse.legacy.pipeline import DEEPSPARSE_ENGINE, PipelineConfig
from deepsparse.legacy.tasks import SupportedTasks
from deepsparse.loggers.config import (
MetricFunctionConfig,
PipelineSystemLoggingConfig,
SystemLoggingConfig,
SystemLoggingGroup,
)
from deepsparse.operators.engine_operator import DEEPSPARSE_ENGINE
from deepsparse.pipeline_config import PipelineConfig


__all__ = [
Expand Down
9 changes: 6 additions & 3 deletions src/deepsparse/server/deepsparse_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
from functools import partial

from deepsparse.legacy import Pipeline
from deepsparse import Pipeline
from deepsparse.server.config import EndpointConfig
from deepsparse.server.server import CheckReady, ModelMetaData, ProxyPipeline, Server
from fastapi import FastAPI
Expand Down Expand Up @@ -76,7 +76,7 @@ def _add_endpoint(

_LOGGER.info(f"Initializing pipeline for '{endpoint_config.name}'")
pipeline = Pipeline.from_config(
pipeline_config, self.context, self.server_logger
pipeline_config, context=self.context, logger=self.server_logger
)

_LOGGER.info(f"Adding endpoints for '{endpoint_config.name}'")
Expand Down Expand Up @@ -147,7 +147,10 @@ def _add_inference_endpoints(
),
)
)
if hasattr(pipeline.input_schema, "from_files"):
# NOTE: the new pipeline does not yet support from_files
if hasattr(pipeline.input_schema, "from_files") and not hasattr(
pipeline, "run_async"
):
routes_and_fns.append(
(
route + "/from_files",
Expand Down
Loading