Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adjust read unit of http to optimize stream load #9154

Merged
merged 7 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(4096);
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
Expand Down Expand Up @@ -394,7 +394,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_header_type(ctx->header_type);
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should define a named constant for max_buffered_bytes instead of modifying every call site each time the value changes.

auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_cxt->need_commit_self = true;
stream_load_cxt->need_rollback = true;
// total_length == -1 means one message is read from the pipe at a time, regardless of its length.
auto pipe = std::make_shared<StreamLoadPipe>(1024 * 1024 /* max_buffered_bytes */,
auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_cxt->body_sink = pipe;
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@

namespace doris {

// Maximum number of bytes buffered in a StreamLoadPipe: 4 MiB (4 * 1024 * 1024).
constexpr size_t kMaxPipeBufferedBytes = size_t{4} << 20;
// StreamLoadPipe is used to transfer data from a producer to a consumer.
// Data in the pipe is stored in chunks.
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024,
int64_t total_length = -1, bool use_proto = false)
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
bool use_proto = false)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
Expand Down
126 changes: 87 additions & 39 deletions thirdparty/patches/libevent.patch
Original file line number Diff line number Diff line change
@@ -1,6 +1,75 @@
diff -uprN a/http.c b/http.c
--- a/http.c 2020-07-05 20:02:46.000000000 +0800
+++ b/http.c 2021-09-28 13:56:14.045159153 +0800
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 676727f1..833fbf70 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -200,7 +200,7 @@ endif()
if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
set(GNUC 1)
endif()
-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
set(MSVC 1)
endif()

diff --git a/buffer.c b/buffer.c
index 3524b350..e5d97458 100644
--- a/buffer.c
+++ b/buffer.c
@@ -2204,9 +2204,9 @@ evbuffer_expand(struct evbuffer *buf, size_t datlen)
#define IOV_LEN_TYPE unsigned long
#endif
#endif
-#define NUM_READ_IOVEC 4
+#define NUM_READ_IOVEC 8

-#define EVBUFFER_MAX_READ 4096
+#define EVBUFFER_MAX_READ (128 * 1024)

/** Helper function to figure out which space to use for reading data into
an evbuffer. Internal use only.
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 40c7c5e8..c1624878 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -275,7 +275,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
}
at_most = read_high - cur_size;
} else {
- at_most = 16384; /* FIXME totally magic. */
+ at_most = 128 * 1024; /* FIXME totally magic. */
}

/* XXXX This over-commits. */
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
index 25874968..9bc2b577 100644
--- a/bufferevent_ratelim.c
+++ b/bufferevent_ratelim.c
@@ -179,7 +179,7 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
}

/* Default values for max_single_read & max_single_write variables. */
-#define MAX_SINGLE_READ_DEFAULT 16384
+#define MAX_SINGLE_READ_DEFAULT (128 * 1024)
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
diff --git a/http-internal.h b/http-internal.h
index feaf436d..9f9b5ab5 100644
--- a/http-internal.h
+++ b/http-internal.h
@@ -167,6 +167,8 @@ struct evhttp {
void *gencbarg;
struct bufferevent* (*bevcb)(struct event_base *, void *);
void *bevcbarg;
+ int (*newreqcb)(struct evhttp_request *req, void *);
+ void *newreqcbarg;

struct event_base *base;
};
diff --git a/http.c b/http.c
index 04f089bc..53951cba 100644
--- a/http.c
+++ b/http.c
@@ -3975,6 +3975,14 @@ evhttp_set_bevcb(struct evhttp *http,
http->bevcbarg = cbarg;
}
Expand All @@ -16,7 +85,7 @@ diff -uprN a/http.c b/http.c
/*
* Request related functions
*/
@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_reques
@@ -4036,6 +4044,8 @@ evhttp_request_free(struct evhttp_request *req)
req->flags |= EVHTTP_REQ_NEEDS_FREE;
return;
}
Expand All @@ -25,7 +94,7 @@ diff -uprN a/http.c b/http.c

if (req->remote_host != NULL)
mm_free(req->remote_host);
@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct
@@ -4116,6 +4126,15 @@ evhttp_request_set_on_complete_cb(struct evhttp_request *req,
req->on_complete_cb_arg = cb_arg;
}

