Skip to content

Commit

Permalink
fix: support pagination in Operate ProcessDefinitionImporter (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
chillleader authored Feb 27, 2023
1 parent 8e498e4 commit 459bd3d
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.operate.dto.ProcessDefinition;
import io.camunda.operate.dto.SearchResult;
import io.camunda.operate.exception.OperateException;
import io.camunda.operate.search.SearchQuery;
import io.camunda.operate.search.Sort;
import io.camunda.operate.search.SortOrder;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
@ConditionalOnProperty(name = "camunda.connector.polling.enabled")
Expand All @@ -40,6 +40,8 @@ public class ProcessDefinitionImporter {
private final CamundaOperateClient camundaOperateClient;
private final InboundConnectorManager inboundManager;

private List<Object> paginationIndex;

@Autowired
public ProcessDefinitionImporter(
CamundaOperateClient camundaOperateClient,
Expand All @@ -49,14 +51,31 @@ public ProcessDefinitionImporter(
}

@Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}")
public void scheduleImport() throws OperateException {
public synchronized void scheduleImport() throws OperateException {
LOG.trace("Query process deployments...");

SearchQuery processDefinitionQuery =
new SearchQuery.Builder().withSort(new Sort("version", SortOrder.ASC)).build();
SearchResult<ProcessDefinition> result;
do {
LOG.trace("Running paginated query");
// automatically sorted by process definition key, i.e. in chronological order of deployment
SearchQuery processDefinitionQuery = new SearchQuery.Builder()
.searchAfter(paginationIndex)
.size(20)
.build();

result = camundaOperateClient.search(processDefinitionQuery, ProcessDefinition.class);
List<Object> newPaginationIdx = result.getSortValues();

if (!CollectionUtils.isEmpty(newPaginationIdx)) {
paginationIndex = newPaginationIdx;
}
handleImportedDefinitions(result.getItems());

} while (result.getItems().size() > 0);
}

List<ProcessDefinition> processDefinitions =
camundaOperateClient.searchProcessDefinitions(processDefinitionQuery);
private void handleImportedDefinitions(
List<ProcessDefinition> processDefinitions) throws OperateException {

if (processDefinitions==null) {
LOG.trace("... returned no process definitions.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,25 @@ public AuthInterface getAuthentication(String operateUrl) {
.keycloakUrl(operateKeycloakUrl)
.keycloakRealm(operateKeycloakRealm);
}
throw new IllegalArgumentException(
"Failed to authenticate with Camunda Operate using Keycloak: "
+ "please configure client ID and client secret values.");
} else {
if (operateClientId != null) {
LOG.debug("Authenticating with Camunda Operate using client id and secret");
return new SaasAuthentication(getAuthUrl(), getAudience(), operateClientId, operateClientSecret);
} else if (clientId != null) {
LOG.debug("Authenticating with Camunda Operate using client id and secret");
return new SaasAuthentication(getAuthUrl(), getAudience(), clientId, clientSecret);
} else if (operateUsername != null) {
} else if (operateUsername != null && operatePassword != null) {
LOG.debug("Authenticating with Camunda Operate using username and password");
return new SimpleAuthentication("demo", "demo", operateUrl);
return new SimpleAuthentication(operateUsername, operatePassword, operateUrl);
}
}
throw new IllegalArgumentException(
"In order to connect to Camunda Operate you need to configure authentication properly.");
"In order to connect to Camunda Operate you need to configure authentication properly. "
+ "You can use password-based authentication, or authenticate with Keycloak. "
+ "Please configure either one of the methods.");
}

public String getAuthUrl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public int getPhase() {
return PHASE;
}

@Override
public <T> SearchResult<T> search(SearchQuery query, Class<T> resultType)
throws OperateException {
return get().search(query, resultType);
}

@Override
public ProcessDefinition getProcessDefinition(Long key) throws OperateException {
return get().getProcessDefinition(key);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.camunda.connector.runtime.inbound.importer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.operate.dto.ProcessDefinition;
import io.camunda.operate.dto.SearchResult;
import io.camunda.operate.exception.OperateException;
import io.camunda.operate.search.SearchQuery;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class ProcessDefinitionImporterTest {

private InboundConnectorManager manager;
private CamundaOperateClient operate;

@BeforeEach
public void initMocks() {
manager = mock(InboundConnectorManager.class);
operate = mock(CamundaOperateClient.class);
}

@Test
public void shouldRequestProcessDefinition_Pagination() throws OperateException {
// given
List<ProcessDefinition> firstPage = List.of(
new ProcessDefinition(), new ProcessDefinition(), new ProcessDefinition());
List<ProcessDefinition> secondPage = List.of();

List<Object> paginationIdx = List.of(new Object());

var firstOperateResponse = new SearchResult<ProcessDefinition>();
firstOperateResponse.setItems(firstPage);
firstOperateResponse.setSortValues(paginationIdx);

var secondOperateResponse = new SearchResult<ProcessDefinition>();
secondOperateResponse.setItems(secondPage);

when(operate.search(any(), eq(ProcessDefinition.class)))
.thenReturn(firstOperateResponse)
.thenReturn(secondOperateResponse);

var importer = new ProcessDefinitionImporter(operate, manager);

// when
importer.scheduleImport();

// then
var queryCaptor = ArgumentCaptor.forClass(SearchQuery.class);

verify(operate, times(2)).search(queryCaptor.capture(), eq(ProcessDefinition.class));
verifyNoMoreInteractions(operate);

var queries = queryCaptor.getAllValues();

var firstQuery = queries.get(0);
assertNull(firstQuery.getSearchAfter());
assertEquals(20, firstQuery.getSize());

var secondQuery = queries.get(1);
assertSame(paginationIdx, secondQuery.getSearchAfter());
assertEquals(20, secondQuery.getSize());

verify(manager, times(1)).registerProcessDefinitions(firstPage);
}
}

0 comments on commit 459bd3d

Please sign in to comment.