Skip to content

Commit

Permalink
Add AsyncConnection and AsyncConnectionFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
renner committed Nov 17, 2017
1 parent 954dbc3 commit c626990
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 56 deletions.
25 changes: 25 additions & 0 deletions src/main/java/com/suse/salt/netapi/client/AsyncConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.suse.salt.netapi.client;

import java.util.concurrent.CompletionStage;

/**
* Interface for different HTTP async connection implementations.
* @param <T> type of result retrieved using this HTTP connection
*/
public interface AsyncConnection<T> {

/**
* Send a GET request and parse the result into object of given type.
*
* @return CompletionStage holding object of the given return type T
*/
CompletionStage<T> get();

/**
* Send a POST request and parse the result into object of given type.
*
* @param data the data to send (in JSON format)
* @return CompletionStage holding object of the given return type T
*/
CompletionStage<T> post(String data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.suse.salt.netapi.client;

import com.suse.salt.netapi.parser.JsonParser;

/**
* Interface for creating instances of an HTTP async connection implementation.
*/
public interface AsyncConnectionFactory extends AutoCloseable {

/**
* Create a new {@link AsyncConnection} for a given endpoint and configuration.
*
* @param <T> type of the result as returned by the parser
* @param endpoint the API endpoint
* @param parser the parser used for parsing the result
* @return object representing a connection to the API
*/
<T> AsyncConnection<T> create(String endpoint, JsonParser<T> parser);
}
76 changes: 29 additions & 47 deletions src/main/java/com/suse/salt/netapi/client/SaltClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.suse.salt.netapi.calls.Client;
import com.suse.salt.netapi.calls.SaltSSHConfig;
import com.suse.salt.netapi.calls.SaltSSHUtils;
import com.suse.salt.netapi.client.impl.HttpAsyncClientConnectionFactory;
import com.suse.salt.netapi.client.impl.HttpClientConnectionFactory;
import com.suse.salt.netapi.config.ClientConfig;
import com.suse.salt.netapi.config.ProxySettings;
Expand All @@ -23,21 +24,13 @@
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -46,14 +39,17 @@
/**
* Salt API client.
*/
public class SaltClient {
public class SaltClient implements AutoCloseable {

/** The configuration object */
private final ClientConfig config = new ClientConfig();

/** The connection factory object */
private final ConnectionFactory connectionFactory;

/** The async connection factory object */
private final AsyncConnectionFactory asyncConnectionFactory;

/** The executor for async operations */
private final ExecutorService executor;

Expand Down Expand Up @@ -101,6 +97,9 @@ public SaltClient(URI url, ConnectionFactory connectionFactory,
config.put(ClientConfig.URL, url);
this.connectionFactory = connectionFactory;
this.executor = executor;

// TODO: Replace connectionFactory with this
this.asyncConnectionFactory = new HttpAsyncClientConnectionFactory(config);
}

/**
Expand Down Expand Up @@ -135,53 +134,31 @@ public void setProxy(ProxySettings settings) {
* <p>
* {@code POST /login}
*
* @param httpclient the HTTP client
* @param username username
* @param password password
* @param eauth authentication module
* @return completion stage
* @param username the username
* @param password the password
* @param eauth the eauth type
* @return CompletionStage holding the authentication token
*/
public CompletionStage<Token> loginNonBlocking(CloseableHttpAsyncClient httpclient,
final String username, final String password, final AuthModule eauth) {
public CompletionStage<Token> loginNonBlocking(final String username,
final String password, final AuthModule eauth) {
Map<String, String> props = new LinkedHashMap<>();
props.put("username", username);
props.put("password", password);
props.put("eauth", eauth.getValue());
String payload = gson.toJson(props);

CompletableFuture<Token> future = new CompletableFuture<>();

final HttpPost request = new HttpPost(config.get(ClientConfig.URL) + "/login");
request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON));
httpclient.execute(request, new FutureCallback<HttpResponse>() {

@Override
public void failed(Exception e) {
future.completeExceptionally(e);
}

@Override
public void completed(HttpResponse response) {
try {
Return<List<Token>> result = JsonParser.TOKEN
.parse(response.getEntity().getContent());
String payload = gson.toJson(props);

// They return a list of tokens here, take the first
Token token = result.getResult().get(0);
CompletionStage<Token> result = asyncConnectionFactory
.create("/login", JsonParser.TOKEN)
.post(payload)
.thenApply(r -> {
// They return a list of tokens here, take the first one
Token token = r.getResult().get(0);
config.put(ClientConfig.TOKEN, token.getToken());
future.complete(token);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
return token;
});

@Override
public void cancelled() {
future.cancel(false);
}
});

return future;
return result;
}

/**
Expand Down Expand Up @@ -448,4 +425,9 @@ public <R> R call(Call<?> call, Client client, String endpoint, TypeToken<R> typ
throws SaltException {
return call(call, client, endpoint, Optional.empty(), type);
}

@Override
public void close() throws Exception {
asyncConnectionFactory.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.suse.salt.netapi.client.impl;

import com.suse.salt.netapi.client.AsyncConnection;
import com.suse.salt.netapi.config.ClientConfig;
import com.suse.salt.netapi.exception.SaltException;
import com.suse.salt.netapi.exception.SaltUserUnauthorizedException;
import com.suse.salt.netapi.parser.JsonParser;

import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.client.HttpAsyncClient;

import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* Representation of a connection to Salt for issuing API requests using Apache's
* HttpAsyncClient.
*
* @param <T> type of result retrieved using this HTTP connection
*/
public class HttpAsyncClientConnection<T> implements AsyncConnection<T> {

/** HTTP client instance */
private final HttpAsyncClient httpClient;

/** Endpoint */
private final String endpoint;

/** Configuration */
private final ClientConfig config;

/** Parser to parse the returned result */
private final JsonParser<T> parser;

/**
* Init a connection to a given Salt API endpoint.
*
* @param httpClientIn the HTTP client
* @param endpointIn the endpoint
* @param parserIn the parser
* @param configIn the config
*/
public HttpAsyncClientConnection(HttpAsyncClient httpClientIn, String endpointIn,
JsonParser<T> parserIn, ClientConfig configIn) {
httpClient = httpClientIn;
endpoint = endpointIn;
config = configIn;
parser = parserIn;
}

/**
* {@inheritDoc}
*/
@Override
public CompletionStage<T> post(String data) {
return request(data);
}

/**
* {@inheritDoc}
*/
@Override
public CompletionStage<T> get() {
return request(null);
}

/**
* Perform HTTP request and parse the result into a given result type.
*
* @param data the data to send with the request
* @return CompletionStage holding object of type T
*/
private CompletionStage<T> request(String data) {
return executeRequest(httpClient, prepareRequest(data));
}

/**
* Prepares the HTTP request object creating a POST or GET request depending on if data
* is supplied or not.
*
* @param jsonData json POST data, will use GET if null
* @return HttpUriRequest object the prepared request
*/
private HttpUriRequest prepareRequest(String jsonData) {
URI uri = config.get(ClientConfig.URL).resolve(endpoint);
HttpUriRequest httpRequest;
if (jsonData != null) {
// POST data
HttpPost httpPost = new HttpPost(uri);
httpPost.setEntity(new StringEntity(jsonData, ContentType.APPLICATION_JSON));
httpRequest = httpPost;
} else {
// GET request
httpRequest = new HttpGet(uri);
}
httpRequest.addHeader(HttpHeaders.ACCEPT, "application/json");

// Token authentication
String token = config.get(ClientConfig.TOKEN);
if (token != null) {
httpRequest.addHeader("X-Auth-Token", token);
}

return httpRequest;
}

/**
* Executes a prepared HTTP request using the given client.
*
* @param httpClient the client to use for the request
* @param httpRequest the prepared request to perform
* @return CompletionStage holding object of type T
*/
private CompletionStage<T> executeRequest(HttpAsyncClient httpClient,
HttpUriRequest httpRequest) {
CompletableFuture<T> future = new CompletableFuture<>();
httpClient.execute(httpRequest, new FutureCallback<HttpResponse>() {
@Override
public void failed(Exception e) {
future.completeExceptionally(e);
}

@Override
public void completed(HttpResponse response) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK ||
statusCode == HttpStatus.SC_ACCEPTED) {

// Parse result type from the returned JSON
try {
T result = parser.parse(response.getEntity().getContent());
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
} else {
future.completeExceptionally(createSaltException(statusCode));
}
}

@Override
public void cancelled() {
future.cancel(false);
}
});

return future;
}

/**
* Create the appropriate exception for the given HTTP status code.
*
* @param statusCode HTTP status code
* @return {@link SaltException} instance
*/
private SaltException createSaltException(int statusCode) {
if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
return new SaltUserUnauthorizedException(
"Salt user does not have sufficient permissions");
}
return new SaltException("Response code: " + statusCode);
}
}
Loading

0 comments on commit c626990

Please sign in to comment.