Skip to content

Commit

Permalink
ARTEMIS-4809 Allow configuring intermediateMessageReferences size
Browse files Browse the repository at this point in the history
In some setups, there could be a few hundred thousand queues that are
created due to many consumers that are connecting. However, most of
these are empty and stay empty for the entire day since there aren't
necessarily messages to be sent.  The 8K intermediateMessageReferences
instantiates an 64KB buffer (Object[]). This means we have large
allocation and live heap that ultimately remains empty for almost the
entire day.

In this commit, we introduce intermediate-message-buffer-initial-size,
which defaults to the current value of 8192. It can be set
programatically via
QueueConfiguration#setIntermediateMessageBufferInitialSize(int).

Note that this must be a power of 2.
  • Loading branch information
joshb1050 committed Jun 7, 2024
1 parent 5be9833 commit a0b1448
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class QueueConfiguration implements Serializable {
public static final String TRANSIENT = "transient";
public static final String AUTO_CREATED = "auto-created";
public static final String FQQN = "fqqn";
public static final String INTERMEDIATE_MESSAGE_BUFFER_INITIAL_SIZE = "intermediate-message-buffer-initial-size";

private Long id; // internal use
private SimpleString name;
Expand Down Expand Up @@ -110,6 +111,7 @@ public class QueueConfiguration implements Serializable {
private Boolean _transient;
private Boolean autoCreated;
private Boolean fqqn;
private Integer intermediateMessageBufferInitialSize;

public QueueConfiguration() {
}
Expand Down Expand Up @@ -147,6 +149,7 @@ public QueueConfiguration(QueueConfiguration o) {
_transient = o._transient;
autoCreated = o.autoCreated;
fqqn = o.fqqn;
intermediateMessageBufferInitialSize = o.intermediateMessageBufferInitialSize;
}

/**
Expand Down Expand Up @@ -656,6 +659,19 @@ public Boolean isFqqn() {
return fqqn == null ? Boolean.FALSE : fqqn;
}

public QueueConfiguration setIntermediateMessageBufferInitialSize(int initialSize) {
this.intermediateMessageBufferInitialSize = initialSize;
return this;
}

/**
* Returns the size of the intermediate message reference buffer if set.
* Note that this must be a power of 2.
*/
public Integer getIntermediateMessageBufferInitialSize() {
return intermediateMessageBufferInitialSize;
}

/**
* This method returns a JSON-formatted {@code String} representation of this {@code QueueConfiguration}. It is a
* simple collection of key/value pairs. The keys used are referenced in {@link #set(String, String)}.
Expand Down Expand Up @@ -761,6 +777,9 @@ public String toJSON() {
if (isFqqn() != null) {
builder.add(FQQN, isFqqn());
}
if (getIntermediateMessageBufferInitialSize() != null) {
builder.add(INTERMEDIATE_MESSAGE_BUFFER_INITIAL_SIZE, getIntermediateMessageBufferInitialSize());
}

return builder.build().toString();
}
Expand Down Expand Up @@ -861,6 +880,8 @@ public boolean equals(Object o) {
return false;
if (!Objects.equals(fqqn, that.fqqn))
return false;
if (!Objects.equals(intermediateMessageBufferInitialSize, that.intermediateMessageBufferInitialSize))
return false;

return true;
}
Expand Down Expand Up @@ -899,6 +920,7 @@ public int hashCode() {
result = 31 * result + Objects.hashCode(_transient);
result = 31 * result + Objects.hashCode(autoCreated);
result = 31 * result + Objects.hashCode(fqqn);
result = 31 * result + Objects.hashCode(intermediateMessageBufferInitialSize);
return result;
}

Expand Down Expand Up @@ -936,6 +958,7 @@ public String toString() {
+ ", internal=" + internal
+ ", transient=" + _transient
+ ", autoCreated=" + autoCreated
+ ", fqqn=" + fqqn + ']';
+ ", fqqn=" + fqqn
+ ", intermediateMessageBufferInitialSize=" + intermediateMessageBufferInitialSize + ']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,9 @@ public static String getDefaultHapolicyBackupStrategy() {

private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;

// The initial capacity of the intermediate message reference buffer.
private static final int DEFAULT_INTERMEDIATE_MESSAGE_BUFFER_INITIAL_SIZE = 8192;

/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
Expand Down Expand Up @@ -1960,4 +1963,7 @@ public static boolean getDefaultMirrorPageTransaction() {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}

public static int getDefaultIntermediateMessageBufferInitialSize() {
return DEFAULT_INTERMEDIATE_MESSAGE_BUFFER_INITIAL_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Messages will first enter intermediateMessageReferences
// Before they are added to messageReferences
// This is to avoid locking the queue on the producer
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences;

// This is where messages are stored
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
Expand Down Expand Up @@ -753,6 +753,10 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
}

this.ringSize = queueConfiguration.getRingSize() == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : queueConfiguration.getRingSize();
int intermediateMessageBufferInitialSize = queueConfiguration.getIntermediateMessageBufferInitialSize() == null
? ActiveMQDefaultConfiguration.getDefaultIntermediateMessageBufferInitialSize()
: queueConfiguration.getIntermediateMessageBufferInitialSize();
this.intermediateMessageReferences = new MpscUnboundedArrayQueue<>(intermediateMessageBufferInitialSize);
}

// Bindable implementation -------------------------------------------------------------------------------------
Expand Down

0 comments on commit a0b1448

Please sign in to comment.