Skip to content

Commit

Permalink
[improvement](stream-load) adjust read unit of http to optimize strea…
Browse files Browse the repository at this point in the history
…m load (#9154)
  • Loading branch information
dataroaring authored May 20, 2022
1 parent 1e940f2 commit defdae1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 44 deletions.
4 changes: 2 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(4096);
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
Expand Down Expand Up @@ -394,7 +394,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_header_type(ctx->header_type);
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_cxt->need_commit_self = true;
stream_load_cxt->need_rollback = true;
// total_length == -1 means read one message from pipe in once time, don't care the length.
auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_cxt->body_sink = pipe;
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@

namespace doris {

const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
// StreamLoadPipe use to transfer data from producer to consumer
// Data in pip is stored in chunks.
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1, bool use_proto = false)
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
bool use_proto = false)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
Expand Down
126 changes: 87 additions & 39 deletions thirdparty/patches/libevent.patch
Original file line number Diff line number Diff line change
@@ -1,6 +1,75 @@
diff -uprN a/http.c b/http.c
--- a/http.c 2020-07-05 20:02:46.000000000 +0800
+++ b/http.c 2021-09-28 13:56:14.045159153 +0800
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 676727f1..833fbf70 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -200,7 +200,7 @@ endif()
if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
set(GNUC 1)
endif()
-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
set(MSVC 1)
endif()

diff --git a/buffer.c b/buffer.c
index 3524b350..e5d97458 100644
--- a/buffer.c
+++ b/buffer.c
@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
#define IOV_LEN_TYPE unsigned long
#endif
#endif
-#define NUM_READ_IOVEC 4
+#define NUM_READ_IOVEC 8

-#define EVBUFFER_MAX_READ 4096
+#define EVBUFFER_MAX_READ (128 * 1024)

/** Helper function to figure out which space to use for reading data into
an evbuffer. Internal use only.
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 40c7c5e8..c1624878 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
}
at_most = read_high - cur_size;
} else {
- at_most = 16384; /* FIXME totally magic. */
+ at_most = 128 * 1024; /* FIXME totally magic. */
}

/* XXXX This over-commits. */
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
index 25874968..9bc2b577 100644
--- a/bufferevent_ratelim.c
+++ b/bufferevent_ratelim.c
@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
}

/* Default values for max_single_read & max_single_write variables. */
-#define MAX_SINGLE_READ_DEFAULT 16384
+#define MAX_SINGLE_READ_DEFAULT (128 * 1024)
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
diff --git a/http-internal.h b/http-internal.h
index feaf436d..9f9b5ab5 100644
--- a/http-internal.h
+++ b/http-internal.h
@@ -167,6 +167,8 @@ struct evhttp {
void *gencbarg;
struct bufferevent* (*bevcb)(struct event_base *, void *);
void *bevcbarg;
+ int (*newreqcb)(struct evhttp_request *req, void *);
+ void *newreqcbarg;

struct event_base *base;
};
diff --git a/http.c b/http.c
index 04f089bc..53951cba 100644
--- a/http.c
+++ b/http.c
@@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http,
http->bevcbarg = cbarg;
}
Expand All @@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c
/*
* Request related functions
*/
@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques
@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req)
req->flags |= EVHTTP_REQ_NEEDS_FREE;
return;
}
Expand All @@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c

if (req->remote_host != NULL)
mm_free(req->remote_host);
@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct
@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request *req,
req->on_complete_cb_arg = cb_arg;
}

Expand All @@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c
/*
* Allows for inspection of the request URI
*/
@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec
@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon)
*/
req->userdone = 1;

Expand All @@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c

evhttp_start_read_(evcon);

diff -uprN a/http-internal.h b/http-internal.h
--- a/http-internal.h 2020-07-05 20:02:46.000000000 +0800
+++ b/http-internal.h 2021-09-28 13:56:13.925151028 +0800
@@ -167,6 +167,8 @@ struct evhttp {
void *gencbarg;
struct bufferevent* (*bevcb)(struct event_base *, void *);
void *bevcbarg;
+ int (*newreqcb)(struct evhttp_request *req, void *);
+ void *newreqcbarg;

struct event_base *base;
};
diff -uprN a/include/event2/http.h b/include/event2/http.h
--- a/include/event2/http.h 2020-07-05 20:02:46.000000000 +0800
+++ b/include/event2/http.h 2021-09-28 13:56:13.928151231 +0800
@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt
diff --git a/include/event2/http.h b/include/event2/http.h
index 2a41303e..e80bab9a 100644
--- a/include/event2/http.h
+++ b/include/event2/http.h
@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL
void evhttp_set_bevcb(struct evhttp *http,
struct bufferevent *(*cb)(struct event_base *, void *), void *arg);

/**
+/**
+ Set a callback which allows the user to note or throttle incoming requests.
+ The requests are not populated with HTTP level information. They
+ are just associated to a connection.
Expand All @@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
+void evhttp_set_newreqcb(struct evhttp *http,
+ int (*cb)(struct evhttp_request*, void *), void *arg);
+
+/**
/**
Adds a virtual host to the http server.

A virtual host is a newly initialized evhttp object that has request
@@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL
void evhttp_request_set_on_complete_cb(struct evhttp_request *req,
void (*cb)(struct evhttp_request *, void *), void *cb_arg);
Expand All @@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
/** Frees the request object and removes associated events. */
EVENT2_EXPORT_SYMBOL
void evhttp_request_free(struct evhttp_request *req);
diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
--- a/include/event2/http_struct.h 2020-07-05 20:02:46.000000000 +0800
+++ b/include/event2/http_struct.h 2021-09-28 13:56:13.928151231 +0800
diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h
index 4bf5b1ff..0762cabd 100644
--- a/include/event2/http_struct.h
+++ b/include/event2/http_struct.h
@@ -142,6 +142,12 @@ struct {
*/
void (*on_complete_cb)(struct evhttp_request *, void *);
Expand All @@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
};

#ifdef __cplusplus
diff -uprN a/CMakeLists.txt b/CMakeLists.txt
--- a/CMakeLists.txt 2020-07-05 20:02:46.000000000 +0800
+++ b/CMakeLists.txt 2022-01-10 13:29:32.912883436 +0800
@@ -200,6 +200,6 @@ endif()
if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
set(GNUC 1)
endif()
-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
set(MSVC 1)
endif()

0 comments on commit defdae1

Please sign in to comment.