Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Refactor workflow aspect
Browse files Browse the repository at this point in the history
WorkFlowExecutionAspect was refactored into several classess to allow
separation between pre-execution and post-execution.

Signed-off-by: Moti Asayag <masayag@redhat.com>
  • Loading branch information
masayag committed Apr 2, 2023
1 parent b6a8c9b commit 149f889
Show file tree
Hide file tree
Showing 13 changed files with 787 additions and 196 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.redhat.parodos.workflow.execution.aspect;

import com.redhat.parodos.workflow.context.WorkContextDelegate;
import com.redhat.parodos.workflow.definition.entity.WorkFlowCheckerMappingDefinition;
import com.redhat.parodos.workflow.definition.entity.WorkFlowDefinition;
import com.redhat.parodos.workflow.execution.continuation.WorkFlowContinuationServiceImpl;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
import com.redhat.parodos.workflows.work.WorkStatus;
import com.redhat.parodos.workflows.workflow.WorkFlow;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CheckerWorkFlowPostExecution implements WorkFlowPostExecutionHandler {

private final WorkFlowDefinition workFlowDefinition;

private final WorkContext workContext;

private final WorkFlowExecution workFlowExecution;

private final WorkFlowExecution masterWorkFlowExecution;

private final WorkFlowServiceImpl workFlowService;

private final WorkFlowSchedulerServiceImpl workFlowSchedulerService;

private final WorkFlowContinuationServiceImpl workFlowContinuationServiceImpl;

private final WorkFlow workFlow;

private final WorkStatus workStatus;

public CheckerWorkFlowPostExecution(WorkFlowDefinition workFlowDefinition, WorkContext workContext,
WorkFlowServiceImpl workFlowService, WorkFlowSchedulerServiceImpl workFlowSchedulerService,
WorkFlowContinuationServiceImpl workFlowContinuationServiceImpl, WorkFlowExecution workFlowExecution,
WorkFlowExecution masterWorkFlowExecution, WorkFlow workFlow, WorkStatus workStatus) {
this.workFlowDefinition = workFlowDefinition;
this.workContext = workContext;
this.workFlowService = workFlowService;
this.workFlowExecution = workFlowExecution;
this.masterWorkFlowExecution = masterWorkFlowExecution;
this.workFlowSchedulerService = workFlowSchedulerService;
this.workFlowContinuationServiceImpl = workFlowContinuationServiceImpl;
this.workFlow = workFlow;
this.workStatus = workStatus;
}

@Override
public WorkReport handlePostWorkFlowExecution() {
workFlowService.updateWorkFlow(workFlowExecution);
/*
* if this workflow is a checker, schedule workflow checker for dynamic run on
* cron expression or stop if done
*/
startOrStopWorkFlowCheckerOnSchedule(workFlow, workFlowDefinition.getCheckerWorkFlowDefinition(), workStatus,
workContext, workFlowExecution.getProjectId().toString(), masterWorkFlowExecution);
return null;
}

private void startOrStopWorkFlowCheckerOnSchedule(WorkFlow workFlow,
WorkFlowCheckerMappingDefinition workFlowCheckerMappingDefinition, WorkStatus workStatus,
WorkContext workContext, String projectId, WorkFlowExecution masterWorkFlowExecution) {
if (workStatus != WorkStatus.COMPLETED) {
log.info("Schedule workflow checker: {} to run per cron expression: {}", workFlow.getName(),
workFlowCheckerMappingDefinition.getCronExpression());
workFlowSchedulerService.schedule(workFlow, workContext,
workFlowCheckerMappingDefinition.getCronExpression());
return;
}

log.info("Stop workflow checker: {} schedule", workFlow.getName());
workFlowSchedulerService.stop(workFlow);

String masterWorkFlowName = WorkContextDelegate.read(workContext,
WorkContextDelegate.ProcessType.WORKFLOW_DEFINITION, WorkContextDelegate.Resource.NAME).toString();
/*
* if this workflow is checker, and it's successful, call continuation service to
* restart master workflow execution with same execution ID
*/
workFlowContinuationServiceImpl.continueWorkFlow(projectId, masterWorkFlowName, workContext,
masterWorkFlowExecution.getId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.redhat.parodos.workflow.execution.aspect;

import java.util.Optional;

import static com.redhat.parodos.workflow.execution.aspect.WorkFlowExecutionFactory.getMasterWorkFlowExecutionId;

import com.redhat.parodos.workflow.definition.entity.WorkFlowDefinition;
import com.redhat.parodos.workflow.exceptions.WorkflowExecutionNotFoundException;
import com.redhat.parodos.workflow.execution.continuation.WorkFlowContinuationServiceImpl;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;

public class ContinuedWorkFlowExecutionInterceptor extends WorkFlowExecutionInterceptor {

private WorkFlowExecution masterWorkFlowExecution;

public ContinuedWorkFlowExecutionInterceptor(WorkFlowDefinition workFlowDefinition, WorkContext workContext,
WorkFlowServiceImpl workFlowService, WorkFlowRepository workFlowRepository,
WorkFlowSchedulerServiceImpl workFlowSchedulerService,
WorkFlowContinuationServiceImpl workFlowContinuationServiceImpl) {
super(workFlowDefinition, workContext, workFlowService, workFlowRepository, workFlowSchedulerService,
workFlowContinuationServiceImpl);
}

@Override
protected WorkFlowExecution doPreWorkFlowExecution() {
this.masterWorkFlowExecution = workFlowRepository.findById(getMasterWorkFlowExecutionId(workContext))
.orElseThrow(() -> new WorkflowExecutionNotFoundException(
"masterWorkFlow not found for sub-workflow: " + workFlowDefinition.getName()));

// get the workflow execution if this is triggered by continuation service
return Optional
.ofNullable(workFlowRepository.findFirstByWorkFlowDefinitionIdAndMasterWorkFlowExecution(
workFlowDefinition.getId(), masterWorkFlowExecution))
.orElseGet(() -> this.saveWorkFlow(masterWorkFlowExecution));
}

protected WorkFlowExecution getMasterWorkFlowExecution() {
return masterWorkFlowExecution;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.redhat.parodos.workflow.execution.aspect;

import com.redhat.parodos.workflow.context.WorkContextDelegate;
import com.redhat.parodos.workflow.definition.entity.WorkFlowDefinition;
import com.redhat.parodos.workflow.execution.continuation.WorkFlowContinuationServiceImpl;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;

public class FirstTimeWorkFlowExecutionInterceptor extends WorkFlowExecutionInterceptor {

public FirstTimeWorkFlowExecutionInterceptor(WorkFlowDefinition workFlowDefinition, WorkContext workContext,
WorkFlowServiceImpl workFlowService, WorkFlowRepository workFlowRepository,
WorkFlowSchedulerServiceImpl workFlowSchedulerService,
WorkFlowContinuationServiceImpl workFlowContinuationServiceImpl) {
super(workFlowDefinition, workContext, workFlowService, workFlowRepository, workFlowSchedulerService,
workFlowContinuationServiceImpl);
}

@Override
protected WorkFlowExecution doPreWorkFlowExecution() {
/*
* if this is the first time execution for master workflow, persist it and write
* its execution id to workContext
*/
WorkFlowExecution workFlowExecution = saveWorkFlow(null);
WorkContextDelegate.write(workContext, WorkContextDelegate.ProcessType.WORKFLOW_EXECUTION,
WorkContextDelegate.Resource.ID, workFlowExecution.getId());
return workFlowExecution;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.redhat.parodos.workflow.execution.aspect;

import static com.redhat.parodos.workflow.execution.aspect.WorkFlowExecutionFactory.getMasterWorkFlowExecutionId;

import com.redhat.parodos.workflow.definition.entity.WorkFlowDefinition;
import com.redhat.parodos.workflow.exceptions.WorkflowExecutionNotFoundException;
import com.redhat.parodos.workflow.execution.continuation.WorkFlowContinuationServiceImpl;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;

public class MasterWorkFlowExecutionInterceptor extends WorkFlowExecutionInterceptor {

private WorkFlowExecution masterWorkFlowExecution;

public MasterWorkFlowExecutionInterceptor(WorkFlowDefinition workFlowDefinition, WorkContext workContext,
WorkFlowServiceImpl workFlowService, WorkFlowRepository workFlowRepository,
WorkFlowSchedulerServiceImpl workFlowSchedulerService,
WorkFlowContinuationServiceImpl workFlowContinuationServiceImpl) {
super(workFlowDefinition, workContext, workFlowService, workFlowRepository, workFlowSchedulerService,
workFlowContinuationServiceImpl);
}

@Override
protected WorkFlowExecution doPreWorkFlowExecution() {
masterWorkFlowExecution = workFlowRepository.findById(getMasterWorkFlowExecutionId(workContext))
.orElseThrow(() -> new WorkflowExecutionNotFoundException(
"masterWorkFlow not found for sub-workflow: " + workFlowDefinition.getName()));
return masterWorkFlowExecution;
}

@Override
protected WorkFlowExecution getMasterWorkFlowExecution() {
return masterWorkFlowExecution;
}

}
Loading

0 comments on commit 149f889

Please sign in to comment.