Skip to content

Commit

Permalink
Execute remote actions on another extension (#588)
Browse files Browse the repository at this point in the history
* Add ProxyAction with TransportAction and handlers

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Give SDKActionModule a copy of ExtensionsRunner to use with transport

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add new ProxyActionRequest

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add SDKTransportService wrapper accessible to actions

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Implement ProxyTransportAction

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add test case to HelloWorldExtension

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Better naming of ExtensionActionResponse and correct action name

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Refactoring with TransportService and latest OpenSearch PR updates

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add ExtensionsActionRequestHandler

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Instantiate Proxy Action Request

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Working test case!

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Properly parse returned byte array into a response

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add sequence diagram to DESIGN.md

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Typoo fix

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Update with latest changes on companion PR

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Rename ProxyFoo to RemoteExtensionFoo

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Better handling of response bytes

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Handle plugin remote action requests

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Address code review comments

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Update sequence diagram

Signed-off-by: Daniel Widdis <widdis@gmail.com>

---------

Signed-off-by: Daniel Widdis <widdis@gmail.com>
(cherry picked from commit ebc684a)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Mar 24, 2023
1 parent 4ce9cc9 commit 72841ee
Show file tree
Hide file tree
Showing 22 changed files with 1,017 additions and 132 deletions.
6 changes: 6 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ The `ExtensionsManager` reads a list of extensions present in `extensions.yml`.

(27) The User receives the response.

#### Remote Action Execution on another Extension

Extensions may invoke actions on other extensions using the `ProxyAction` and `ProxyActionRequest`. The code sequence is shown below.

![](Docs/RemoteActionExecution.svg)

#### Extension Point Implementation Walk Through

An example of a more complex extension point, `getNamedXContent()` is shown below. A similar pattern can be followed for most extension points.
Expand Down
1 change: 1 addition & 0 deletions Docs/RemoteActionExecution.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/sdk/BaseExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@

import java.io.IOException;

import com.google.inject.Inject;

/**
* An abstract class that simplifies extension initialization and provides an instance of the runner.
*/
public abstract class BaseExtension implements Extension {
/**
* The {@link ExtensionsRunner} instance running this extension
*/
@Inject
private ExtensionsRunner extensionsRunner;

/**
Expand Down Expand Up @@ -56,6 +53,11 @@ public ExtensionSettings getExtensionSettings() {
return this.settings;
}

@Override
public void setExtensionsRunner(ExtensionsRunner runner) {
this.extensionsRunner = runner;
}

/**
* Gets the {@link ExtensionsRunner} of this extension.
*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/sdk/Extension.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
*/
public interface Extension {

/**
* Set the instance of {@link ExtensionsRunner} for this extension.
*
* @param runner The ExtensionsRunner instance.
*/
public void setExtensionsRunner(ExtensionsRunner runner);

/**
* Gets the {@link ExtensionSettings} of this extension.
*
Expand Down
45 changes: 41 additions & 4 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.ExtensionsManager.RequestType;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsManager;
Expand All @@ -33,6 +34,7 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionDependencyResponseHandler;
Expand Down Expand Up @@ -132,13 +134,15 @@ public class ExtensionsRunner {
private final SDKNamedXContentRegistry sdkNamedXContentRegistry;
private final SDKClient sdkClient;
private final SDKClusterService sdkClusterService;
private final SDKTransportService sdkTransportService;
private final SDKActionModule sdkActionModule;

private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
private final ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
private final ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private final ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
private final ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
private final ExtensionActionRequestHandler extensionsActionRequestHandler;

/**
* Instantiates a new update settings request handler
Expand All @@ -152,7 +156,10 @@ public class ExtensionsRunner {
* @throws IOException if the runner failed to read settings or API.
*/
protected ExtensionsRunner(Extension extension) throws IOException {
// Link these classes together
this.extension = extension;
extension.setExtensionsRunner(this);

// Initialize concrete classes needed by extensions
// These must have getters from this class to be accessible via createComponents
// If they require later initialization, create a concrete wrapper class and update the internals
Expand All @@ -175,6 +182,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
this.sdkClient = new SDKClient(extensionSettings);
// initialize SDKClusterService. Must happen after extension field assigned
this.sdkClusterService = new SDKClusterService(this);
// initialize SDKTransportService. Must happen after extension field assigned
this.sdkTransportService = new SDKTransportService();

// Create Guice modules for injection
List<com.google.inject.Module> modules = new ArrayList<>();
Expand All @@ -189,6 +198,7 @@ protected ExtensionsRunner(Extension extension) throws IOException {

b.bind(SDKClient.class).toInstance(getSdkClient());
b.bind(SDKClusterService.class).toInstance(getSdkClusterService());
b.bind(SDKTransportService.class).toInstance(getSdkTransportService());
});
// Bind the return values from create components
modules.add(this::injectComponents);
Expand All @@ -202,6 +212,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
// initialize SDKClient action map
initializeSdkClient();

extensionsActionRequestHandler = new ExtensionActionRequestHandler(getSdkClient());

if (extension instanceof ActionExtension) {
// store REST handlers in the registry
for (ExtensionRestHandler extensionRestHandler : ((ActionExtension) extension).getExtensionRestHandlers()) {
Expand Down Expand Up @@ -391,6 +403,25 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(updateSettingsRequestHandler.handleUpdateSettingsRequest(request)))
);

// This handles a remote extension request from OpenSearch or a plugin, sending an ExtensionActionResponse
transportService.registerRequestHandler(
ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionActionRequest::new,
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleExtensionActionRequest(request)))
);

// This handles a remote extension request from another extension, sending a RemoteExtensionActionResponse
transportService.registerRequestHandler(
ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionActionRequest::new,
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleRemoteExtensionActionRequest(request)))
);
}

