From 45024da4539afef0a8e860668d4738a0bc5724ae Mon Sep 17 00:00:00 2001 From: ispirtraian Date: Mon, 22 Apr 2024 13:23:01 +0000 Subject: [PATCH] fix: implement batch --- app/src/serving/serving_app.py | 115 +++++++++++++++++------------ demo-ai-app/manifests/version.yaml | 2 +- 2 files changed, 67 insertions(+), 50 deletions(-) diff --git a/app/src/serving/serving_app.py b/app/src/serving/serving_app.py index f74f66e..8ac4ec3 100644 --- a/app/src/serving/serving_app.py +++ b/app/src/serving/serving_app.py @@ -270,17 +270,29 @@ def _predict_job (self, model_type: str, input, params:dict = None): no_runs = 1 if(params and "no_runs" in params and params['no_runs'] is not None): - no_runs= self._get_device(params['no_runs']) + no_runs= params['no_runs'] + + batch_size = 0 + if(params and "batch_size" in params and params['batch_size'] is not None): + batch_size = params['batch_size'] if input: workername = None status = STATUS_CREATED + no_tasks = 1 + if isinstance(input, List) and batch_size > 0 and len(input) > batch_size : + no_tasks = len(input)//batch_size + if len(input) % batch_size > 0 : + no_tasks= no_tasks + 1 + if self.__available : #check if current instance is available for processing workername = self.host status = STATUS_ASSIGNED - self.__input = input - + if no_tasks > 1 : + self.__input = input[:batch_size] + else: + self.__input = input # save job to database jobid = uuid.uuid4().hex try: @@ -300,55 +312,60 @@ def _predict_job (self, model_type: str, input, params:dict = None): # save tasks to database if result is None: - taskid = uuid.uuid4().hex - - if not self.__available : - #if current insance is unavailable, save input to file - - try: - content_list = [] - if isinstance(input, List): - if isinstance(input[0], str): - content_list.append(json.dumps(input)) - else: - content_list.extend(input) - else: - content_list.append(input) + self.P (f"Saving {no_tasks} ...") + + for tndx in range(no_tasks): + taskid = uuid.uuid4().hex + if (tndx == 0 and not self.__available) or tndx > 0 : + #if current insance is unavailable, or need to split job - save input to file + try: + content_list = [] + if isinstance(input, List): + batch = [] + for cnt in range(batch_size): + if (tndx*batch_size+cnt < len(input)): + batch[cnt] = input[tndx*batch_size+cnt] - ndx=0 - for ndx, content in enumerate(content_list): - task_content_path = f"{self.cache_root}/tasks/{taskid}_{ndx}.bin" - os.makedirs(os.path.dirname(task_content_path), exist_ok=True) - #transform input to bytes if necessary - bytes_data = None - if isinstance(content, str): - bytes_data=content.encode('utf-8') - elif isinstance(content, Image.Image): - bytes_data=content.tobytes() + if isinstance(batch[0], str): + content_list.append(json.dumps(batch)) + else: + content_list.extend(batch) else: - raise Exception("Unsupported content type") - - with open(task_content_path, 'wb') as file: - file.write(bytes_data) - print(f"Data successfully written to {task_content_path}") - except Exception as exc: - self.P("Error saving job input: {}".format(exc)) - result = "Exception saving job input" + content_list.append(input) + + for cndx, content in enumerate(content_list): + task_content_path = f"{self.cache_root}/tasks/{taskid}_{cndx}.bin" + os.makedirs(os.path.dirname(task_content_path), exist_ok=True) + #transform input to bytes if necessary + bytes_data = None + if isinstance(content, str): + bytes_data=content.encode('utf-8') + elif isinstance(content, Image.Image): + bytes_data=content.tobytes() + else: + raise Exception("Unsupported content type") + + with open(task_content_path, 'wb') as file: + file.write(bytes_data) + print(f"Data successfully written to {task_content_path}") + except Exception as exc: + self.P("Error saving job input: {}".format(exc)) + result = "Exception saving job input" - if result is None: - try: - self.postgres_insert_data( - "tasks", - uuid = taskid, - jobid = jobid, - workername = workername, - status = status - ) - result = jobid - self.P(f"Task saved: {taskid}") - except Exception as exc: - self.P("Error saving task: {}".format(exc)) - result = "Exception saving processing task to database" + if result is None: + try: + self.postgres_insert_data( + "tasks", + uuid = taskid, + jobid = jobid, + workername = workername, + status = status + ) + result = jobid + self.P(f"Task saved: {taskid}") + except Exception as exc: + self.P("Error saving task: {}".format(exc)) + result = "Exception saving processing task to database" else: result = "Invalid input content" return self.format_result(result, device) diff --git a/demo-ai-app/manifests/version.yaml b/demo-ai-app/manifests/version.yaml index d6ca3d9..2ffc9d4 100644 --- a/demo-ai-app/manifests/version.yaml +++ b/demo-ai-app/manifests/version.yaml @@ -1,5 +1,5 @@ application: serving: - version: 1.0.2 + version: 1.0.3 monitor: version: 1.0.1 \ No newline at end of file