Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ContinuousBatching] ContinuousBatchingScheduler Implementation #1375

Merged
merged 2 commits into from
Nov 13, 2023

Conversation

bfineran
Copy link
Member

@bfineran bfineran commented Nov 1, 2023

Uses the utils in #1373 and #1374 to implement a scheduler for continuous batching.
The ContinuousBatchingScheduler tracks various EngineOperators and manages their input queues with ContinuousBatchingQueues. The scheduler also runs multiple ContinuousBatchingExecutorThreads in parallel that consume these queues and actually run the multi-batch engine and return the correct futures from the scheduler submit.

next steps include:

  • integration with KV Cache engine mode for text gen
  • implementation of a singleton pattern so that the scheduler can easily be shared across pipelines
  • text gen pipeline integration
  • more involved heuristic for the select_fn to determine execution priority
  • server integration
  • heavier testing
  • README

test_plan:
simple single execution unit test included, further tests should test multiple engines/operators/batch sizes with sufficient load to trigger multibatch execution - note that unit tests for multibatch are handled with the helpers

@bfineran bfineran self-assigned this Nov 1, 2023
@bfineran bfineran changed the base branch from main to continuous-batching-executor November 1, 2023 21:50
@bfineran bfineran changed the title Continuous batching scheduler [ContinuousBatching] ContinuousBatchingScheduler Implementation Nov 1, 2023
"""
Manages EngineOperator jobs that should be run with continuous batching.
Groups requests for the same engine into larger batches and returns
the result to the respeictive request threads after scheduled completion
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
the result to the respeictive request threads after scheduled completion
the result to the respective request threads after scheduled completion

Copy link
Contributor

@dbogunowicz dbogunowicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent. Can we maybe describe in PR description what Continuous Batching is, for clarity and posteriority?

@bfineran bfineran force-pushed the continuous-batching-scheduler branch 2 times, most recently from f0fb8d5 to 4b1472d Compare November 7, 2023 18:41
"""
return self._max_workers

def submit(self, *args, operator: Operator, **kwargs) -> Future:
Copy link
Contributor

@dsikka dsikka Nov 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the code for the ContinuousBatchingExecutorThread is EngineOperator specific. Are we ever expecting a non-engine operator as the input to this submit function?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great observation - I think it's something we could support in the future, but not sure how much gain there would be. we could probably abstract it out, but it might get a little complicated since the engine operators are unique (and foreseeably unique) that they need to have an engine compiled against the target batch size and this must be swapped in at runtime

# simple test that ContinuousBatchingScheduler can be instantiated and return
# a result from a request, for testing multi-batch execution, making enough
# concurrent requests guarantee batched execution is out of scope
scheduler = ContinuousBatchingScheduler()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the goal that we would initialize (at least) two schedulers, one ContinuousBatchingScheduler and the remaining OperatorSchedulers as part of the pipeline init? And then swap to use ContinuousBatchingScheduler when the operator being executed is of type EngineOperator?

Copy link
Member Author

@bfineran bfineran Nov 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly, the ContinuousBatchingScheduler would come first in the list of schedulers so it can pick up any relevant engine operator jobs. the default scheduler picks up the rest

@dsikka
Copy link
Contributor

dsikka commented Nov 13, 2023

One more question: in terms of next steps, you had written down: integration with KV Cache engine mode for text gen
Any reason this can't work with the NLEngineOperator as is currently? The NLEngineOperator inherits from the EngineOperator

@bfineran

@bfineran
Copy link
Member Author

One more question: in terms of next steps, you had written down: integration with KV Cache engine mode for text gen Any reason this can't work with the NLEngineOperator as is currently? The NLEngineOperator inherits from the EngineOperator

@bfineran

yeah a few things here:

  1. the schemas need to implement the split/join since they don't inherit
  2. I think we'll need to update the way the engine kwargs are passed so the shared create_engine function sets the right internal/external kv cache mode
  3. the run function needs to get updated to accept an engine to be swapped out like we do in EngineOperator