/**
Expand Down Expand Up @@ -638,6 +669,10 @@ public TransportService getExtensionTransportService() {
return extensionTransportService;
}

public SDKTransportService getSdkTransportService() {
return sdkTransportService;
}

/**
* Starts an ActionListener.
*
Expand All @@ -660,6 +695,8 @@ public static void run(Extension extension) throws IOException {
// initialize the transport service
NettyTransport nettyTransport = new NettyTransport(runner);
runner.extensionTransportService = nettyTransport.initializeExtensionTransportService(runner.getSettings(), runner.getThreadPool());
// TODO: merge above line with below line when refactoring out extensionTransportService
runner.getSdkTransportService().setTransportService(runner.extensionTransportService);
runner.startActionListener(0);
}

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand Down Expand Up @@ -91,6 +92,9 @@ public SDKClient(ExtensionSettings extensionSettings) {
// Used by client.execute, populated by initialize method
@SuppressWarnings("rawtypes")
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
// Used by remote client execution where we get a string for the class name
@SuppressWarnings("rawtypes")
private Map<String, ActionType> actionClassToInstanceMap = Collections.emptyMap();

/**
* Initialize this client.
Expand All @@ -100,6 +104,7 @@ public SDKClient(ExtensionSettings extensionSettings) {
@SuppressWarnings("rawtypes")
public void initialize(Map<ActionType, TransportAction> actions) {
this.actions = actions;
this.actionClassToInstanceMap = actions.keySet().stream().collect(Collectors.toMap(a -> a.getClass().getName(), a -> a));
}

/**
Expand Down Expand Up @@ -259,6 +264,17 @@ public void close() throws IOException {
doCloseHighLevelClient();
}

/**
* Gets an instance of {@link ActionType} from its corresponding class name, suitable for using as the first parameter in {@link #execute(ActionType, ActionRequest, ActionListener)}.
*
* @param className The class name of the action type
* @return The instance corresponding to the class name
*/
@SuppressWarnings("unchecked")
public ActionType<? extends ActionResponse> getActionFromClassName(String className) {
return actionClassToInstanceMap.get(className);
}

