Skip to content

Commit

Permalink
Http2 sse support (#14673)
Browse files Browse the repository at this point in the history
  • Loading branch information
oxsean authored Sep 29, 2024
1 parent e1cfc03 commit c23af24
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@
*/
package org.apache.dubbo.remoting.http12;

import java.nio.charset.StandardCharsets;

public final class HttpConstants {

public static final String TRAILERS = "trailers";

public static final String CHUNKED = "chunked";

public static final String NO_CACHE = "no-cache";

public static final String X_FORWARDED_PROTO = "x-forwarded-proto";
public static final String X_FORWARDED_HOST = "x-forwarded-host";
public static final String X_FORWARDED_PORT = "x-forwarded-port";

public static final String HTTPS = "https";
public static final String HTTP = "http";

public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = "data:".getBytes(StandardCharsets.US_ASCII);
public static final byte[] SERVER_SENT_EVENT_LF_BYTES = "\n\n".getBytes(StandardCharsets.US_ASCII);

private HttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public enum HttpHeaderNames {

TRANSFER_ENCODING(io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING),

CACHE_CONTROL(io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL),

LOCATION(io.netty.handler.codec.http.HttpHeaderNames.LOCATION),

HOST(io.netty.handler.codec.http.HttpHeaderNames.HOST),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ default String contentType() {
default String header(CharSequence name) {
return headers().getFirst(name);
}

default HttpMetadata header(CharSequence name, String value) {
headers().set(name, value);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,35 @@
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Http1ServerStreamChannelObserver extends Http1ServerChannelObserver {
public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {

private static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = "data:".getBytes(StandardCharsets.US_ASCII);
private static final byte[] SERVER_SENT_EVENT_LF_BYTES = "\n\n".getBytes(StandardCharsets.US_ASCII);

public Http1ServerStreamChannelObserver(HttpChannel httpChannel) {
public Http1SseServerChannelObserver(HttpChannel httpChannel) {
super(httpChannel);
}

@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
HttpHeaders headers = HttpHeaders.create();
headers.set(HttpHeaderNames.TRANSFER_ENCODING.getKey(), HttpConstants.CHUNKED);
return new Http1Metadata(headers);
return super.encodeHttpMetadata(endStream)
.header(HttpHeaderNames.TRANSFER_ENCODING.getKey(), HttpConstants.CHUNKED)
.header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
}

@Override
protected void preOutputMessage(HttpOutputMessage message) throws IOException {
HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
prefixMessage.getBody().write(SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
getHttpChannel().writeMessage(prefixMessage);
}

@Override
protected void postOutputMessage(HttpOutputMessage message) throws IOException {
HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
lfMessage.getBody().write(SERVER_SENT_EVENT_LF_BYTES);
lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
getHttpChannel().writeMessage(lfMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public JsonPbCodec(FrameworkModel frameworkModel) {
public void encode(OutputStream os, Object data, Charset charset) throws EncodeException {
try {
if (data instanceof Message) {
String jsonString = JsonFormat.printer().print((Message) data);
String jsonString =
JsonFormat.printer().omittingInsignificantWhitespace().print((Message) data);
os.write(jsonString.getBytes(charset));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.command.HttpWriteQueue;
import org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
Expand Down Expand Up @@ -64,8 +62,7 @@ public NettyHttp2ProtocolSelectorHandler(

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) {
HttpHeaders headers = metadata.headers();
String contentType = headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
String contentType = metadata.contentType();
Http2ServerTransportListenerFactory factory = UrlUtils.computeServiceAttribute(
url,
TRANSPORT_LISTENER_FACTORY_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public interface Constants {
String H2_SETTINGS_SERVLET_ENABLED = "dubbo.protocol.triple.servlet.enabled";
String H3_SETTINGS_HTTP3_ENABLED = "dubbo.protocol.triple.http3.enabled";
String H3_SETTINGS_HTTP3_NEGOTIATION = "dubbo.protocol.triple.http3.negotiation";
String H2_SETTINGS_CONNECTION_INITIAL_WINDOW_SIZE_KEY = "dubbo.rpc.tri.connection-initial-window-size";

String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive";
String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
Expand Down Expand Up @@ -81,9 +81,9 @@ private ServerCallListener startListener(
case UNARY:
return new AutoCompleteUnaryServerCallListener(invocation, invoker, responseObserver);
case SERVER_STREAM:
responseObserver = prepareResponseObserver(new Http1ServerStreamChannelObserver(httpChannel));
responseObserver = prepareResponseObserver(new Http1SseServerChannelObserver(httpChannel));
responseObserver.addHeadersCustomizer((hs, t) ->
hs.set(HttpHeaderNames.CONTENT_TYPE.getName(), MediaType.TEXT_EVENT_STREAM.getName()));
hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(), MediaType.TEXT_EVENT_STREAM.getName()));
return new AutoCompleteServerStreamServerCallListener(invocation, invoker, responseObserver);
default:
throw new UnsupportedOperationException("HTTP1.x only support unary and server-stream");
Expand All @@ -104,7 +104,7 @@ protected void onError(Throwable throwable) {
protected void initializeAltSvc(URL url) {
String protocolId = Http3Exchanger.isEnabled(url) ? "h3" : "h2";
String value = protocolId + "=\":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
}

private static final class AutoCompleteUnaryServerCallListener extends UnaryServerCallListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void customizeHeaders(HttpHeaders headers, Throwable throwable, HttpOu
throw new IllegalArgumentException("Unsupported body type: " + body.getClass());
}
}
headers.set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength));
headers.set(HttpHeaderNames.CONTENT_LENGTH.getKey(), String.valueOf(contentLength));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -71,7 +72,11 @@ protected Http2ServerChannelObserver newResponseObserver(H2StreamChannel h2Strea
}

protected Http2ServerChannelObserver newStreamResponseObserver(H2StreamChannel h2StreamChannel) {
return new Http2StreamServerChannelObserver(getFrameworkModel(), h2StreamChannel);
Http2ServerChannelObserver responseObserver =
new Http2SseServerChannelObserver(getFrameworkModel(), h2StreamChannel);
responseObserver.addHeadersCustomizer(
(hs, t) -> hs.set(HttpHeaderNames.CONTENT_TYPE.getKey(), MediaType.TEXT_EVENT_STREAM.getName()));
return responseObserver;
}

protected Http2ServerChannelObserver prepareResponseObserver(Http2ServerChannelObserver responseObserver) {
Expand Down Expand Up @@ -124,7 +129,7 @@ protected void prepareStreamServerCall() {
protected void initializeAltSvc(URL url) {
if (Http3Exchanger.isEnabled(url)) {
String value = "h3=\":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort()) + '"';
responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.ALT_SVC.getName(), value));
responseObserver.addHeadersCustomizer((hs, t) -> hs.set(HttpHeaderNames.ALT_SVC.getKey(), value));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.tri.h12.http2;

import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.rpc.model.FrameworkModel;

import java.io.IOException;

public final class Http2SseServerChannelObserver extends Http2StreamServerChannelObserver {

public Http2SseServerChannelObserver(FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
super(frameworkModel, h2StreamChannel);
}

@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
return super.encodeHttpMetadata(endStream)
.header(HttpHeaderNames.CACHE_CONTROL.getKey(), HttpConstants.NO_CACHE);
}

@Override
protected void preOutputMessage(HttpOutputMessage message) throws IOException {
HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
getHttpChannel().writeMessage(prefixMessage);
}

@Override
protected void postOutputMessage(HttpOutputMessage message) throws IOException {
HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
getHttpChannel().writeMessage(lfMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.remoting.http12.HttpHeaders;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -81,8 +82,12 @@ public <T> List<T> getBodies(Class<T> type) {
List<T> bodies = (List<T>) this.bodies;
if (bodies == null) {
bodies = new ArrayList<>(oss.size());
for (OutputStream os : oss) {
ByteArrayOutputStream bos = (ByteArrayOutputStream) os;
boolean isTextEvent = MediaType.TEXT_EVENT_STREAM.getName().equals(getContentType());
for (int i = 0, size = oss.size(); i < size; i++) {
if (isTextEvent && i % 3 != 1) {
continue;
}
ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
if (bos.size() == 0) {
bodies.add(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private <T> void registerProvider(TProvider<T> provider, Protocol protocol, Prox
}

@Override
@SuppressWarnings({"unchecked", "resource"})
@SuppressWarnings("unchecked")
public TestResponse run(TestRequest request) {
MockH2StreamChannel channel = new MockH2StreamChannel();
URL url = new URL(TestProtocol.NAME, TestProtocol.HOST, TestProtocol.PORT, request.getProviderParams());
Expand Down

0 comments on commit c23af24

Please sign in to comment.