Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Refactor async execution methods #373

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.redhat.parodos.workflow.execution.continuation.WorkFlowContinuationServiceImpl;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
Expand Down Expand Up @@ -105,10 +106,13 @@ private void startOrStopWorkFlowCheckerOnSchedule(WorkFlow workFlow,
* active checkers
*/
if (workFlowService.findRunningChecker(mainWorkFlowExecution).isEmpty()) {
workFlowContinuationServiceImpl.continueWorkFlow(projectId, userId, mainWorkFlowName, workContext,
mainWorkFlowExecution.getId(),
Optional.ofNullable(mainWorkFlowExecution.getWorkFlowDefinition().getRollbackWorkFlowDefinition())
.map(WorkFlowDefinition::getName).orElse(null));
workFlowContinuationServiceImpl.continueWorkFlow(ExecutionContext.builder().projectId(projectId)
.userId(userId).workFlowName(mainWorkFlowName).workContext(workContext)
.executionId(mainWorkFlowExecution.getId())
.rollbackWorkFlowName(Optional
.ofNullable(mainWorkFlowExecution.getWorkFlowDefinition().getRollbackWorkFlowDefinition())
.map(WorkFlowDefinition::getName).orElse(null))
.build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package com.redhat.parodos.workflow.execution.continuation;

import java.util.UUID;

import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;

/**
* When the application starts up it will run any workflows in Progress @see
Expand All @@ -30,7 +28,6 @@ public interface WorkFlowContinuationService {

void workFlowRunAfterStartup();

void continueWorkFlow(UUID projectId, UUID userId, String workflowName, WorkContext workContext, UUID executionId,
String rollbackWorkflowName);
void continueWorkFlow(ExecutionContext executionContext);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

import java.util.List;
import java.util.Optional;
import java.util.UUID;

import com.redhat.parodos.workflow.definition.entity.WorkFlowDefinition;
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;
import com.redhat.parodos.workflows.work.WorkStatus;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -65,18 +64,20 @@ public void workFlowRunAfterStartup() {
WorkFlowDefinition workFlowDefinition = workFlowExecution.getWorkFlowDefinition();

// continue with the same execution id
continueWorkFlow(workFlowExecution.getProjectId(), workFlowExecution.getUser().getId(),
workFlowDefinition.getName(), workFlowExecution.getWorkFlowExecutionContext().getWorkContext(),
workFlowExecution.getId(), Optional.ofNullable(workFlowDefinition.getRollbackWorkFlowDefinition())
.map(WorkFlowDefinition::getName).orElse(null));
continueWorkFlow(ExecutionContext.builder().projectId(workFlowExecution.getProjectId())
.userId(workFlowExecution.getUser().getId()).workFlowName(workFlowDefinition.getName())
.workContext(workFlowExecution.getWorkFlowExecutionContext().getWorkContext())
.executionId(workFlowExecution.getId())
.rollbackWorkFlowName(Optional.ofNullable(workFlowDefinition.getRollbackWorkFlowDefinition())
.map(WorkFlowDefinition::getName).orElse(null))
.build());
// TODO: continue 'FAILED' Checkers in this main workflow execution
});
}

@Override
public void continueWorkFlow(UUID projectId, UUID userId, String workflowName, WorkContext workContext,
UUID executionId, String rollbackWorkflowName) {
workFlowExecutor.executeAsync(projectId, userId, workflowName, workContext, executionId, rollbackWorkflowName);
public void continueWorkFlow(ExecutionContext executionContext) {
workFlowExecutor.executeAsync(executionContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@

import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
import lombok.Builder;

import org.springframework.scheduling.annotation.Async;

public interface WorkFlowExecutor {

@Async
void executeAsync(UUID projectId, UUID userId, String workflowName, WorkContext workContext, UUID executionId,
String rollbackWorkflowName);
void executeAsync(ExecutionContext context);

WorkReport execute(UUID projectId, UUID userId, String workflowName, WorkContext workContext, UUID executionId,
String rollbackWorkflowName);
WorkReport execute(ExecutionContext context);

@Builder
record ExecutionContext(UUID projectId, UUID userId, String workFlowName, WorkContext workContext, UUID executionId,
String rollbackWorkFlowName) {
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.redhat.parodos.workflow.execution.service;

import java.util.Optional;
import java.util.UUID;

import com.redhat.parodos.workflow.WorkFlowDelegate;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.utils.WorkContextUtils;
import com.redhat.parodos.workflows.engine.WorkFlowEngineBuilder;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
import com.redhat.parodos.workflows.work.WorkStatus;
import com.redhat.parodos.workflows.workflow.WorkFlow;
Expand All @@ -29,29 +27,28 @@ public WorkFlowExecutorImpl(WorkFlowDelegate workFlowDelegate, WorkFlowRepositor
}

@Override
public void executeAsync(UUID projectId, UUID userId, String workflowName, WorkContext workContext,
UUID executionId, String rollbackWorkflowName) {
execute(projectId, userId, workflowName, workContext, executionId, rollbackWorkflowName);
public void executeAsync(ExecutionContext executionContext) {
execute(executionContext);
}

@Override
public WorkReport execute(UUID projectId, UUID userId, String workflowName, WorkContext workContext,
UUID executionId, String rollbackWorkflowName) {
WorkFlow workFlow = workFlowDelegate.getWorkFlowByName(workflowName);
log.info("execute workFlow {}", workflowName);
WorkContextUtils.updateWorkContextPartially(workContext, projectId, userId, workflowName, executionId);
WorkReport report = WorkFlowEngineBuilder.aNewWorkFlowEngine().build().run(workFlow, workContext);
public WorkReport execute(ExecutionContext context) {
WorkFlow workFlow = workFlowDelegate.getWorkFlowByName(context.workFlowName());
log.info("execute workFlow {}", context.workFlowName());
WorkContextUtils.updateWorkContextPartially(context.workContext(), context.projectId(), context.userId(),
context.workFlowName(), context.executionId());
WorkReport report = WorkFlowEngineBuilder.aNewWorkFlowEngine().build().run(workFlow, context.workContext());
// need to use the status from db to avoid of repetitive execution on rollback
if (workFlowRepository.findById(executionId).map(execution -> execution.getStatus() == WorkStatus.FAILED)
.orElse(false)) {
Optional.ofNullable(workFlowDelegate.getWorkFlowByName(rollbackWorkflowName))
if (workFlowRepository.findById(context.executionId())
.map(execution -> execution.getStatus() == WorkStatus.FAILED).orElse(false)) {
Optional.ofNullable(workFlowDelegate.getWorkFlowByName(context.rollbackWorkFlowName()))
.ifPresentOrElse(rollbackWorkFlow -> {
log.error(
"The Infrastructure workflow failed. Check the logs for errors coming for the Tasks in this workflow. Checking if there is a Rollback");
WorkFlowEngineBuilder.aNewWorkFlowEngine().build().run(rollbackWorkFlow, workContext);
WorkFlowEngineBuilder.aNewWorkFlowEngine().build().run(rollbackWorkFlow, context.workContext());
}, () -> log.error(
"A rollback workflow could not be found for failed workflow: {} in execution: {}",
workflowName, executionId));
context.workFlowName(), context.executionId()));
}
return report;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ public WorkReport execute(WorkFlowRequestDTO workFlowRequestDTO) {
WorkFlowExecution workFlowExecution = saveWorkFlow(projectId, user.getId(),
workFlowDefinitionRepository.findFirstByName(workflowName), WorkStatus.IN_PROGRESS, null, arguments);
WorkContextUtils.setMainExecutionId(workContext, workFlowExecution.getId());
workFlowExecutor.executeAsync(projectId, user.getId(), workflowName, workContext, workFlowExecution.getId(),
workFlowDefinitionResponseDTO.getRollbackWorkflow());
workFlowExecutor
.executeAsync(WorkFlowExecutor.ExecutionContext.builder().projectId(projectId).userId(user.getId())
.workFlowName(workflowName).workContext(workContext).executionId(workFlowExecution.getId())
.rollbackWorkFlowName(workFlowDefinitionResponseDTO.getRollbackWorkflow()).build());
return new DefaultWorkReport(WorkStatus.IN_PROGRESS, workContext, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.DefaultWorkReport;
import com.redhat.parodos.workflows.work.WorkContext;
Expand Down Expand Up @@ -125,7 +126,7 @@ public void ExecuteAroundAdviceWithValidDataTest() {
when(workFlowRepository.findFirstByWorkFlowDefinitionIdAndMainWorkFlowExecution(any(), any()))
.thenReturn(workFlowExecution);
when(workFlowRepository.findById(any())).thenReturn(Optional.of(workFlowExecution));
doNothing().when(workFlowContinuationService).continueWorkFlow(any(), any(), any(), any(), any(), any());
doNothing().when(workFlowContinuationService).continueWorkFlow(any(ExecutionContext.class));
// when
WorkReport workReport = this.workFlowExecutionAspect.executeAroundAdvice(proceedingJoinPoint, workContext);

Expand Down Expand Up @@ -230,4 +231,4 @@ static WorkFlowDefinition getSampleWorkFlowDefinition(String name) {
return workFlowDefinition;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.redhat.parodos.workflow.execution.entity.WorkFlowExecution;
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.scheduler.WorkFlowSchedulerServiceImpl;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;
import com.redhat.parodos.workflow.execution.service.WorkFlowServiceImpl;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
Expand All @@ -25,7 +26,6 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
Expand All @@ -37,10 +37,6 @@
@ExtendWith(SpringExtension.class)
public class WorkFlowExecutionInterceptorTest {

private static final UUID TEST_USER_ID = UUID.randomUUID();

private static final String TEST_USERNAME = "test-username";

private static final String TEST_WORKFLOW_NAME = "test-workflow-name";

private WorkFlowExecutionInterceptor interceptor;
Expand Down Expand Up @@ -70,7 +66,7 @@ public class WorkFlowExecutionInterceptorTest {

@BeforeEach
public void setUp() {
User user = createUser(TEST_USER_ID, TEST_USERNAME);
User user = createUser();
expectedWorkFlowExecution = new WorkFlowExecution();
expectedWorkFlowExecution.setProjectId(UUID.randomUUID());
expectedWorkFlowExecution.setUser(user);
Expand Down Expand Up @@ -145,14 +141,13 @@ public void testHandleCompletePostWorkFlowExecution() {
verify(workFlowService, times(0)).saveWorkFlow(any(UUID.class), any(UUID.class), any(),
eq(WorkStatus.IN_PROGRESS), any(), anyString());
verify(workFlowSchedulerService, times(1)).stop(any(), any(), any(WorkFlow.class));
verify(workFlowContinuationServiceImpl, times(1)).continueWorkFlow(any(UUID.class), any(UUID.class),
anyString(), any(WorkContext.class), any(UUID.class), nullable(String.class));
verify(workFlowContinuationServiceImpl, times(1)).continueWorkFlow(any(ExecutionContext.class));
assertEquals(result.getStatus(), report.getStatus());
}

private User createUser(UUID userId, String username) {
User user = User.builder().username(username).build();
user.setId(userId);
private User createUser() {
User user = User.builder().username(UUID.randomUUID().toString()).build();
user.setId(UUID.randomUUID());
return user;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
import com.redhat.parodos.workflow.execution.repository.WorkFlowRepository;
import com.redhat.parodos.workflow.execution.repository.WorkFlowTaskRepository;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor;
import com.redhat.parodos.workflow.execution.service.WorkFlowExecutor.ExecutionContext;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -72,7 +73,7 @@ void workFlowSkipCompletedJobs() {

// then
verify(this.workFlowRepository, times(1)).findByStatusInAndIsMain(workFlowStatuses);
verify(this.workFlowExecutor, times(0)).executeAsync(any(), any(), any(), any(), any(), any());
verify(this.workFlowExecutor, times(0)).executeAsync(any(ExecutionContext.class));
}

@Test
Expand All @@ -86,8 +87,8 @@ void workFlowCompleteInProgress() {

// then
verify(this.workFlowRepository, times(1)).findByStatusInAndIsMain(workFlowStatuses);
verify(this.workFlowExecutor, times(1)).executeAsync(eq(workFlowExecution.getProjectId()),
eq(workFlowExecution.getUser().getId()), eq(TEST_WORKFLOW), any(), any(), nullable(String.class));

verifyAsyncExecution(workFlowExecution);
}

@Test
Expand All @@ -102,8 +103,8 @@ void workFlowCompletePending() {

// then
verify(this.workFlowRepository, times(1)).findByStatusInAndIsMain(workFlowStatuses);
verify(this.workFlowExecutor, times(1)).executeAsync(eq(workFlowExecution.getProjectId()),
eq(workFlowExecution.getUser().getId()), eq(TEST_WORKFLOW), any(), any(), nullable(String.class));

verifyAsyncExecution(workFlowExecution);
}

@Test
Expand All @@ -127,8 +128,7 @@ void workFlowCompleteWithTaskExecutions() {

// then
verify(this.workFlowRepository, times(1)).findByStatusInAndIsMain(workFlowStatuses);
verify(this.workFlowExecutor, times(1)).executeAsync(eq(workFlowExecution.getProjectId()),
eq(workFlowExecution.getUser().getId()), eq(TEST_WORKFLOW), any(), any(), nullable(String.class));
verifyAsyncExecution(workFlowExecution);
}

@Test
Expand All @@ -146,8 +146,8 @@ void workFlowCompleteWithInvalidJson() {

when(this.workFlowTaskRepository.findByWorkFlowExecutionId(wfExecution.getId()))
.thenReturn(List.of(workFlowTaskExecution));
doThrow(new RuntimeException("JsonParseException")).when(workFlowExecutor).executeAsync(any(), any(), any(),
any(), any(), any());
doThrow(new RuntimeException("JsonParseException")).when(workFlowExecutor)
.executeAsync(any(ExecutionContext.class));

// when
Exception exception = assertThrows(RuntimeException.class, () -> this.service.workFlowRunAfterStartup());
Expand All @@ -157,7 +157,7 @@ void workFlowCompleteWithInvalidJson() {
assertTrue(exception.getMessage().contains("JsonParseException"));

verify(this.workFlowRepository, times(1)).findByStatusInAndIsMain(workFlowStatuses);
verify(this.workFlowExecutor, times(1)).executeAsync(any(), any(), any(), any(), any(), any());
verify(this.workFlowExecutor, times(1)).executeAsync(any(ExecutionContext.class));

}

Expand Down Expand Up @@ -187,4 +187,12 @@ private WorkFlowDefinition sampleWorkFlowDefinition() {
return workFlowDefinition;
}

private void verifyAsyncExecution(WorkFlowExecution workFlowExecution) {
var argument = ArgumentCaptor.forClass(ExecutionContext.class);
verify(this.workFlowExecutor, times(1)).executeAsync(argument.capture());
assertThat(argument.getValue().projectId()).isEqualTo(workFlowExecution.getProjectId());
assertThat(argument.getValue().userId()).isEqualTo(workFlowExecution.getUser().getId());
assertThat(argument.getValue().workFlowName()).isEqualTo(TEST_WORKFLOW);
}

}
Loading