Expand All @@ -41,7 +110,7 @@ diff -uprN a/http.c b/http.c
/*
* Allows for inspection of the request URI
*/
@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connec
@@ -4307,10 +4326,15 @@ evhttp_associate_new_request_with_connection(struct evhttp_connection *evcon)
*/
req->userdone = 1;

Expand All @@ -59,25 +128,15 @@ diff -uprN a/http.c b/http.c

evhttp_start_read_(evcon);

diff -uprN a/http-internal.h b/http-internal.h
--- a/http-internal.h 2020-07-05 20:02:46.000000000 +0800
+++ b/http-internal.h 2021-09-28 13:56:13.925151028 +0800
@@ -167,6 +167,8 @@ struct evhttp {
void *gencbarg;
struct bufferevent* (*bevcb)(struct event_base *, void *);
void *bevcbarg;
+ int (*newreqcb)(struct evhttp_request *req, void *);
+ void *newreqcbarg;

struct event_base *base;
};
diff -uprN a/include/event2/http.h b/include/event2/http.h
--- a/include/event2/http.h 2020-07-05 20:02:46.000000000 +0800
+++ b/include/event2/http.h 2021-09-28 13:56:13.928151231 +0800
@@ -299,6 +299,20 @@ void evhttp_set_bevcb(struct evhttp *htt
diff --git a/include/event2/http.h b/include/event2/http.h
index 2a41303e..e80bab9a 100644
--- a/include/event2/http.h
+++ b/include/event2/http.h
@@ -298,6 +298,20 @@ EVENT2_EXPORT_SYMBOL
void evhttp_set_bevcb(struct evhttp *http,
struct bufferevent *(*cb)(struct event_base *, void *), void *arg);

/**
+/**
+ Set a callback which allows the user to note or throttle incoming requests.
+ The requests are not populated with HTTP level information. They
+ are just associated to a connection.
Expand All @@ -91,10 +150,9 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
+void evhttp_set_newreqcb(struct evhttp *http,
+ int (*cb)(struct evhttp_request*, void *), void *arg);
+
+/**
/**
Adds a virtual host to the http server.

A virtual host is a newly initialized evhttp object that has request
@@ -624,6 +638,20 @@ EVENT2_EXPORT_SYMBOL
void evhttp_request_set_on_complete_cb(struct evhttp_request *req,
void (*cb)(struct evhttp_request *, void *), void *cb_arg);
Expand All @@ -116,9 +174,10 @@ diff -uprN a/include/event2/http.h b/include/event2/http.h
/** Frees the request object and removes associated events. */
EVENT2_EXPORT_SYMBOL
void evhttp_request_free(struct evhttp_request *req);
diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
--- a/include/event2/http_struct.h 2020-07-05 20:02:46.000000000 +0800
+++ b/include/event2/http_struct.h 2021-09-28 13:56:13.928151231 +0800
diff --git a/include/event2/http_struct.h b/include/event2/http_struct.h
index 4bf5b1ff..0762cabd 100644
--- a/include/event2/http_struct.h
+++ b/include/event2/http_struct.h
@@ -142,6 +142,12 @@ struct {
*/
void (*on_complete_cb)(struct evhttp_request *, void *);
Expand All @@ -132,14 +191,3 @@ diff -uprN a/include/event2/http_struct.h b/include/event2/http_struct.h
};

#ifdef __cplusplus
diff -uprN a/CMakeLists.txt b/CMakeLists.txt
--- a/CMakeLists.txt 2020-07-05 20:02:46.000000000 +0800
+++ b/CMakeLists.txt 2022-01-10 13:29:32.912883436 +0800
@@ -200,6 +200,6 @@ endif()
if (("${CMAKE_C_COMPILER_ID}" STREQUAL "GNU") OR (${CLANG}))
set(GNUC 1)
endif()
-if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR (${CLANG}))
+if (("${CMAKE_C_COMPILER_ID}" STREQUAL "MSVC") OR ("${CMAKE_C_SIMULATE_ID}" STREQUAL "MSVC"))
set(MSVC 1)
endif()