Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert to working tritonserver call in notebook using testbooks #619

Merged
merged 6 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@
"source": [
"\n",
"# create inputs and outputs\n",
"inputs = convert_df_to_triton_input(workflow.input_schema.column_names, batch.fillna(0), grpcclient.InferInput)\n",
"inputs = convert_df_to_triton_input(workflow.input_schema, batch.fillna(0), grpcclient.InferInput)\n",
"output_cols = ensemble.graph.output_schema.column_names\n",
"outputs = [\n",
" grpcclient.InferRequestedOutput(col)\n",
Expand Down Expand Up @@ -372,7 +372,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.9.7"
},
"vscode": {
"interpreter": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
from testbook import testbook

from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib
from merlin.systems.triton.utils import run_ensemble_on_tritonserver

import pytest

pytest.importorskip("tensorflow")
pytest.importorskip("feast")
pytest.importorskip("faiss")
from merlin.models.loader.tf_utils import configure_tensorflow

# flake8: noqa


Expand Down Expand Up @@ -82,20 +78,26 @@ def test_func():
top_k = tb2.ref("top_k")
outputs = tb2.ref("outputs")
assert outputs[0] == "ordered_ids"

df_lib = get_lib()

# read in data for request
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
configure_tensorflow()

response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model"
tb2.inject(
"""
import shutil
from merlin.core.dispatch import get_lib
from merlin.models.loader.tf_utils import configure_tensorflow
configure_tensorflow()
df_lib = get_lib()
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["ordered_ids"]]
shutil.rmtree("/tmp/examples/", ignore_errors=True)
"""
)
response = response["ordered_ids"]

tb2.execute_cell(NUM_OF_CELLS - 2)
response = tb2.ref("response")
assert len(response) == top_k
39 changes: 21 additions & 18 deletions tests/unit/examples/test_building_deploying_multi_stage_RecSys.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
from testbook import testbook

from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib
from merlin.systems.triton.utils import run_ensemble_on_tritonserver

import pytest

pytest.importorskip("tensorflow")
pytest.importorskip("feast")
pytest.importorskip("faiss")
from merlin.models.loader.tf_utils import configure_tensorflow

# flake8: noqa

Expand Down Expand Up @@ -63,20 +60,26 @@ def test_func():
top_k = tb2.ref("top_k")
outputs = tb2.ref("outputs")
assert outputs[0] == "ordered_ids"

df_lib = get_lib()

# read in data for request
batch = df_lib.read_parquet(
os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
configure_tensorflow()

response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble/", outputs, batch, "ensemble_model"
tb2.inject(
"""
import shutil
from merlin.core.dispatch import get_lib
from merlin.models.loader.tf_utils import configure_tensorflow
configure_tensorflow()
df_lib = get_lib()
batch = df_lib.read_parquet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am changing the notebooks in this PR #618 to return the raw ids. so feeding this batch wont work. Either we can merge my PR first and then update unit test accordingly, or we can merge this PR, then merge my PR and update unit test again :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should get what we had working, then you can modify as you see fit. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regardless we should wait for the new CI container to be out. I will check on that now.

os.path.join("/tmp/data/processed/retrieval/", "train", "part_0.parquet"),
num_rows=1,
columns=["user_id"],
)
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
response = run_ensemble_on_tritonserver(
"/tmp/examples/poc_ensemble", ensemble.graph.input_schema, batch, outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["ordered_ids"]]
shutil.rmtree("/tmp/examples/", ignore_errors=True)
"""
)
response = response["ordered_ids"]

tb2.execute_cell(NUM_OF_CELLS - 2)
response = tb2.ref("response")
assert len(response) == top_k
40 changes: 16 additions & 24 deletions tests/unit/examples/test_scaling_criteo_merlin_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@
import pytest
from testbook import testbook
from tests.conftest import REPO_ROOT
from merlin.core.dispatch import get_lib

pytest.importorskip("tensorflow")

from merlin.models.loader.tf_utils import configure_tensorflow # noqa: E402
from merlin.systems.triton.utils import run_ensemble_on_tritonserver # noqa: E402


def test_func():
with testbook(
Expand Down Expand Up @@ -84,24 +80,20 @@ def test_func():
)
NUM_OF_CELLS = len(tb3.cells)
tb3.execute_cell(list(range(0, NUM_OF_CELLS - 5)))
input_cols = tb3.ref("input_cols")
outputs = tb3.ref("output_cols")
# read in data for request
df_lib = get_lib()
in_dtypes = {}
for col in input_cols:
if col.startswith("C"):
in_dtypes[col] = "int64"
if col.startswith("I"):
in_dtypes[col] = "float64"
batch = df_lib.read_parquet(
os.path.join("/tmp/output/criteo/", "valid", "part_0.parquet"),
num_rows=3,
columns=input_cols,
)
batch = batch.astype(in_dtypes)
configure_tensorflow()
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/", outputs, batch, "ensemble_model"
tb3.inject(
"""
import shutil
from merlin.systems.triton.utils import run_ensemble_on_tritonserver
outputs = ensemble.graph.output_schema.column_names
response = run_ensemble_on_tritonserver(
"/tmp/output/criteo/ensemble/",workflow.input_schema, batch.fillna(0),
outputs, "ensemble_model"
)
response = [x.tolist()[0] for x in response["label/binary_classification_task"]]
shutil.rmtree("/tmp/input/criteo", ignore_errors=True)
shutil.rmtree("/tmp/output/criteo", ignore_errors=True)
"""
)
assert len(response["label/binary_classification_task"]) == 3
tb3.execute_cell(NUM_OF_CELLS - 4)
response = tb3.ref("response")
assert len(response) == 3