Skip to content

Commit

Permalink
Cleanup (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperxpro committed Jul 14, 2024
1 parent 8925adc commit a47bcc8
Show file tree
Hide file tree
Showing 119 changed files with 459 additions and 315 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* <p> Base class for Connection. Protocol implementations must extend this class. </p>
* <p> Base class for Connection. A connection is a Downstream {@link Channel}. Protocol implementations must extend this class. </p>
*
* <p> {@link #init(ChannelFuture)} must be called once {@link ChannelFuture} is ready
* for a new connection. </p>
Expand All @@ -39,7 +39,7 @@ public abstract class Connection {
*/
public enum State {
/**
* Connection has been initialized.
* Connection has been initialized but not connected (or ready) yet.
*/
INITIALIZED,

Expand All @@ -49,20 +49,22 @@ public enum State {
CONNECTION_TIMEOUT,

/**
* Connection has been closed with remote host.
* Connection has been closed with downstream channel.
*/
CONNECTION_CLOSED,

/**
* Connection has been connected successfully and is active.
* Connection has been connected successfully, active and ready to accept traffic.
*/
CONNECTED_AND_ACTIVE
}

/**
* Backlog Queue contains objects pending to be written once connection establishes.
* <p></p>
* This queue will be {@code null} once backlog is processed (either via {@link #writeBacklog()} or {@link #clearBacklog()})
*/
protected final ConcurrentLinkedQueue<Object> backlogQueue = new ConcurrentLinkedQueue<>();
protected ConcurrentLinkedQueue<Object> backlogQueue;

private final Node node;
protected ChannelFuture channelFuture;
Expand All @@ -76,12 +78,15 @@ public enum State {
* @param node {@link Node} associated with this Connection
*/
@NonNull
public Connection(Node node) {
protected Connection(Node node) {
this.node = node;
backlogQueue(new ConcurrentLinkedQueue<>());
}

/**
* Initialize this Connection
* Initialize this Connection with {@link ChannelFuture}
*
* @param channelFuture {@link ChannelFuture} associated with this Connection (of Upstream or Downstream)
*/
@NonNull
public void init(ChannelFuture channelFuture) {
Expand Down Expand Up @@ -120,19 +125,19 @@ public void init(ChannelFuture channelFuture) {
protected abstract void processBacklog(ChannelFuture channelFuture);

/**
* Write and Process the Backlog
* Write Backlog to the {@link Channel}
*/
protected void writeBacklog() {
backlogQueue.forEach(this::writeAndFlush);
backlogQueue.clear(); // Clear the new queue because we're done with it.
backlogQueue = null;
}

/**
* Clear the Backlog and release all objects.
*/
protected void clearBacklog() {
backlogQueue.forEach(ReferenceCountedUtil::silentRelease);
backlogQueue.clear();
backlogQueue = null;
}

/**
Expand All @@ -158,29 +163,49 @@ public ChannelFuture channelFuture() {
return channelFuture;
}

/**
* Get {@link Node} associated with this connection.
*/
public Node node() {
return node;
}

/**
* {@link InetSocketAddress} of this {@link Connection}
*/
public InetSocketAddress socketAddress() {
return socketAddress;
}

/**
* Current {@link State} of this {@link Connection}
*/
public State state() {
return state;
}

/**
* Set the {@link State} of this {@link Connection}
*/
public void backlogQueue(ConcurrentLinkedQueue<Object> newQueue) {
backlogQueue = newQueue;
}

/**
* Close this {@link Connection}
*/
public synchronized void close() {
// If Backlog Queue contains something then clear it before closing connection.
if (!backlogQueue.isEmpty()) {
if (backlogQueue != null && !backlogQueue.isEmpty()) {
clearBacklog();
}

// Remove this connection from Node
node.removeConnection(this);
if (this.channel != null) {

// If Channel is not null then close it.
// Channel can be null if the connection is not initialized.
if (channel != null) {
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* <p> {@link Node} is the server where all requests are sent. </p>
* <p> {@link Node} is the server (downstream) where all requests are sent. </p>
*
* Use {@link NodeBuilder} to build {@link Node} Instance.
*/
Expand Down Expand Up @@ -298,13 +298,14 @@ public void markOnline() {
/**
* Add a {@link Connection} with this {@linkplain Node}
*/
public void addConnection(Connection connection) throws TooManyConnectionsException, IllegalStateException {
public void addConnection(Connection connection) throws TooManyConnectionsException {
// If Maximum Connection is not -1 and Number of Active connections is greater than
// Maximum number of connections then close the connection and throw an exception.
if (connectionFull()) {
connection.close();
throw new TooManyConnectionsException(this);
} else if (state != State.ONLINE) {
}
if (state != State.ONLINE) {
throw new IllegalStateException("Node is not online");
}
activeConnections.add(connection);
Expand Down Expand Up @@ -343,7 +344,7 @@ public String toString() {
", Address=" + socketAddress +
", BytesSent=" + bytesSent +
", BytesReceived=" + bytesReceived +
", Connections=" + activeConnection() + "/" + maxConnections() +
", Connections=" + activeConnection() + '/' + maxConnections() +
", state=" + state +
", health=" + health() +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public NodeBytesTracker(Node node) {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBuf byteBuf && byteBuf.isReadable()) {
int bytes = byteBuf.readableBytes();
node.incBytesSent(bytes);
} else if (msg instanceof ByteBufHolder) {
int bytes = ((ByteBufHolder) msg).content().readableBytes();
} else if (msg instanceof ByteBufHolder byteBufHolder && byteBufHolder.content().isReadable()) {
int bytes = byteBufHolder.content().readableBytes();
node.incBytesSent(bytes);
}

Expand All @@ -52,11 +52,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
if (msg instanceof ByteBuf byteBuf && byteBuf.isReadable()) {
int bytes = byteBuf.readableBytes();
node.incBytesReceived(bytes);
} else if (msg instanceof ByteBufHolder) {
int bytes = ((ByteBufHolder) msg).content().readableBytes();
} else if (msg instanceof ByteBufHolder byteBufHolder && byteBufHolder.content().isReadable()) {
int bytes = byteBufHolder.content().readableBytes();
node.incBytesReceived(bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@
* Node State
*/
public enum State {

/**
* {@link Node} is online, active and accepting connections.
*/
ONLINE,

/**
* {@link Node} is offline, not active and not accepting any connections.
*/
OFFLINE,

/**
* {@link Node} is idle, only processing established active connections and not accepting
* any new connections.
*/
IDLE,

/**
* {@link Node} is marked as offline manually.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class Cluster extends ClusterOnlineNodesWorker {

Cluster(LoadBalance<?, ?, ?, ?> loadBalance) {
loadBalance(loadBalance);
this.eventStream.subscribe(this);
eventStream.subscribe(this);
}

/**
Expand Down Expand Up @@ -168,7 +168,7 @@ public Node get(String id) {
* Get List of online {@link Node} associated with this {@linkplain Cluster}
*/
public List<Node> onlineNodes() {
return ONLINE_NODES;
return onlineNodes;
}

/**
Expand Down Expand Up @@ -261,7 +261,7 @@ public HealthCheckTemplate healthCheckTemplate() {

@NonNull
void configureHealthCheck(HealthCheckConfiguration healthCheckConfiguration, HealthCheckTemplate healthCheckTemplate) {
this.healthCheckService = new HealthCheckService(healthCheckConfiguration, eventStream);
healthCheckService = new HealthCheckService(healthCheckConfiguration, eventStream);
this.healthCheckTemplate = healthCheckTemplate;
}

Expand All @@ -284,9 +284,9 @@ private void configureHealthCheckForNode(Node node) throws Exception {
case HTTP, HTTPS -> {
String host;
if (healthCheckTemplate.protocol() == HealthCheckTemplate.Protocol.HTTP) {
host = "http://" + InetAddress.getByName(healthCheckTemplate.host()).getHostAddress() + ":" + healthCheckTemplate().port();
host = "http://" + InetAddress.getByName(healthCheckTemplate.host()).getHostAddress() + ':' + healthCheckTemplate().port();
} else {
host = "https://" + InetAddress.getByName(healthCheckTemplate.host()).getHostAddress() + ":" + healthCheckTemplate().port();
host = "https://" + InetAddress.getByName(healthCheckTemplate.host()).getHostAddress() + ':' + healthCheckTemplate().port();
}
healthCheck = new HTTPHealthCheck(URI.create(host), Duration.ofSeconds(healthCheckTemplate.timeout()), healthCheckTemplate.samples());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import java.util.Objects;

import static com.shieldblaze.expressgateway.common.utils.ObjectUtils.nonNull;
import static java.util.Objects.requireNonNull;

/**
* Builder for {@link Cluster}
*/
Expand All @@ -42,13 +45,13 @@ public ClusterBuilder withLoadBalance(LoadBalance<?, ?, ?, ?> loadBalance) {
}

public ClusterBuilder withHealthCheck(HealthCheckConfiguration healthCheckConfiguration, HealthCheckTemplate healthCheckTemplate) {
this.healthCheckConfiguration = Objects.requireNonNull(healthCheckConfiguration, "HealthCheckConfiguration cannot be 'null'");
this.healthCheckTemplate = Objects.requireNonNull(healthCheckTemplate, "HealthCheckTemplate cannot be 'null'");
this.healthCheckConfiguration = nonNull(healthCheckConfiguration, HealthCheckConfiguration.class);
this.healthCheckTemplate = nonNull(healthCheckTemplate, HealthCheckTemplate.class);
return this;
}

public Cluster build() {
Objects.requireNonNull(loadBalance, "LoadBalance cannot be 'null'");
nonNull(loadBalance, LoadBalance.class);
Cluster cluster = new Cluster(loadBalance);

// If HealthCheck configuration is available then apply it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@

class ClusterOnlineNodesWorker implements EventListener<Void> {

protected final List<Node> ONLINE_NODES = new CopyOnWriteArrayList<>();
protected final List<Node> onlineNodes = new CopyOnWriteArrayList<>();

@Override
public void accept(Event<Void> event) {
if (event instanceof NodeEvent nodeEvent) {

if (nodeEvent instanceof NodeOnlineEvent || nodeEvent instanceof NodeAddedEvent) {
ONLINE_NODES.add(nodeEvent.node());
onlineNodes.add(nodeEvent.node());
} else {
ONLINE_NODES.remove(nodeEvent.node());
onlineNodes.remove(nodeEvent.node());
}
}
}

@Override
public String toString() {
return "ClusterOnlineNodesWorker{" +
"ONLINE_NODES=" + ONLINE_NODES.size() +
"onlineNodes=" + onlineNodes +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public abstract class NodeEvent extends DefaultEvent<Void> {
private final Node node;

public NodeEvent(Node node) {
protected NodeEvent(Node node) {
this.node = node;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

import com.shieldblaze.expressgateway.backend.loadbalance.LoadBalance;

import java.io.Serial;

/**
* Thrown when there was an error during load-balancing by {@link LoadBalance}
*/
public class LoadBalanceException extends Exception {

@Serial
private static final long serialVersionUID = 1841399446385743110L;

public LoadBalanceException() {
super();
}

public LoadBalanceException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import com.shieldblaze.expressgateway.backend.Node;

import java.io.Serial;

/**
* Thrown when there is no {@link Node} available to handle request.
*/
public final class NoNodeAvailableException extends LoadBalanceException {

@Serial
private static final long serialVersionUID = 3016237192356488630L;

public static final NoNodeAvailableException INSTANCE = new NoNodeAvailableException("No Node is available to handle this exception");

public NoNodeAvailableException() {
super();
}

public NoNodeAvailableException(String message) {
Expand All @@ -42,7 +46,7 @@ public NoNodeAvailableException(Throwable cause) {
super(cause);
}

protected NoNodeAvailableException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
private NoNodeAvailableException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

import com.shieldblaze.expressgateway.backend.Node;

import java.io.Serial;

/**
* Thrown when a {@link Node} has exceeded the maximum number of connections count.
*/
public final class TooManyConnectionsException extends Exception {

@Serial
private static final long serialVersionUID = 6390465718005428833L;

public TooManyConnectionsException(Node node) {
super("Node (" + node + ") has reached maximum number of connections.");
}
Expand Down
Loading

0 comments on commit a47bcc8

Please sign in to comment.