/**
* Executes a generic action, denoted by an {@link ActionType}.
*
Expand Down
132 changes: 132 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKTransportService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.sdk.ActionExtension.ActionHandler;
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.transport.TransportService;

/**
* Wrapper class for {@link TransportService} and associated methods.
*
* TODO: Move all the sendFooRequest() methods here
* TODO: Replace usages of getExtensionTransportService with this class
* https://github.com/opensearch-project/opensearch-sdk-java/issues/585
*/
public class SDKTransportService {
private final Logger logger = LogManager.getLogger(SDKTransportService.class);

private TransportService transportService;
private DiscoveryNode opensearchNode;
private String uniqueId;

/**
* Requests that OpenSearch register the Transport Actions for this extension.
*
* @param actions The map of registered actions from {@link SDKActionModule#getActions()}
*/
public void sendRegisterTransportActionsRequest(Map<String, ActionHandler<?, ?>> actions) {
logger.info("Sending Register Transport Actions request to OpenSearch");
Set<String> actionNameSet = actions.values()
.stream()
.filter(h -> !h.getAction().name().startsWith("internal"))
.map(h -> h.getAction().getClass().getName())
.collect(Collectors.toSet());
AcknowledgedResponseHandler registerTransportActionsResponseHandler = new AcknowledgedResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
new RegisterTransportActionsRequest(uniqueId, actionNameSet),
registerTransportActionsResponseHandler
);
} catch (Exception e) {
logger.error("Failed to send Register Transport Actions request to OpenSearch", e);
}
}

/**
* Requests that OpenSearch execute a Transport Actions on another extension.
*
* @param request The request to send
* @return A buffer serializing the response from the remote action if successful, otherwise null
*/
public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExtensionActionRequest request) {
logger.info("Sending Remote Extension Action request to OpenSearch for [" + request.getAction() + "]");
// Combine class name string and request bytes
byte[] requestClassBytes = request.getRequestClass().getBytes(StandardCharsets.UTF_8);
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + request.getRequestBytes().length)
.put(requestClassBytes)
.put(RemoteExtensionActionRequest.UNIT_SEPARATOR)
.put(request.getRequestBytes())
.array();
ExtensionActionResponseHandler extensionActionResponseHandler = new ExtensionActionResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
new TransportActionRequestFromExtension(request.getAction(), proxyRequestBytes, uniqueId),
extensionActionResponseHandler
);
// Wait on response
extensionActionResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.error("Failed to receive Remote Extension Action response from OpenSearch", e);
} catch (Exception e) {
logger.error("Failed to send Remote Extension Action request to OpenSearch", e);
}
// At this point, response handler has read in the response bytes
return new RemoteExtensionActionResponse(
extensionActionResponseHandler.isSuccess(),
extensionActionResponseHandler.getResponseBytes()
);
}

public TransportService getTransportService() {
return transportService;
}

public DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

public String getUniqueId() {
return uniqueId;
}

public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public void setUniqueId(String uniqueId) {
this.uniqueId = uniqueId;
}
}
32 changes: 32 additions & 0 deletions src/main/java/org/opensearch/sdk/action/RemoteExtensionAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk.action;

import org.opensearch.action.ActionType;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;

/**
* The {@link ActionType} used as they key for the {@link RemoteExtensionTransportAction}.
*/
public class RemoteExtensionAction extends ActionType<RemoteExtensionActionResponse> {

/**
* The name to look up this action with
*/
public static final String NAME = "internal:remote-extension-action";
/**
* The singleton instance of this class
*/
public static final RemoteExtensionAction INSTANCE = new RemoteExtensionAction();

private RemoteExtensionAction() {
super(NAME, RemoteExtensionActionResponse::new);
}
}
Loading

0 comments on commit 72841ee

Please sign in to comment.