From f4c7f65a3b629504de7e7fbde97a4a235188056e Mon Sep 17 00:00:00 2001
From: zclllhhjj
Date: Wed, 7 Aug 2024 19:30:53 +0800
Subject: [PATCH] [enhancement](errmsg) Make stream load error message more clear and change some Errors stacktrace (#38978)

## Proposed changes

Issue Number: close #xxx

1. Introduce `ERROR_CTOR_NOSTACK` so that some of Doris' native errors do not
   print a stacktrace by default (a minimal standalone sketch of the mechanism
   follows the diff below).
2. When a stream load exceeds a body-size limit, tell the user which BE config
   to raise instead of only reporting the raw numbers.
---
 be/src/common/status.h                  | 47 +++++++++--------
 be/src/http/action/http_stream.cpp      | 24 ++++-----
 be/src/http/action/stream_load.cpp      | 52 +++++++++----------
 .../stream_load/test_stream_load.groovy |  4 +-
 4 files changed, 63 insertions(+), 64 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 11c7c42ac99496..573f28dbcf37e5 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -314,8 +314,8 @@ extern ErrorCodeState error_states[MAX_ERROR_CODE_DEFINE_NUM];
 class ErrorCodeInitializer {
 public:
     ErrorCodeInitializer(int temp) : signal_value(temp) {
-        for (int i = 0; i < MAX_ERROR_CODE_DEFINE_NUM; ++i) {
-            error_states[i].error_code = 0;
+        for (auto& error_state : error_states) {
+            error_state.error_code = 0;
         }
 #define M(NAME, ENABLESTACKTRACE) \
     error_states[TStatusCode::NAME].stacktrace = ENABLESTACKTRACE; \
@@ -338,7 +338,7 @@ class ErrorCodeInitializer {
 #undef M
     }

-    void check_init() {
+    void check_init() const {
         //the signal value is 0, it means the global error states not inited, it's logical error
         // DO NOT use dcheck here, because dcheck depend on glog, and glog maybe not inited at this time.
         if (signal_value == 0) {
@@ -441,41 +441,49 @@ class [[nodiscard]] Status {
         return status;
     }

-    static Status OK() { return Status(); }
+    static Status OK() { return {}; }

+// default have stacktrace. could disable manually.
 #define ERROR_CTOR(name, code) \
     template <bool stacktrace = true, typename... Args> \
     static Status name(std::string_view msg, Args&&... args) { \
         return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
     }

+// default have no stacktrace. could enable manually.
+#define ERROR_CTOR_NOSTACK(name, code) \
+    template <bool stacktrace = false, typename... Args> \
+    static Status name(std::string_view msg, Args&&... args) { \
+        return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
+    }
+
     ERROR_CTOR(PublishTimeout, PUBLISH_TIMEOUT)
     ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
     ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
-    ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
-    ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
+    ERROR_CTOR_NOSTACK(InvalidArgument, INVALID_ARGUMENT)
+    ERROR_CTOR_NOSTACK(InvalidJsonPath, INVALID_JSON_PATH)
     ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
     ERROR_CTOR(Corruption, CORRUPTION)
     ERROR_CTOR(IOError, IO_ERROR)
     ERROR_CTOR(NotFound, NOT_FOUND)
-    ERROR_CTOR(AlreadyExist, ALREADY_EXIST)
+    ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
     ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
-    ERROR_CTOR(EndOfFile, END_OF_FILE)
+    ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
-    ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
-    ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
+    ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
+    ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
-    ERROR_CTOR(Cancelled, CANCELLED)
+    ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
     ERROR_CTOR(RpcError, THRIFT_RPC_ERROR)
     ERROR_CTOR(TimedOut, TIMEOUT)
-    ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS)
+    ERROR_CTOR_NOSTACK(TooManyTasks, TOO_MANY_TASKS)
     ERROR_CTOR(Uninitialized, UNINITIALIZED)
     ERROR_CTOR(Aborted, ABORTED)
-    ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
-    ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED)
+    ERROR_CTOR_NOSTACK(DataQualityError, DATA_QUALITY_ERROR)
+    ERROR_CTOR_NOSTACK(NotAuthorized, NOT_AUTHORIZED)
     ERROR_CTOR(HttpError, HTTP_ERROR)
-    ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN)
+    ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
 #undef ERROR_CTOR

     template <int code>
@@ -584,15 +592,15 @@ class AtomicStatus {
         return error_st_;
     }

+    AtomicStatus(const AtomicStatus&) = delete;
+    void operator=(const AtomicStatus&) = delete;
+
 private:
     std::atomic_int16_t error_code_ = 0;
     Status error_st_;
     // mutex's lock is not a const method, but we will use this mutex in
     // some const method, so that it should be mutable.
     mutable std::mutex mutex_;
-
-    AtomicStatus(const AtomicStatus&) = delete;
-    void operator=(const AtomicStatus&) = delete;
 };

 inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
@@ -638,9 +646,6 @@ inline std::string Status::to_string_no_stack() const {
     } \
 } while (false)

-#define RETURN_ERROR_IF_NON_VEC \
-    return Status::NotSupported("Non-vectorized engine is not supported since Doris 2.0.");
-
 #define RETURN_IF_STATUS_ERROR(status, stmt) \
     do { \
         status = (stmt); \
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 87cc2f694eb102..e7bfa83911141a 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -18,9 +18,7 @@
 #include "http/action/http_stream.h"

 #include
-#include
 #include
-#include
 #include

 // use string iequal
@@ -30,10 +28,8 @@
 #include
 #include

-#include "cloud/cloud_storage_engine.h"
 #include "cloud/config.h"
 #include "common/config.h"
-#include "common/consts.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/utils.h"
@@ -44,7 +40,6 @@
 #include "http/http_common.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
-#include "http/http_response.h"
 #include "http/utils.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/storage_engine.h"
@@ -58,9 +53,7 @@
 #include "runtime/stream_load/stream_load_executor.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/byte_buffer.h"
-#include "util/debug_util.h"
 #include "util/doris_metrics.h"
-#include "util/load_util.h"
 #include "util/metrics.h"
 #include "util/string_util.h"
 #include "util/thrift_rpc_helper.h"
@@ -133,7 +126,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("receive body don't equal with body bytes");
     }

     RETURN_IF_ERROR(ctx->body_sink->finish());
@@ -196,7 +189,7 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::NotAuthorized("no valid Basic authorization");
     }

     // TODO(zs) : need Need to request an FE to obtain information such as format
@@ -208,8 +201,10 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
         if (ctx->body_bytes > csv_max_body_bytes) {
             LOG(WARNING) << "body exceed max size." << ctx->brief();
-            return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
-                                         ctx->body_bytes);
+            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
         }
     }

@@ -386,7 +381,8 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        return Status::InvalidArgument(
+                "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
@@ -399,7 +395,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
         ss << "This http load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InvalidArgument(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk =
@@ -421,7 +417,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError("label and group_commit can't be set at the same time");
+            return Status::InvalidArgument("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 64becf8d7e3369..75d6943d3c6382 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -26,20 +26,17 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
-#include
-#include
+#include
+#include
+#include
 #include
-#include
 #include
 #include
 #include

-#include "cloud/cloud_storage_engine.h"
 #include "cloud/config.h"
 #include "common/config.h"
 #include "common/consts.h"
@@ -122,7 +119,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink.get() != nullptr) {
+        if (ctx->body_sink != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
     }
@@ -146,7 +143,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("receive body don't equal with body bytes");
     }

     // if we use non-streaming, MessageBodyFileSink.finish will close the file
@@ -210,7 +207,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink.get() != nullptr) {
+        if (ctx->body_sink != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
         auto str = ctx->to_json();
@@ -232,13 +229,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::NotAuthorized("no valid Basic authorization");
     }

     // get format of this put
     if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
         iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
-        return Status::InternalError("compress data of JSON format is not supported.");
+        return Status::NotSupported("compress data of JSON format is not supported.");
     }
     std::string format_str = http_req->header(HTTP_FORMAT_KEY);
     if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
@@ -254,8 +251,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
                           http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
                           &ctx->compress_type);
     if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-        return Status::InternalError("unknown data format, format={}",
-                                     http_req->header(HTTP_FORMAT_KEY));
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("unknown data format, format={}",
+                                                               http_req->header(HTTP_FORMAT_KEY));
     }

     // check content length
@@ -273,16 +270,18 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
         if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
             (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
-            return Status::InternalError(
-                    "The size of this batch exceed the max size [{}] of json type data "
-                    " data [ {} ]. Split the file, or use 'read_json_by_line'",
-                    json_max_body_bytes, ctx->body_bytes);
+            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
+                    "json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
+                    "it if you are sure this load is reasonable",
+                    ctx->body_bytes, json_max_body_bytes);
         }
         // csv max body size
         else if (ctx->body_bytes > csv_max_body_bytes) {
             LOG(WARNING) << "body exceed max size." << ctx->brief();
-            return Status::InternalError("body exceed max size: {}, data: {}",
-                                         csv_max_body_bytes, ctx->body_bytes);
+            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
         }
     } else {
 #ifndef BE_TEST
@@ -300,13 +299,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
                   !ctx->is_chunked_transfer))) {
         LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
                         "content_length or transfer-encoding=chunked";
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "content_length is empty and transfer-encoding!=chunked, please set content_length "
                 "or transfer-encoding=chunked");
     } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
                         ctx->is_chunked_transfer)) {
         LOG(WARNING) << "please do not set both content_length and transfer-encoding";
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "please do not set both content_length and transfer-encoding");
     }

@@ -430,7 +429,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
         request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
     }
-    if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
+    if (!http_req->header(HTTP_ENCLOSE).empty()) {
         const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
         if (enclose_str.length() != 1) {
             return Status::InvalidArgument("enclose must be single-char, actually is {}",
@@ -438,7 +437,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         }
         request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
     }
-    if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
+    if (!http_req->header(HTTP_ESCAPE).empty()) {
         const auto& escape_str = http_req->header(HTTP_ESCAPE);
         if (escape_str.length() != 1) {
             return Status::InvalidArgument("escape must be single-char, actually is {}",
@@ -728,7 +727,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
@@ -742,7 +741,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
         ss << "This stream load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InvalidArgument(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
@@ -763,8 +762,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError(
-                    "label and group_commit can't be set at the same time");
+            return Status::InvalidArgument("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index f089861b76eb4a..f84ac60a21ad6c 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1657,7 +1657,7 @@ suite("test_stream_load", "p0") {
             log.info("test chunked transfer result: ${out}".toString())
             def json = parseJson(out)
             assertEquals("fail", json.Status.toLowerCase())
-            assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set both content_length and transfer-encoding"))
+            assertTrue(json.Message.contains("please do not set both content_length and transfer-encoding"))
         } finally {
             sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
         }
@@ -1687,7 +1687,7 @@ suite("test_stream_load", "p0") {
             log.info("test chunked transfer result: ${out}".toString())
             def json = parseJson(out)
             assertEquals("fail", json.Status.toLowerCase())
-            assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
+            assertTrue(json.Message.contains("content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
         } finally {
             sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
         }
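
For reviewers who want to see the `ERROR_CTOR` / `ERROR_CTOR_NOSTACK` idea in isolation, here is a minimal, self-contained C++ sketch. It is not Doris code: the toy `Status` struct, the numeric codes, and the `has_stack` field are made up for illustration. It only shows the mechanism the patch relies on: both macros stamp out the same factory, differing only in the default of a `stacktrace` template parameter, which any call site can still override explicitly.

```cpp
#include <iostream>
#include <string>
#include <utility>

// Toy stand-in for Doris' Status; `has_stack` models "a stacktrace was captured".
struct Status {
    int code = 0;
    std::string msg;
    bool has_stack = false;

    // Simplified analogue of Status::Error<code, stacktrace>(...): the real
    // code formats the message and conditionally captures a stacktrace.
    template <int code_v, bool stacktrace, typename... Args>
    static Status Error(std::string m, Args&&...) {
        return Status{code_v, std::move(m), stacktrace};
    }

// Stacktrace by default; a call site may pass <false> to silence it.
#define ERROR_CTOR(name, code)                                           \
    template <bool stacktrace = true, typename... Args>                  \
    static Status name(std::string m, Args&&... args) {                  \
        return Error<code, stacktrace>(std::move(m),                     \
                                       std::forward<Args>(args)...);     \
    }

// No stacktrace by default; a call site may pass <true> to re-enable it.
#define ERROR_CTOR_NOSTACK(name, code)                                   \
    template <bool stacktrace = false, typename... Args>                 \
    static Status name(std::string m, Args&&... args) {                  \
        return Error<code, stacktrace>(std::move(m),                     \
                                       std::forward<Args>(args)...);     \
    }

    ERROR_CTOR(InternalError, 6)            // unexpected state: loud by default
    ERROR_CTOR_NOSTACK(InvalidArgument, 4)  // user mistake: quiet by default
#undef ERROR_CTOR
#undef ERROR_CTOR_NOSTACK
};

int main() {
    std::cout << Status::InternalError("boom").has_stack << '\n';            // 1
    std::cout << Status::InvalidArgument("bad arg").has_stack << '\n';       // 0
    std::cout << Status::InvalidArgument<true>("bad arg").has_stack << '\n'; // 1
}
```

The design point mirrors the reclassification in the diff: "expected" failures caused by user input (`InvalidArgument`, `NotAuthorized`, the body-size checks that now point at `streaming_load_max_mb` / `streaming_load_json_max_mb`) become message-only errors, while genuinely unexpected internal states keep the stack capture for debugging.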