Base automatically changed from continuous-batching-executor to v2 November 13, 2023 21:22
@bfineran bfineran force-pushed the continuous-batching-scheduler branch from 4b1472d to e2a0761 Compare November 13, 2023 21:24
@bfineran bfineran merged commit 5c48505 into v2 Nov 13, 2023
@bfineran bfineran deleted the continuous-batching-scheduler branch November 13, 2023 21:24
bfineran added a commit that referenced this pull request Dec 6, 2023
* Pipelines Refactor - Initial Impl (#1287)

* [Pipeline Refactor] Additional functionality, engine operator, linear router and image classification pipeline/operators/example (#1325)

* initial functionality and working example with image classification

* remove testing image

* update args

* initial functionality and working example with image classification

* remove testing image

* pr comments

* defines schemas for operators and test

* add image classification test, PR comments

* fix input/output handling in pipeline and operator base classes to be more generic; remove context

* add additional operator input message

* typo fix

* [v2] EngineOperator updates to make continuous batching easier (#1371)

* [v2] EngineOperator updates to make continuous batching easier

* test fixes

* [Pipeline Refactor] Update routes, text generation initial functionality (#1348)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* [Pipeline Refactor] Additional Operators, Route update and completed generation functionality (#1356)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* initial functionality and working example with image classification

* updates func

* prompt inference, initial functionality

* finish generation operators and update routes

* further breakdown operators

* add operators

* fix can_operate condition

* update can_operate to not rely on the inference_state

* rebase + update

* fix condition

* fix capacity settting again

* typo fixes

* [Pipeline Refactor] Split/Join Functionality for multiple prompts (#1384)

* add split/join functionality

* update router to include split/join in parent class, refactor pipeline code to remove repeat code, update map function

* process multiple generations

* move map to base class

* [Pipeline Refactor] Unit Testing for Text Generation Operators (#1392)

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* fix name

* [Continuous Batching] Queue Implementation to support batching grouping and prioritization (#1373)

* [Continuous Batching] Queue Implementation to support batching grouping and prioritization

* has_key method

* thread safety

* add blocking option for pop_batch

* update docstring

* allow mutex to be shared across continuous batching objects

* revert last commit

* [Continuous Batching] Executor thread for running continuous batching (#1374)

* [Continuous Batching] Executor thread for running continuous batching

* quality

* ensure that executor stops when main thread does - clean up test hack

* [ContinuousBatching] ContinuousBatchingScheduler Implementation (#1375)

* [ContinuousBatching] ContinuousBatchingScheduler Implementation

* cleanup unnecessary stop condition

* [continuous batching] singleton pattern for scheduler (#1391)

* [continuous batching] singleton pattern for scheduler

* catch from review

* [Pipeline Refactor][Text-Generation] Create a helper function for creating engine_inputs (#1364)

* rebasing off my initial commit

* cleanups

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* Delete tests/deepsparse/v2/unit/text_generation/test_msic.py

---------

Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>

* [Pipeline Refactor][Text-Generation] Refactor `transformers` helpers functions (#1394)

* add split/join functionality

* update router to include split/join in parent class, refactor pipeline code to remove repeat code, update map function

* process multiple generations

* initial commit

* fix error

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* Delete tests/deepsparse/v2/unit/text_generation/test_msic.py

* pipeline runs, but incorrectly

* Revert "pipeline runs, but incorrectly"

This reverts commit 51c4ee6.

* PR review comments

---------

Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>

* [Text Generation][V2] End-to-end tests (#1402)

* initial commit

* initial commit

* its working now

* beautification

* thank you Dipika <3

* ready to review

* [Pipeline Refactor][Text Generation][Continuous Batching] Integration (#1409)

* update split/join

* use map

* update

* run end-to-end

* clean-up

* fix bug with batch size, introduce SplitRoute dataclass

* update tests to use new inputs/outputs

* use the normal scheduler for internal kv_cache

* add pipeline inpuits

* clean-up

* change engine type, update docstrings, update override function to be more generic

* move subgraph functionality to its own function; clean-up cont batching in text gen pipeline

* update linear pathway to also use subgraph execution

* rebase fix

* fix tests

* [Pipeline Refactor] Operator Registry (#1420)

* initial registry functionality

* use sparsezoo mixin

* [Pipeline Refactor] Fix Operator scheduling to fix issue with slow execution  (#1453)

* fix scheduling to fix issue with engine running very slowly; introduce new completed attribute for Subgraph instead of checking instance type

* fix warning message

* [Pipeline Refactor] Add `Pipeline.create` method to initialize pipelines (#1457)

* add pipeline create method for pipeline creation using the operator registry

* add instance check

* [Pipeline Refactor] async (#1380)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* initial functionality and working example with image classification

* updates func

* prompt inference, initial functionality

* finish generation operators and update routes

* further breakdown operators

* add operators

* fix can_operate condition

* update can_operate to not rely on the inference_state

* rebase + update

* fix condition

* async initial functionality

* fix capacity settting again

* add blocking

* more testing

* update to use split/join

* fix

* rebase fix

* remove index

* change event loop

* rebase fix

* update async run to use new operator scheduling properly

* rebase fixes (#1458)

* more fixes (#1459)

---------

Co-authored-by: Benjamin Fineran <bfineran@users.noreply.github.com>
Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>
dbogunowicz added a commit that referenced this pull request Jan 2, 2024
* Pipelines Refactor - Initial Impl (#1287)

* [Pipeline Refactor] Additional functionality, engine operator, linear router and image classification pipeline/operators/example (#1325)

* initial functionality and working example with image classification

* remove testing image

* update args

* initial functionality and working example with image classification

* remove testing image

* pr comments

* defines schemas for operators and test

* add image classification test, PR comments

* fix input/output handling in pipeline and operator base classes to be more generic; remove context

* add additional operator input message

* typo fix

* [v2] EngineOperator updates to make continuous batching easier (#1371)

* [v2] EngineOperator updates to make continuous batching easier

* test fixes

* [Pipeline Refactor] Update routes, text generation initial functionality (#1348)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* [Pipeline Refactor] Additional Operators, Route update and completed generation functionality (#1356)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* initial functionality and working example with image classification

* updates func

* prompt inference, initial functionality

* finish generation operators and update routes

* further breakdown operators

* add operators

* fix can_operate condition

* update can_operate to not rely on the inference_state

* rebase + update

* fix condition

* fix capacity settting again

* typo fixes

* add split/join functionality

* update router to include split/join in parent class, refactor pipeline code to remove repeat code, update map function

* process multiple generations

* initial commit

* fix error

* [Pipeline Refactor] Split/Join Functionality for multiple prompts (#1384)

* add split/join functionality

* update router to include split/join in parent class, refactor pipeline code to remove repeat code, update map function

* process multiple generations

* move map to base class

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* [Pipeline Refactor] Unit Testing for Text Generation Operators (#1392)

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* fix name

* Delete tests/deepsparse/v2/unit/text_generation/test_msic.py

* [Continuous Batching] Queue Implementation to support batching grouping and prioritization (#1373)

* [Continuous Batching] Queue Implementation to support batching grouping and prioritization

* has_key method

* thread safety

* add blocking option for pop_batch

* update docstring

* allow mutex to be shared across continuous batching objects

* revert last commit

* [Continuous Batching] Executor thread for running continuous batching (#1374)

* [Continuous Batching] Executor thread for running continuous batching

* quality

* ensure that executor stops when main thread does - clean up test hack

* [ContinuousBatching] ContinuousBatchingScheduler Implementation (#1375)

* [ContinuousBatching] ContinuousBatchingScheduler Implementation

* cleanup unnecessary stop condition

* [continuous batching] singleton pattern for scheduler (#1391)

* [continuous batching] singleton pattern for scheduler

* catch from review

* [Pipeline Refactor][Text-Generation] Create a helper function for creating engine_inputs (#1364)

* rebasing off my initial commit

* cleanups

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* Delete tests/deepsparse/v2/unit/text_generation/test_msic.py

---------

Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>

* pipeline runs, but incorrectly

* it works for a single sequence

* cleanup. now lets figure out how to run multiple sequences

* [Pipeline Refactor][Text-Generation] Refactor `transformers` helpers functions (#1394)

* add split/join functionality

* update router to include split/join in parent class, refactor pipeline code to remove repeat code, update map function

* process multiple generations

* initial commit

* fix error

* unit testing for text generation operators

* additional changes

* unit testing completion

* remove debug

* fix

* add todo

* more clean-up

* fix test

* add docstrings/comments

* break out tests to individual unit test files; add conftest and make scope of fixtures module to help with speed

* Delete tests/deepsparse/v2/unit/text_generation/test_msic.py

* pipeline runs, but incorrectly

* Revert "pipeline runs, but incorrectly"

This reverts commit 51c4ee6.

* PR review comments

---------

Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>

* [Text Generation][V2] End-to-end tests (#1402)

* initial commit

* initial commit

* its working now

* beautification

* thank you Dipika <3

* ready to review

* integration tests pass

* [Pipeline Refactor][Text Generation][Continuous Batching] Integration (#1409)

* update split/join

* use map

* update

* run end-to-end

* clean-up

* fix bug with batch size, introduce SplitRoute dataclass

* update tests to use new inputs/outputs

* use the normal scheduler for internal kv_cache

* add pipeline inpuits

* clean-up

* change engine type, update docstrings, update override function to be more generic

* move subgraph functionality to its own function; clean-up cont batching in text gen pipeline

* update linear pathway to also use subgraph execution

* rebase fix

* fix tests

* [Pipeline Refactor] Operator Registry (#1420)

* initial registry functionality

* use sparsezoo mixin

* fix tricky rebase

* one more cleanup

* got tests to work after rebase. implementing SPLIT and JOIN in linearouter now

* pipeline working, with GraphRouter. Needs some more testing

* ready for review

* cleanup

* simplify after PR review round

* [Pipeline Refactor] Fix Operator scheduling to fix issue with slow execution  (#1453)

* fix scheduling to fix issue with engine running very slowly; introduce new completed attribute for Subgraph instead of checking instance type

* fix warning message

* [Pipeline Refactor] Add `Pipeline.create` method to initialize pipelines (#1457)

* add pipeline create method for pipeline creation using the operator registry

* add instance check

* [Pipeline Refactor] async (#1380)

* initial functionality and working example with image classification

* remove testing image

* rebase fixes

* initial functionality and working example with image classification

* text gen

* updates func

* prompt inference, initial functionality

* remove image; update state docstring

* Fix typo

* add todo for split/join

* remove context, clean-up args, remove prefill_preprocess_operaator

* fix docstrings

* initial functionality and working example with image classification

* updates func

* prompt inference, initial functionality

* finish generation operators and update routes

* further breakdown operators

* add operators

* fix can_operate condition

* update can_operate to not rely on the inference_state

* rebase + update

* fix condition

* async initial functionality

* fix capacity settting again

* add blocking

* more testing

* update to use split/join

* fix

* rebase fix

* remove index

* change event loop

* rebase fix

* update async run to use new operator scheduling properly

* rebase fixes (#1458)

* more fixes (#1459)

* bring back functionalities that were lost in v2 during rebasing

* Update src/deepsparse/transformers/helpers.py

* ready for review

* bring tests back"

* quality

* original readme

* addressing Dipikas comments

* Update src/deepsparse/transformers/pipelines/text_generation/pipeline_no_kv_cache.py

* addressing PR review

---------

Co-authored-by: Benjamin Fineran <bfineran@users.noreply.github.com>
Co-authored-by: Dipika Sikka <dipikasikka1@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants