Skip to content

Commit

Permalink
Refactor to reduce pinning in HTTP/2 code when using virtual threads
Browse files Browse the repository at this point in the history
  • Loading branch information
markt-asf committed Jul 26, 2023
1 parent fa7f17b commit 0338f2b
Showing 1 changed file with 60 additions and 30 deletions.
90 changes: 60 additions & 30 deletions java/org/apache/coyote/http2/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,7 @@ abstract class StreamInputBuffer implements InputBuffer {

class StandardStreamInputBuffer extends StreamInputBuffer {

private final Lock readStateLock = new ReentrantLock();
/*
* Two buffers are required to avoid various multi-threading issues. These issues arise from the fact that the
* Stream (or the Request/Response) used by the application is processed in one thread but the connection is
Expand Down Expand Up @@ -1222,7 +1223,8 @@ public final int doRead(ApplicationBufferHandler applicationBufferHandler) throw
final boolean isReadyForRead() {
ensureBuffersExist();

synchronized (this) {
readStateLock.lock();
try {
if (available() > 0) {
return true;
}
Expand All @@ -1232,48 +1234,65 @@ final boolean isReadyForRead() {
}

return false;
} finally {
readStateLock.unlock();
}
}

@Override
final synchronized boolean isRequestBodyFullyRead() {
return (inBuffer == null || inBuffer.position() == 0) && isInputFinished();
final boolean isRequestBodyFullyRead() {
readStateLock.lock();
try {
return (inBuffer == null || inBuffer.position() == 0) && isInputFinished();
} finally {
readStateLock.unlock();
}
}


@Override
public final synchronized int available() {
if (inBuffer == null) {
return 0;
public final int available() {
readStateLock.lock();
try {
if (inBuffer == null) {
return 0;
}
return inBuffer.position();
} finally {
readStateLock.unlock();
}
return inBuffer.position();
}


/*
* Called after placing some data in the inBuffer.
*/
@Override
final synchronized void onDataAvailable() throws IOException {
if (closed) {
swallowUnread();
} else if (readInterest) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.inputBuffer.dispatch"));
}
readInterest = false;
coyoteRequest.action(ActionCode.DISPATCH_READ, null);
// Always need to dispatch since this thread is processing
// the incoming connection and streams are processed on their
// own.
coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
} else {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.inputBuffer.signal"));
}
synchronized (inBuffer) {
inBuffer.notifyAll();
final void onDataAvailable() throws IOException {
readStateLock.lock();
try {
if (closed) {
swallowUnread();
} else if (readInterest) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.inputBuffer.dispatch"));
}
readInterest = false;
coyoteRequest.action(ActionCode.DISPATCH_READ, null);
// Always need to dispatch since this thread is processing
// the incoming connection and streams are processed on their
// own.
coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
} else {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.inputBuffer.signal"));
}
synchronized (inBuffer) {
inBuffer.notifyAll();
}
}
} finally {
readStateLock.unlock();
}
}

Expand All @@ -1286,8 +1305,13 @@ final ByteBuffer getInBuffer() {


@Override
final synchronized void insertReplayedBody(ByteChunk body) {
inBuffer = ByteBuffer.wrap(body.getBytes(), body.getOffset(), body.getLength());
final void insertReplayedBody(ByteChunk body) {
readStateLock.lock();
try {
inBuffer = ByteBuffer.wrap(body.getBytes(), body.getOffset(), body.getLength());
} finally {
readStateLock.unlock();
}
}


Expand All @@ -1297,11 +1321,14 @@ private void ensureBuffersExist() {
// this is the initial window size set by Tomcat that the client
// uses (i.e. the local setting is required here).
int size = handler.getLocalSettings().getInitialWindowSize();
synchronized (this) {
readStateLock.lock();
try {
if (inBuffer == null && !closed) {
inBuffer = ByteBuffer.allocate(size);
outBuffer = new byte[size];
}
} finally {
readStateLock.unlock();
}
}
}
Expand All @@ -1328,8 +1355,11 @@ final void notifyEof() {

@Override
final void swallowUnread() throws IOException {
synchronized (this) {
readStateLock.lock();
try {
closed = true;
} finally {
readStateLock.unlock();
}
if (inBuffer != null) {
int unreadByteCount = 0;
Expand Down

0 comments on commit 0338f2b

Please sign in to comment.