diff --git a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporter.java b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporter.java index 99b2deefd0..e1eb48a95e 100644 --- a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporter.java +++ b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporter.java @@ -19,10 +19,9 @@ import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager; import io.camunda.operate.CamundaOperateClient; import io.camunda.operate.dto.ProcessDefinition; +import io.camunda.operate.dto.SearchResult; import io.camunda.operate.exception.OperateException; import io.camunda.operate.search.SearchQuery; -import io.camunda.operate.search.Sort; -import io.camunda.operate.search.SortOrder; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; @Component @ConditionalOnProperty(name = "camunda.connector.polling.enabled") @@ -40,6 +40,8 @@ public class ProcessDefinitionImporter { private final CamundaOperateClient camundaOperateClient; private final InboundConnectorManager inboundManager; + private List paginationIndex; + @Autowired public ProcessDefinitionImporter( CamundaOperateClient camundaOperateClient, @@ -49,14 +51,31 @@ public ProcessDefinitionImporter( } @Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}") - public void scheduleImport() throws OperateException { + public synchronized void scheduleImport() throws OperateException { LOG.trace("Query process deployments..."); - SearchQuery processDefinitionQuery = - new SearchQuery.Builder().withSort(new Sort("version", SortOrder.ASC)).build(); + SearchResult result; + do { + LOG.trace("Running paginated query"); + // automatically sorted by process definition key, i.e. in chronological order of deployment + SearchQuery processDefinitionQuery = new SearchQuery.Builder() + .searchAfter(paginationIndex) + .size(20) + .build(); + + result = camundaOperateClient.search(processDefinitionQuery, ProcessDefinition.class); + List newPaginationIdx = result.getSortValues(); + + if (!CollectionUtils.isEmpty(newPaginationIdx)) { + paginationIndex = newPaginationIdx; + } + handleImportedDefinitions(result.getItems()); + + } while (result.getItems().size() > 0); + } - List processDefinitions = - camundaOperateClient.searchProcessDefinitions(processDefinitionQuery); + private void handleImportedDefinitions( + List processDefinitions) throws OperateException { if (processDefinitions==null) { LOG.trace("... returned no process definitions."); diff --git a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientFactory.java b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientFactory.java index 7668c81c8d..72e2556817 100644 --- a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientFactory.java +++ b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientFactory.java @@ -116,6 +116,9 @@ public AuthInterface getAuthentication(String operateUrl) { .keycloakUrl(operateKeycloakUrl) .keycloakRealm(operateKeycloakRealm); } + throw new IllegalArgumentException( + "Failed to authenticate with Camunda Operate using Keycloak: " + + "please configure client ID and client secret values."); } else { if (operateClientId != null) { LOG.debug("Authenticating with Camunda Operate using client id and secret"); @@ -123,13 +126,15 @@ public AuthInterface getAuthentication(String operateUrl) { } else if (clientId != null) { LOG.debug("Authenticating with Camunda Operate using client id and secret"); return new SaasAuthentication(getAuthUrl(), getAudience(), clientId, clientSecret); - } else if (operateUsername != null) { + } else if (operateUsername != null && operatePassword != null) { LOG.debug("Authenticating with Camunda Operate using username and password"); - return new SimpleAuthentication("demo", "demo", operateUrl); + return new SimpleAuthentication(operateUsername, operatePassword, operateUrl); } } throw new IllegalArgumentException( - "In order to connect to Camunda Operate you need to configure authentication properly."); + "In order to connect to Camunda Operate you need to configure authentication properly. " + + "You can use password-based authentication, or authenticate with Keycloak. " + + "Please configure either one of the methods."); } public String getAuthUrl() { diff --git a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientLifecycle.java b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientLifecycle.java index 8a908728b6..6931b8b070 100644 --- a/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientLifecycle.java +++ b/connector-runtime/src/main/java/io/camunda/connector/runtime/inbound/operate/OperateClientLifecycle.java @@ -106,6 +106,12 @@ public int getPhase() { return PHASE; } + @Override + public SearchResult search(SearchQuery query, Class resultType) + throws OperateException { + return get().search(query, resultType); + } + @Override public ProcessDefinition getProcessDefinition(Long key) throws OperateException { return get().getProcessDefinition(key); diff --git a/connector-runtime/src/test/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporterTest.java b/connector-runtime/src/test/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporterTest.java new file mode 100644 index 0000000000..1775fede83 --- /dev/null +++ b/connector-runtime/src/test/java/io/camunda/connector/runtime/inbound/importer/ProcessDefinitionImporterTest.java @@ -0,0 +1,82 @@ +package io.camunda.connector.runtime.inbound.importer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.camunda.connector.runtime.inbound.lifecycle.InboundConnectorManager; +import io.camunda.operate.CamundaOperateClient; +import io.camunda.operate.dto.ProcessDefinition; +import io.camunda.operate.dto.SearchResult; +import io.camunda.operate.exception.OperateException; +import io.camunda.operate.search.SearchQuery; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class ProcessDefinitionImporterTest { + + private InboundConnectorManager manager; + private CamundaOperateClient operate; + + @BeforeEach + public void initMocks() { + manager = mock(InboundConnectorManager.class); + operate = mock(CamundaOperateClient.class); + } + + @Test + public void shouldRequestProcessDefinition_Pagination() throws OperateException { + // given + List firstPage = List.of( + new ProcessDefinition(), new ProcessDefinition(), new ProcessDefinition()); + List secondPage = List.of(); + + List paginationIdx = List.of(new Object()); + + var firstOperateResponse = new SearchResult(); + firstOperateResponse.setItems(firstPage); + firstOperateResponse.setSortValues(paginationIdx); + + var secondOperateResponse = new SearchResult(); + secondOperateResponse.setItems(secondPage); + + when(operate.search(any(), eq(ProcessDefinition.class))) + .thenReturn(firstOperateResponse) + .thenReturn(secondOperateResponse); + + var importer = new ProcessDefinitionImporter(operate, manager); + + // when + importer.scheduleImport(); + + // then + var queryCaptor = ArgumentCaptor.forClass(SearchQuery.class); + + verify(operate, times(2)).search(queryCaptor.capture(), eq(ProcessDefinition.class)); + verifyNoMoreInteractions(operate); + + var queries = queryCaptor.getAllValues(); + + var firstQuery = queries.get(0); + assertNull(firstQuery.getSearchAfter()); + assertEquals(20, firstQuery.getSize()); + + var secondQuery = queries.get(1); + assertSame(paginationIdx, secondQuery.getSearchAfter()); + assertEquals(20, secondQuery.getSize()); + + verify(manager, times(1)).registerProcessDefinitions(firstPage); + } +}