Skip to content

Commit

Permalink
NIFI-12277: Added SSLContextService to Slack Processors
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-bathori committed Oct 31, 2023
1 parent ead1c23 commit ebb67c4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 9 deletions.
6 changes: 6 additions & 0 deletions nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,11 @@
<artifactId>nifi-slack-processors</artifactId>
<version>1.24.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.24.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
Expand Down Expand Up @@ -48,13 +53,15 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -184,6 +191,12 @@ public class PostSlack extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
Expand All @@ -195,7 +208,7 @@ public class PostSlack extends AbstractProcessor {
.build();

public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(POST_MESSAGE_URL, FILE_UPLOAD_URL, ACCESS_TOKEN, CHANNEL, TEXT, UPLOAD_FLOWFILE, FILE_TITLE, FILE_NAME, FILE_MIME_TYPE));
Arrays.asList(POST_MESSAGE_URL, FILE_UPLOAD_URL, ACCESS_TOKEN, CHANNEL, TEXT, UPLOAD_FLOWFILE, FILE_TITLE, FILE_NAME, FILE_MIME_TYPE, SSL_CONTEXT_SERVICE));

public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
Expand Down Expand Up @@ -232,18 +245,27 @@ public Set<Relationship> getRelationships() {
}

@OnScheduled
public void initDynamicProperties(ProcessContext context) {
public void onScheduled(ProcessContext context) {
attachmentProperties.clear();
attachmentProperties.addAll(
context.getProperties().keySet()
.stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList()));
}

@OnScheduled
public void initHttpResources() {
connManager = new PoolingHttpClientConnectionManager();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", new SSLConnectionSocketFactory(sslContext))
.build();

connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
} else {
connManager = new PoolingHttpClientConnectionManager();
}

client = HttpClientBuilder.create()
.setConnectionManager(connManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

import javax.json.Json;
import javax.json.JsonArrayBuilder;
Expand All @@ -41,6 +42,8 @@
import javax.json.JsonReader;
import javax.json.JsonWriter;
import javax.json.stream.JsonParsingException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;

import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -131,6 +134,12 @@ public class PutSlack extends AbstractProcessor {
.addValidator(new EmojiValidator())
.build();

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
.identifiesControllerService(SSLContextService.class)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to success after being successfully sent to Slack")
Expand All @@ -144,7 +153,7 @@ public class PutSlack extends AbstractProcessor {
private final SortedSet<PropertyDescriptor> attachments = Collections.synchronizedSortedSet(new TreeSet<PropertyDescriptor>());

public static final List<PropertyDescriptor> descriptors = Collections.unmodifiableList(
Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI));
Arrays.asList(WEBHOOK_URL, WEBHOOK_TEXT, CHANNEL, USERNAME, ICON_URL, ICON_EMOJI, SSL_CONTEXT_SERVICE));

public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
Expand Down Expand Up @@ -237,6 +246,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
builder.add("icon_emoji", iconEmoji);
}

final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

try {
// Get Attachments Array
if (!attachments.isEmpty()) {
Expand All @@ -260,6 +271,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setDoOutput(true);

if (sslService != null) {
final SSLContext sslContext = sslService.createContext();
((HttpsURLConnection) conn).setSSLSocketFactory(sslContext.getSocketFactory());
}

DataOutputStream outputStream = new DataOutputStream(conn.getOutputStream());
String payload = "payload=" + URLEncoder.encode(stringWriter.getBuffer().toString(), "UTF-8");
outputStream.writeBytes(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ public void testInvalidDynamicPropertiesWithExpressionLanguage() {
public void testGetPropertyDescriptors() {
PutSlack processor = new PutSlack();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals(6, pd.size(), "size should be eq");
assertEquals(7, pd.size(), "size should be eq");
assertTrue(pd.contains(PutSlack.WEBHOOK_TEXT));
assertTrue(pd.contains(PutSlack.WEBHOOK_URL));
assertTrue(pd.contains(PutSlack.CHANNEL));
assertTrue(pd.contains(PutSlack.USERNAME));
assertTrue(pd.contains(PutSlack.ICON_URL));
assertTrue(pd.contains(PutSlack.ICON_EMOJI));
assertTrue(pd.contains(PutSlack.SSL_CONTEXT_SERVICE));
}

@Test
Expand Down

0 comments on commit ebb67c4

Please sign in to comment.