Debug pipeline #1566

Draft · wants to merge 6 commits into base: main
11 changes: 9 additions & 2 deletions src/deepsparse/operators/engine_operator.py
@@ -19,7 +19,7 @@

from deepsparse.benchmark import ORTEngine
from deepsparse.engine import Context as EngineContext
from deepsparse.engine import Engine, MultiModelEngine, Scheduler
from deepsparse.engine import DebugAnalysisEngine, KVCacheParams, Engine, MultiModelEngine, Scheduler
from deepsparse.operators import Operator
from deepsparse.utils import join_engine_outputs, model_to_path, split_engine_inputs

@@ -169,7 +169,14 @@ def create_engine(
                **engine_args,
            )
            engine_args.pop("cache_output_bools", None)
            return Engine(onnx_file_path, **engine_args)
            engine_args.pop("num_streams", None)
            cached_outputs = engine_args.pop("cached_outputs", None)
            if not cached_outputs:
                raise ValueError("cached_outputs is required to build KVCacheParams")
            engine_args["kv_cache_params"] = KVCacheParams(cached_outputs, 0, 0)
            engine_args["num_warmup_iterations"] = 0
            engine_args["num_iterations"] = 1
            return DebugAnalysisEngine(onnx_file_path, **engine_args)

        if engine_type == ORT_ENGINE:
            return ORTEngine(onnx_file_path, **engine_args)
5 changes: 3 additions & 2 deletions src/deepsparse/pipeline.py
@@ -139,8 +139,9 @@ def create(cls, task: str, **kwargs) -> "Pipeline":
"Pipeline was not created for the given task. The "
"provided task should be registered using the OperatorRegistry"
)
except Exception:
_LOGGER.warning(f"Could not create v2 '{task}' pipeline, trying legacy")
except Exception as e:
_LOGGER.warning(f"Could not create v2 '{task}' pipeline, with error: {e}")
_LOGGER.warning(f"Attempting to create the legacy pipeline")
from deepsparse.legacy import Pipeline

pipeline = Pipeline.create(task=task, **kwargs)
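
With this change the original v2 failure is logged before the legacy fallback is attempted, so a caller can see why the v2 graph could not be built. Below is a minimal sketch of exercising that fallback from user code; it is not part of this PR, and the task name and model path are illustrative placeholders:

import logging

from deepsparse import Pipeline

logging.basicConfig(level=logging.WARNING)

# If the v2 pipeline for this task cannot be constructed, the two warnings
# added above are emitted (including the original exception text) and the
# legacy Pipeline.create() is tried with the same kwargs.
pipe = Pipeline.create(
    task="text_generation",            # illustrative task name
    model_path="/path/to/model.onnx",  # placeholder model path
)
print(type(pipe).__name__)
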
@@ -51,6 +51,12 @@ def can_operate(self, inp: Any) -> bool:
        if inp.get("in_generation"):
            return True

        if kv_cache.total_num_processed_tokens >= kv_cache.capacity:
            raise RuntimeError(
                "Not enough kv_cache capacity to run generation. Please use a larger "
                "sequence_length or a shorter prompt"
            )

        remaining_tokens = len(tokens) - kv_cache.total_num_processed_tokens
        can_process = (
            remaining_tokens > 0 and remaining_tokens < self.prompt_sequence_length
@@ -17,7 +17,6 @@
from pydantic import BaseModel, Field

from deepsparse.operators import Operator
from deepsparse.transformers.schemas.text_generation_schemas import FinishReason
from deepsparse.utils import InferenceState


@@ -43,9 +42,6 @@ def run(self, inference_state: InferenceState, **kwargs):
        generated_logits = inference_state.current_state.get("generated_logits")
        finished_reason = inference_state.current_state.get("finished_reason")

        if len(finished_reason) == 0:
            finished_reason.append(FinishReason.LENGTH)

        generated_tokens = numpy.array([generated_tokens])
        generated_logits = numpy.concatenate(generated_logits, axis=1)
        return {
@@ -19,10 +19,7 @@
from deepsparse.transformers.pipelines.text_generation.nl_engine_operator import (
    NLEngineOutputs,
)
from deepsparse.transformers.schemas.text_generation_schemas import (
    FinishReason,
    PromptLogitsNoKVCacheInference,
)
from deepsparse.transformers.schemas.text_generation_schemas import FinishReason
from deepsparse.utils import InferenceState


@@ -36,14 +33,16 @@ def __init__(
        self.force_max_tokens = force_max_tokens
        self.tokenizer = tokenizer

    def can_operate(self, inp: Union[PromptLogitsNoKVCacheInference, NLEngineOutputs]):
    def can_operate(
        self, inp: Union[NLEngineOutputs, "PrepareForGenerationOutput"]  # noqa: F821
    ):
        if inp.in_generation:
            return True
        return False

    def run(
        self,
        inp: Union[PromptLogitsNoKVCacheInference, NLEngineOutputs],
        inp: Union[NLEngineOutputs, "PrepareForGenerationOutput"],  # noqa: F821
        inference_state: InferenceState,
        **kwargs,
    ):
@@ -52,21 +51,26 @@ def run(
            if isinstance(inp, NLEngineOutputs)
            else inp.prompt_logits
        )
        kv_cache = inp.kv_cache if isinstance(inp, NLEngineOutputs) else None
        kv_cache = inp.kv_cache

        max_tokens = inference_state.current_state.get("max_tokens")
        length_finish_reason = inference_state.current_state.get("length_finish_reason")
        generated_tokens = inference_state.current_state.get("generated_tokens")
        num_generated_tokens = len(generated_tokens)

        token_generator = inference_state.current_state.get("token_generator")
        token = token_generator.generate(logits=logits[0, -1, :])
        finish_reason = None

        callback = inference_state.current_state.get("callback")
        stop = inference_state.current_state.get("stop")

        if (
            kv_cache is not None
            and kv_cache.total_num_processed_tokens >= kv_cache.capacity
        ):
            finish_reason = FinishReason.CAPACITY

        callback = inference_state.current_state.get("callback")
        stop = inference_state.current_state.get("stop")

        if token == self.tokenizer.eos_token_id and not self.force_max_tokens:
            finish_reason = FinishReason.STOP

@@ -84,9 +88,11 @@ def run(
            )
            finish_reason = FinishReason.CALLBACK

        max_tokens = inference_state.current_state.get("max_tokens")
        if len(inference_state.current_state.get("generated_tokens")) + 1 >= max_tokens:
            finish_reason = inference_state.current_state.get("length_finish_reason")
        # Note: this is +1 as the inference state variable keeping track of all the
        # generated tokens has not yet been updated with the most recently generated
        # token from this operator
        if num_generated_tokens + 1 == max_tokens:
            finish_reason = length_finish_reason

        state_update = {
            "token_generator": token_generator,
@@ -42,6 +42,12 @@ def can_operate(self, inp: Any):
        kv_cache = inp.get("kv_cache")
        tokens = inp.get("tokens")

        if kv_cache.total_num_processed_tokens >= kv_cache.capacity:
            raise RuntimeError(
                "Not enough kv_cache capacity to run generation. Please use a larger "
                "sequence_length or a shorter prompt"
            )

        if len(tokens) < self.prompt_sequence_length:
            return False

@@ -31,7 +31,6 @@
    overwrite_onnx_model_inputs_for_kv_cache_models,
)


__all__ = ["NLEngineOperator", "NLEngineInputs", "NLEngineOutputs"]


@@ -105,7 +104,6 @@ def split(self) -> List["NLEngineOutputs"]:


class NLEngineOperator(EngineOperator):

"""
Operator for the NL Decoder Engine. This Operator inherits from the EngineOperator.
Specific updates to engine attributes are made through this operator, as well
@@ -117,22 +115,23 @@ class NLEngineOperator(EngineOperator):
    output_schema = NLEngineOutputs

    def __init__(
        self,
        sequence_length: int,
        input_ids_length: int,
        internal_kv_cache: bool = False,
        **kwargs,
        self,
        sequence_length: int,
        input_ids_length: int,
        internal_kv_cache: bool = False,
        **kwargs,
    ):

        self.sequence_length = sequence_length
        self.input_ids_length = input_ids_length
        self.internal_kv_cache = internal_kv_cache
        self.kv_cache_data_type = None
        self.inference_index = 0

        super().__init__(**kwargs)

    def create_engine(
        self, batch_size: Optional[int] = None, engine_kwargs: Optional[dict] = None
        self, batch_size: Optional[int] = None, engine_kwargs: Optional[dict] = None
    ):

        batch_size = batch_size if batch_size is not None else self._batch_size
@@ -164,10 +163,10 @@ def create_engine(
        return super().create_engine(**kwargs, **engine_kwargs)

    def override_model_inputs(
        self,
        model_path: Union[str, Path],
        batch_size: int,
        return_additional_outputs=False,
        self,
        model_path: Union[str, Path],
        batch_size: int,
        return_additional_outputs=False,
    ):
        """
        Override the model based on the provided batch_size, sequence_length,
@@ -219,10 +218,30 @@ def run(self, inp: NLEngineInputs, **kwargs) -> NLEngineOutputs:
            # we skip the validation

            internal_kv_cache = [x.engine_internal_cache for x in kv_cache]
            # if inp.engine:
            #     out = inp.engine._eng_net.execute_list_out(inputs, internal_kv_cache)
            # else:
            #     out = self.engine._eng_net.execute_list_out(inputs, internal_kv_cache)
            if inp.engine:
                out = inp.engine._eng_net.execute_list_out(inputs, internal_kv_cache)
                out, bench_info = inp.engine._eng_net.benchmark_execute(
                    inputs, internal_kv_cache
                )
            else:
                out = self.engine._eng_net.execute_list_out(inputs, internal_kv_cache)
                out, bench_info = self.engine._eng_net.benchmark_execute(
                    inputs, internal_kv_cache
                )

            # local imports for the debug dump (os and pickle may already be
            # available at module level; importing here keeps the block self-contained)
            import os
            import pickle

            inference_type = "prefill" if self.prefill else "decode"
            filename = f"analysis-{inference_type}-{self.inference_index}.pickle"
            if "WAND_BENCH_ANALYSIS_DIR" in os.environ:
                filename = os.path.join(os.environ["WAND_BENCH_ANALYSIS_DIR"], filename)

            print(f"Saving text generation inference analysis to {filename}")
            with open(filename, "wb") as f:
                pickle.dump(bench_info, f, pickle.HIGHEST_PROTOCOL)
            self.inference_index += 1
            out = [v for v in out.values()]

        else:
            # run the engine without the LIB.kv_cache object
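
The debug dump above writes one pickle per engine invocation, named analysis-{prefill|decode}-{index}.pickle, either to the working directory or to WAND_BENCH_ANALYSIS_DIR. The following sketch, not part of this PR, collects those files after a run; it assumes only what the diff shows, namely that each file unpickles to the bench_info object returned by benchmark_execute:

import glob
import os
import pickle

# Same environment variable the operator consults before writing its dumps.
analysis_dir = os.environ.get("WAND_BENCH_ANALYSIS_DIR", ".")

for path in sorted(glob.glob(os.path.join(analysis_dir, "analysis-*.pickle"))):
    with open(path, "rb") as f:
        bench_info = pickle.load(f)
    # The structure of bench_info is engine-defined, so just report what was loaded.
    print(f"{os.path.basename(path)}: {type(bench_info).__name__}")
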
@@ -184,6 +184,9 @@ def __init__(
            **engine_kwargs,
        )

        single_engine_operator.prefill = False
        multi_engine_operator.prefill = True

        # NOTE: Currently using pipeline state. Can swap to simply pass in the
        # attributes to the specific Operator that need them, as class attributes.
        pipeline_state_vals[
@@ -239,7 +242,6 @@ def __init__(
            sequence_length=sequence_length,
            prompt_sequence_length=prompt_sequence_length,
            token_generator=token_generator,
            process_output_operator=process_output,
        )

        # TODO: do we want to support lists for different engines?
@@ -286,7 +288,7 @@ def __init__(
"compile_logits",
"generate_new_token",
],
"prep_for_generation": "autoregressive_preprocess",
"prep_for_generation": "generate_new_token",
"generate_new_token": "compile_generated_tokens",
}

@@ -19,6 +19,7 @@
from deepsparse.routers import GraphRouter
from deepsparse.schedulers import OperatorScheduler
from deepsparse.transformers.pipelines.text_generation import (
    CompileGeneratedTokens,
    CompileGenerations,
    GenerateNewTokenOperator,
    JoinOutput,
@@ -73,6 +74,7 @@ def __init__(
            tokenizer=self.tokenizer, force_max_tokens=True
        )
        compile_generations = CompileGenerations()
        compile_generated_tokens = CompileGeneratedTokens()
        join_output = JoinOutput(tokenizer=self.tokenizer)
        process_outputs = ProcessOutputs(tokenizer=self.tokenizer)

@@ -82,6 +84,7 @@ def __init__(
"engine_operator": engine_operator,
"prepare_generation": prepare_generation,
"generate_new_token": generate_new_token,
"compile_generated_tokens": compile_generated_tokens,
"compile_generations": compile_generations,
"join_output": join_output,
"process_outputs": process_outputs,
@@ -92,7 +95,8 @@ def __init__(
"SPLIT": "engine_operator",
"engine_operator": "prepare_generation",
"prepare_generation": "generate_new_token",
"generate_new_token": "compile_generations",
"generate_new_token": "compile_generated_tokens",
"compile_generated_tokens": "compile_generations",
"compile_generations": "JOIN",
"JOIN": "join_output",
"join_output": "process_outputs",