Skip to content

Commit

Permalink
feat(inbound): support intermediate events (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
chillleader committed Jul 26, 2023
1 parent 798d635 commit 937c4be
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package io.camunda.connector.api.inbound;

import java.util.Map;

/**
* The context object provided to an inbound connector function. The context allows to fetch
* information injected by the environment runtime.
Expand All @@ -35,4 +37,14 @@ public interface InboundConnectorContext {
* @param input - the object to validate
*/
void validate(Object input);

/**
* Correlates the inbound event to the matching process definition
*
* @param correlationPoint - information about the target process
* @param variables - variables to be passed to the process
* @return
*/
InboundConnectorResult correlate(
ProcessCorrelationPoint correlationPoint, Map<String, Object> variables);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.api.inbound;

import java.util.Map;
import java.util.Objects;

public class InboundConnectorProperties {

public static String TYPE_WEBHOOK = "webhook";

private final String type;
private final Map<String, String> properties;

private final ProcessCorrelationPoint correlationPoint;

private final String bpmnProcessId;
private final int version;
private final long processDefinitionKey;

public InboundConnectorProperties(
ProcessCorrelationPoint correlationPoint,
Map<String, String> properties,
String bpmnProcessId,
int version,
long processDefinitionKey) {
this.type = properties.get("inbound.type");
this.properties = properties;
this.correlationPoint = correlationPoint;
this.bpmnProcessId = bpmnProcessId;
this.version = version;
this.processDefinitionKey = processDefinitionKey;
}

public String getType() {
return type;
}

public Map<String, String> getProperties() {
return properties;
}

public ProcessCorrelationPoint getCorrelationPoint() {
return correlationPoint;
}

public String getBpmnProcessId() {
return bpmnProcessId;
}

public int getVersion() {
return version;
}

public long getProcessDefinitionKey() {
return processDefinitionKey;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InboundConnectorProperties that = (InboundConnectorProperties) o;
return version == that.version
&& processDefinitionKey == that.processDefinitionKey
&& Objects.equals(type, that.type)
&& Objects.equals(properties, that.properties)
&& Objects.equals(correlationPoint, that.correlationPoint)
&& Objects.equals(bpmnProcessId, that.bpmnProcessId);
}

@Override
public int hashCode() {
return Objects.hash(
type, properties, correlationPoint, bpmnProcessId, version, processDefinitionKey);
}

@Override
public String toString() {
return "InboundConnectorProperties{"
+ "type='"
+ type
+ '\''
+ ", properties="
+ properties
+ ", correlationPoint="
+ correlationPoint
+ ", bpmnProcessId='"
+ bpmnProcessId
+ '\''
+ ", version="
+ version
+ ", processDefinitionKey="
+ processDefinitionKey
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.api.inbound;

import java.util.Objects;

/**
* Contains general information about the inbound correlation results.
*
* <p>This information is specific to the process correlation point type, e.g. subscription key &
* message name in case of an IntermediateEvent target, or process definition key in case of a
* StartEvent target.
*/
public abstract class InboundConnectorResult {

protected String type;
protected String id;
protected Object responseData;

protected InboundConnectorResult(String type, String id, Object responseData) {
this.type = type;
this.id = id;
this.responseData = responseData;
}

/** Type of process correlation point, e.g. StartEvent or Message */
public String getType() {
return type;
}

/** ID of a process correlation point (unique within its type, see {@link #getType()} */
public String getId() {
return id;
}

/** Additional information related to Inbound Connector correlation result */
public Object getResponseData() {
return this.responseData;
}

@Override
public String toString() {
return "InboundConnectorResult{"
+ "type='"
+ type
+ '\''
+ ", id='"
+ id
+ '\''
+ ", responseData="
+ responseData
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InboundConnectorResult result = (InboundConnectorResult) o;
return Objects.equals(type, result.type)
&& Objects.equals(id, result.id)
&& Objects.equals(responseData, result.responseData);
}

@Override
public int hashCode() {
return Objects.hash(type, id, responseData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime.util.inbound;
package io.camunda.connector.api.inbound;

import static org.junit.jupiter.api.Assertions.assertThrowsExactly;

import org.junit.jupiter.api.Test;

class InboundJobHandlerContextTest {

private InboundJobHandlerContext testObject;

@Test
void shouldFailConstructor_WhenNullSecretProvider() {
assertThrowsExactly(RuntimeException.class, () -> new InboundJobHandlerContext(null));
}
}
/**
* Base class for a unique set of properties of a single inbound Connector usage in the business
* process definition.
*
* <p>Comparable interface defines the priorities among inbound connector execution (suitable
* inbound candidates are executed in the natural order).
*/
public abstract class ProcessCorrelationPoint implements Comparable<ProcessCorrelationPoint> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.impl.inbound;

import io.camunda.connector.api.inbound.ProcessCorrelationPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Properties of a message published by an Inbound Connector */
public class MessageCorrelationPoint extends ProcessCorrelationPoint {

public static final String TYPE_NAME = "MESSAGE";

private static final Logger LOG = LoggerFactory.getLogger(MessageCorrelationPoint.class);
private final String messageName;

/** FEEL expression */
private final String correlationKeyExpression;

public MessageCorrelationPoint(String messageName, String correlationKeyExpression) {
this.messageName = messageName;
this.correlationKeyExpression = correlationKeyExpression;
LOG.debug("Created inbound correlation point: " + this);
}

public String getMessageName() {
return messageName;
}

public String getCorrelationKeyExpression() {
return correlationKeyExpression;
}

@Override
public String toString() {
return "MessageCorrelationPoint{"
+ "messageName='"
+ messageName
+ '\''
+ ", correlationKeyMapping='"
+ correlationKeyExpression
+ '}';
}

@Override
public int compareTo(ProcessCorrelationPoint o) {
if (!this.getClass().equals(o.getClass())) {
return 1;
}
MessageCorrelationPoint other = (MessageCorrelationPoint) o;
if (!messageName.equals(other.messageName)) {
return messageName.compareTo(other.messageName);
}
return correlationKeyExpression.compareTo(other.correlationKeyExpression);
}
}
Loading

0 comments on commit 937c4be

Please sign in to comment.