Skip to content

Commit

Permalink
added pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiionutdamian committed Feb 7, 2024
1 parent 61a3166 commit e163107
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 20 deletions.
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- [ ] finish PostgreSQL manifest
- [ ] add on-the-fly connection to sql
3 changes: 2 additions & 1 deletion demo-ai-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Goal:


Setup:
- Python app (1 replica) deployment downloads models - uses Redis to announce update
- Python app (1 replica) deployment downloads model
- uses Redis to announce update
- Python app (4 replicas) sfs
- loads models from PV and serves using GPU
- uses Redis to get models
Expand Down
86 changes: 86 additions & 0 deletions demo-ai-app/serving_pattern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from time import sleep
from fastapi import FastAPI, APIRouter, Request, File, UploadFile
from threading import Thread
from pydantic import BaseModel


class ModelServer:
def __init__(self, **kwargs):
self.__done = False
return

def checker_loop(self):
while not self.__done:
# check for models
sleep(1)
return

def start(self):
self.__done = False
thread = Thread(target=self.checker_loop, daemon=True)
thread.start()
self.__thread = thread
return

def shutdown(self):
self.__done = True
return


def predict(self, data):
result = None
return result

engine = ModelServer()
app = FastAPI()
router = APIRouter()

@app.on_event("startup")
async def on_startup():
engine.start()
return

@app.on_event("shutdown")
async def on_shutdown():
engine.shutdown()
return

# json request
class Item(BaseModel):
field1: str
field2: str
field3: str

@app.post("/predict2")
async def predict2(item: Item):
return {"message": f"Received JSON body: {item}"}


# image request
@app.post("/predict3")
async def predict3(image: UploadFile = File(...)):
contents = await image.read()
# Process the image data
return {"message": "Image received"}


# universal request
@router.post("/predict")
async def predict_universal(request: Request):
content_type = request.headers.get("content-type")

if content_type == "application/json":
# Handle JSON request
json_body = await request.json()
return {"message": f"Received JSON body: {json_body}"}
elif content_type == "image/jpeg" or content_type == "image/png":
# Handle image upload
image = await request.body()
# Process the image data
return {"message": "Image received"}
else:
# Assume it's a plain text request
text_body = await request.body()
return {"message": f"Received plain text body: {text_body.decode()}"}

app.include_router(router)
2 changes: 1 addition & 1 deletion demo-basic-fastapi/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM python:3.9
WORKDIR /test_app
RUN pip install fastapi uvicorn redis numpy
RUN pip install fastapi uvicorn redis numpy pydantic
COPY src/ .
ENV PORT=5050
EXPOSE $PORT
Expand Down
17 changes: 17 additions & 0 deletions demo-basic-fastapi/src/app_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
import json
import numpy as np
import pkg_resources

from datetime import datetime

def get_packages(monitored_packages=None):
packs = [x for x in pkg_resources.working_set]
maxlen = max([len(x.key) for x in packs]) + 1
if isinstance(monitored_packages, list) and len(monitored_packages) > 0:
packs = [
"{}{}".format(x.key + ' ' * (maxlen - len(x.key)), x.version) for x in packs
if x.key in monitored_packages
]
else:
packs = [
"{}{}".format(x.key + ' ' * (maxlen - len(x.key)), x.version) for x in packs
]
packs = sorted(packs)
return packs

class NPJson(json.JSONEncoder):
"""
Used to help jsonify numpy arrays or lists that contain numpy data types.
Expand Down
27 changes: 17 additions & 10 deletions demo-basic-fastapi/src/engine.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import os
import redis
import json


from uuid import uuid4
from datetime import datetime

from app_utils import safe_jsonify
from app_utils import safe_jsonify, get_packages

__VER__ = '0.3.2'
__VER__ = '0.3.3'


class AppPaths:
Expand Down Expand Up @@ -56,6 +57,8 @@ def __setup(self):
self.__class__.__name__, __VER__,
self.str_local_id, self.hostname
))
self.__packs = get_packages()
self.P("Packages:\n{}".format("\n".join(self.__packs)))
dct_env = dict(os.environ)
self.P("Environement:\n{}".format(safe_jsonify(dct_env, indent=2)))
self.__maybe_setup_redis()
Expand Down Expand Up @@ -94,8 +97,12 @@ def __maybe_setup_redis(self):
return


def _pack_result(self, message):
return {"result": message}
def _pack_result(self, message, path=None, parameter=None):
return {
"result": message,
"path": path,
"parameter": parameter,
}


def _inc_cluster_count(self):
Expand Down Expand Up @@ -126,14 +133,14 @@ def process_data(self):
return


def handle_request(self, path):
def handle_request(self, path, parameter=None):
self.process_data()
if path in self.__avail_paths:
func_name = '_handle_' + self.__path_to_func[path]
msg = getattr(self, func_name)(path=path)
else:
msg = self.__handle_generic(path)
result = self._pack_result(msg)
result = self._pack_result(msg, path=path, parameter=parameter)
return result


Expand All @@ -156,9 +163,9 @@ def _handle_stats(self, **kwargs):
return dct_result


def __handle_generic(self, path, **kwargs):
msg = "Generic handler '{}', Local/Global: {}/{}, HOSTNAME: '{}', ID: '{}'".format(
path, self.__local_count, self.get_cluster_count(),
def __handle_generic(self, path, parameter=None, **kwargs):
msg = "Generic handler '{}', Param='{}', Local/Global: {}/{}, HOSTNAME: '{}', ID: '{}'".format(
path, parameter, self.__local_count, self.get_cluster_count(),
self.hostname, self.str_local_id
)
return msg
Expand Down
15 changes: 7 additions & 8 deletions demo-basic-fastapi/src/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from uuid import uuid4
from fastapi import FastAPI, APIRouter
from fastapi import FastAPI, APIRouter, Query
from engine import AppHandler, AppPaths


Expand All @@ -10,19 +10,18 @@

ROUTE1 = AppPaths.PATH_ROOT['PATH']
@router1.get(ROUTE1)
async def root():
return engine.handle_request(ROUTE1)
async def root(parameter: str = Query(...)):
return engine.handle_request(ROUTE1, parameter=parameter)

ROUTE2 = AppPaths.PATH_STAT['PATH']
@router1.get(ROUTE2)
async def stat():
return engine.handle_request(ROUTE2)
async def stat(parameter: str = Query(...)):
return engine.handle_request(ROUTE2, parameter=parameter))

# note: this is a catch-all route, so it should be the last route in the router
@router1.get("/{full_path:path}", include_in_schema=False)
async def catch_all(full_path: str):
return engine.handle_request(full_path)
async def catch_all(full_path: str, parameter: str = Query(...)):
return engine.handle_request(full_path, parameter=parameter)



app.include_router(router1)

0 comments on commit e163107

Please sign in to comment.