Skip to content

Commit

Permalink
fix: implement batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ispirtraian committed Apr 22, 2024
1 parent e074637 commit 45024da
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 50 deletions.
115 changes: 66 additions & 49 deletions app/src/serving/serving_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,29 @@ def _predict_job (self, model_type: str, input, params:dict = None):

no_runs = 1
if(params and "no_runs" in params and params['no_runs'] is not None):
no_runs= self._get_device(params['no_runs'])
no_runs= params['no_runs']

batch_size = 0
if(params and "batch_size" in params and params['batch_size'] is not None):
batch_size = params['batch_size']

if input:
workername = None
status = STATUS_CREATED
no_tasks = 1
if isinstance(input, List) and batch_size > 0 and len(input) > batch_size :
no_tasks = len(input)//batch_size
if len(input) % batch_size > 0 :
no_tasks= no_tasks + 1

if self.__available :
#check if current instance is available for processing
workername = self.host
status = STATUS_ASSIGNED
self.__input = input

if no_tasks > 1 :
self.__input = input[:batch_size]
else:
self.__input = input
# save job to database
jobid = uuid.uuid4().hex
try:
Expand All @@ -300,55 +312,60 @@ def _predict_job (self, model_type: str, input, params:dict = None):

# save tasks to database
if result is None:
taskid = uuid.uuid4().hex

if not self.__available :
#if current instance is unavailable, save input to file

try:
content_list = []
if isinstance(input, List):
if isinstance(input[0], str):
content_list.append(json.dumps(input))
else:
content_list.extend(input)
else:
content_list.append(input)
self.P (f"Saving {no_tasks} ...")

for tndx in range(no_tasks):
taskid = uuid.uuid4().hex
if (tndx == 0 and not self.__available) or tndx > 0 :
#if current instance is unavailable, or the job needs to be split - save input to file
try:
content_list = []
if isinstance(input, List):
batch = []
for cnt in range(batch_size):
if (tndx*batch_size+cnt < len(input)):
batch[cnt] = input[tndx*batch_size+cnt]

ndx=0
for ndx, content in enumerate(content_list):
task_content_path = f"{self.cache_root}/tasks/{taskid}_{ndx}.bin"
os.makedirs(os.path.dirname(task_content_path), exist_ok=True)
#transform input to bytes if necessary
bytes_data = None
if isinstance(content, str):
bytes_data=content.encode('utf-8')
elif isinstance(content, Image.Image):
bytes_data=content.tobytes()
if isinstance(batch[0], str):
content_list.append(json.dumps(batch))
else:
content_list.extend(batch)
else:
raise Exception("Unsupported content type")

with open(task_content_path, 'wb') as file:
file.write(bytes_data)
print(f"Data successfully written to {task_content_path}")
except Exception as exc:
self.P("Error saving job input: {}".format(exc))
result = "Exception saving job input"
content_list.append(input)

for cndx, content in enumerate(content_list):
task_content_path = f"{self.cache_root}/tasks/{taskid}_{cndx}.bin"
os.makedirs(os.path.dirname(task_content_path), exist_ok=True)
#transform input to bytes if necessary
bytes_data = None
if isinstance(content, str):
bytes_data=content.encode('utf-8')
elif isinstance(content, Image.Image):
bytes_data=content.tobytes()
else:
raise Exception("Unsupported content type")

with open(task_content_path, 'wb') as file:
file.write(bytes_data)
print(f"Data successfully written to {task_content_path}")
except Exception as exc:
self.P("Error saving job input: {}".format(exc))
result = "Exception saving job input"

if result is None:
try:
self.postgres_insert_data(
"tasks",
uuid = taskid,
jobid = jobid,
workername = workername,
status = status
)
result = jobid
self.P(f"Task saved: {taskid}")
except Exception as exc:
self.P("Error saving task: {}".format(exc))
result = "Exception saving processing task to database"
if result is None:
try:
self.postgres_insert_data(
"tasks",
uuid = taskid,
jobid = jobid,
workername = workername,
status = status
)
result = jobid
self.P(f"Task saved: {taskid}")
except Exception as exc:
self.P("Error saving task: {}".format(exc))
result = "Exception saving processing task to database"
else:
result = "Invalid input content"
return self.format_result(result, device)
Expand Down
2 changes: 1 addition & 1 deletion demo-ai-app/manifests/version.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
application:
serving:
version: 1.0.2
version: 1.0.3
monitor:
version: 1.0.1

0 comments on commit 45024da

Please sign in to comment.