diff --git a/be/src/common/status.h b/be/src/common/status.h
index 8847bb7c0873210..6afd03c286416fd 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -306,8 +306,8 @@ extern ErrorCodeState error_states[MAX_ERROR_CODE_DEFINE_NUM];
 class ErrorCodeInitializer {
 public:
     ErrorCodeInitializer(int temp) : signal_value(temp) {
-        for (int i = 0; i < MAX_ERROR_CODE_DEFINE_NUM; ++i) {
-            error_states[i].error_code = 0;
+        for (auto& error_state : error_states) {
+            error_state.error_code = 0;
         }
 #define M(NAME, ENABLESTACKTRACE) \
     error_states[TStatusCode::NAME].stacktrace = ENABLESTACKTRACE; \
@@ -330,7 +330,7 @@ class ErrorCodeInitializer {
 #undef M
     }
 
-    void check_init() {
+    void check_init() const {
         //the signal value is 0, it means the global error states not inited, it's logical error
         // DO NOT use dcheck here, because dcheck depend on glog, and glog maybe not inited at this time.
         if (signal_value == 0) {
@@ -435,41 +435,49 @@ class [[nodiscard]] Status {
         return status;
     }
 
-    static Status OK() { return Status(); }
+    static Status OK() { return {}; }
 
+// default have stacktrace. could disable manually.
 #define ERROR_CTOR(name, code)                                              \
     template <typename... Args>                                             \
     static Status name(std::string_view msg, Args&&... args) {              \
         return Error<ErrorCode::code>(msg, std::forward<Args>(args)...);    \
     }
 
+// default have no stacktrace. could enable manually.
+#define ERROR_CTOR_NOSTACK(name, code)                                          \
+    template <typename... Args>                                                 \
+    static Status name(std::string_view msg, Args&&... args) {                  \
+        return Error<ErrorCode::code, false>(msg, std::forward<Args>(args)...); \
+    }
+
     ERROR_CTOR(PublishTimeout, PUBLISH_TIMEOUT)
     ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
     ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
-    ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
-    ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
+    ERROR_CTOR_NOSTACK(InvalidArgument, INVALID_ARGUMENT)
+    ERROR_CTOR_NOSTACK(InvalidJsonPath, INVALID_JSON_PATH)
     ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
     ERROR_CTOR(Corruption, CORRUPTION)
     ERROR_CTOR(IOError, IO_ERROR)
     ERROR_CTOR(NotFound, NOT_FOUND)
-    ERROR_CTOR(AlreadyExist, ALREADY_EXIST)
+    ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
     ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
-    ERROR_CTOR(EndOfFile, END_OF_FILE)
+    ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
-    ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
-    ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
+    ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
+    ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
-    ERROR_CTOR(Cancelled, CANCELLED)
+    ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
     ERROR_CTOR(RpcError, THRIFT_RPC_ERROR)
     ERROR_CTOR(TimedOut, TIMEOUT)
-    ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS)
+    ERROR_CTOR_NOSTACK(TooManyTasks, TOO_MANY_TASKS)
     ERROR_CTOR(Uninitialized, UNINITIALIZED)
     ERROR_CTOR(Aborted, ABORTED)
-    ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
-    ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED)
+    ERROR_CTOR_NOSTACK(DataQualityError, DATA_QUALITY_ERROR)
+    ERROR_CTOR_NOSTACK(NotAuthorized, NOT_AUTHORIZED)
     ERROR_CTOR(HttpError, HTTP_ERROR)
-    ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN)
+    ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
     ERROR_CTOR(CgroupError, CGROUP_ERROR)
 #undef ERROR_CTOR
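The two macro families differ only in the boolean template argument handed to `Error`, which decides at compile time whether a stack walk happens at all. Below is a minimal, self-contained sketch of that pattern; `MyStatus`, `StatusFactory`, and `capture_stacktrace` are illustrative stand-ins, not the Doris implementation.

#include <iostream>
#include <string>
#include <utility>

// Illustrative stand-ins; the real ErrorCode values live in status.h.
enum ErrorCode { INVALID_ARGUMENT = 1, INTERNAL_ERROR = 2 };

std::string capture_stacktrace() {
    return "<stacktrace>"; // placeholder for a real stack walk
}

struct MyStatus {
    int code = 0;
    std::string msg;
    std::string stack;

    // The compile-time bool decides whether the (comparatively expensive)
    // stack capture runs at all.
    template <int code_v, bool stacktrace = true>
    static MyStatus Error(std::string m) {
        MyStatus st;
        st.code = code_v;
        st.msg = std::move(m);
        if constexpr (stacktrace) {
            st.stack = capture_stacktrace();
        }
        return st;
    }
};

// Mirrors the two macro families: identical signatures, different flag.
#define ERROR_CTOR(name, code) \
    static MyStatus name(std::string msg) { return MyStatus::Error<code, true>(std::move(msg)); }
#define ERROR_CTOR_NOSTACK(name, code) \
    static MyStatus name(std::string msg) { return MyStatus::Error<code, false>(std::move(msg)); }

struct StatusFactory {
    ERROR_CTOR(InternalError, INTERNAL_ERROR)              // unexpected state: keep the stack
    ERROR_CTOR_NOSTACK(InvalidArgument, INVALID_ARGUMENT)  // caller mistake: skip it
};

int main() {
    std::cout << StatusFactory::InternalError("boom").stack << '\n';          // "<stacktrace>"
    std::cout << StatusFactory::InvalidArgument("bad").stack.empty() << '\n'; // 1
}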
@@ -577,13 +585,13 @@ class AtomicStatus {
         return error_st_;
     }
 
+    AtomicStatus(const AtomicStatus&) = delete;
+    void operator=(const AtomicStatus&) = delete;
+
 private:
     std::atomic_int16_t error_code_ = 0;
     Status error_st_;
     mutable std::mutex mutex_;
-
-    AtomicStatus(const AtomicStatus&) = delete;
-    void operator=(const AtomicStatus&) = delete;
 };
 
 inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
@@ -631,9 +639,6 @@ inline std::string Status::to_string_no_stack() const {
     }                                        \
 } while (false)
 
-#define RETURN_ERROR_IF_NON_VEC \
-    return Status::NotSupported("Non-vectorized engine is not supported since Doris 2.0.");
-
 #define RETURN_IF_STATUS_ERROR(status, stmt) \
     do {                                     \
         status = (stmt);                     \
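Moving the deleted copy operations from the private to the public section is the usual C++11 idiom: an accidental copy then fails with "use of deleted function" instead of a misleading access-control error. A small illustration of the same pattern, using a hypothetical `NonCopyable` rather than the actual `AtomicStatus`:

#include <mutex>

// Deleting the copy operations in the public section makes the compiler
// report the real problem (a deleted function) when someone copies by mistake.
class NonCopyable {
public:
    NonCopyable() = default;
    NonCopyable(const NonCopyable&) = delete;
    void operator=(const NonCopyable&) = delete;

private:
    mutable std::mutex mutex_; // std::mutex itself is non-copyable
};

int main() {
    NonCopyable a;
    // NonCopyable b = a; // error: use of deleted function 'NonCopyable(const NonCopyable&)'
    (void)a;
}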
diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index b07166a37172cb4..7dd85653002e37d 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -18,9 +18,7 @@
 #include "http/action/http_stream.h"
 
 #include
-#include
 #include
-#include
 #include
 
 // use string iequal
@@ -31,7 +29,6 @@
 #include
 
 #include "common/config.h"
-#include "common/consts.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/utils.h"
@@ -42,7 +39,6 @@
 #include "http/http_common.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
-#include "http/http_response.h"
 #include "http/utils.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/storage_engine.h"
@@ -57,9 +53,7 @@
 #include "runtime/stream_load/stream_load_executor.h"
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/byte_buffer.h"
-#include "util/debug_util.h"
 #include "util/doris_metrics.h"
-#include "util/load_util.h"
 #include "util/metrics.h"
 #include "util/string_util.h"
 #include "util/thrift_rpc_helper.h"
@@ -132,7 +126,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
     }
 
     RETURN_IF_ERROR(ctx->body_sink->finish());
@@ -195,7 +189,7 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::NotAuthorized("no valid Basic authorization");
     }
 
     // TODO(zs) : need Need to request an FE to obtain information such as format
@@ -207,8 +201,10 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
 
     size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
     if (ctx->body_bytes > csv_max_body_bytes) {
         LOG(WARNING) << "body exceed max size." << ctx->brief();
-        return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
-                                     ctx->body_bytes);
+        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
+                "are sure this load is reasonable",
+                ctx->body_bytes, csv_max_body_bytes);
     }
 }
@@ -380,7 +376,8 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        return Status::InvalidArgument(
+                "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
         group_commit_mode = "sync_mode";
@@ -393,7 +390,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
         ss << "This http load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InvalidArgument(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk =
@@ -415,7 +412,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError("label and group_commit can't be set at the same time");
+            return Status::InvalidArgument("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
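The pattern throughout this file: errors caused by the client's request (bad auth, oversized body, malformed `group_commit`) become `NotAuthorized`/`InvalidArgument`/no-stacktrace statuses, while `InternalError` keeps its stack trace for genuine server-side bugs. A rough model of the `group_commit` validation follows; `iequal` here is a simplified stand-in for the util/string_util.h helper, and `SimpleStatus` is not the Doris `Status`.

#include <algorithm>
#include <array>
#include <cassert>
#include <cctype>
#include <string>
#include <string_view>

// Case-insensitive compare; simplified stand-in for the iequal used above.
static bool iequal(std::string_view a, std::string_view b) {
    return a.size() == b.size() &&
           std::equal(a.begin(), a.end(), b.begin(), [](char x, char y) {
               return std::tolower(static_cast<unsigned char>(x)) ==
                      std::tolower(static_cast<unsigned char>(y));
           });
}

// Stand-in status type, just enough to show the classification.
struct SimpleStatus {
    bool ok = true;
    std::string msg;
    static SimpleStatus OK() { return {}; }
    static SimpleStatus InvalidArgument(std::string m) { return {false, std::move(m)}; }
};

SimpleStatus check_group_commit_mode(std::string_view mode) {
    if (mode.empty()) return SimpleStatus::OK(); // header absent: nothing to validate
    constexpr std::array<std::string_view, 3> kModes = {"sync_mode", "async_mode", "off_mode"};
    for (auto m : kModes) {
        if (iequal(mode, m)) return SimpleStatus::OK();
    }
    // A malformed header is the client's fault, so a plain InvalidArgument
    // (no stack capture) is a better fit than InternalError.
    return SimpleStatus::InvalidArgument(
            "group_commit can only be [async_mode, sync_mode, off_mode]");
}

int main() {
    assert(check_group_commit_mode("ASYNC_MODE").ok);
    assert(!check_group_commit_mode("bogus").ok);
}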
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index feea93446c8c0af..3f32655cf140274 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -26,15 +26,13 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
-#include
-#include
+#include
+#include
+#include
 
 #include
-#include
 #include
 #include
 #include
@@ -120,7 +118,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink.get() != nullptr) {
+        if (ctx->body_sink != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
     }
@@ -144,7 +142,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
                      << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
-        return Status::InternalError("receive body don't equal with body bytes");
+        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
     }
 
     // if we use non-streaming, MessageBodyFileSink.finish will close the file
@@ -208,7 +206,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink.get() != nullptr) {
+        if (ctx->body_sink != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
         auto str = ctx->to_json();
@@ -230,13 +228,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
         LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
-        return Status::InternalError("no valid Basic authorization");
+        return Status::NotAuthorized("no valid Basic authorization");
     }
 
     // get format of this put
     if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
         iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
-        return Status::InternalError("compress data of JSON format is not supported.");
+        return Status::NotSupported("compress data of JSON format is not supported.");
     }
     std::string format_str = http_req->header(HTTP_FORMAT_KEY);
     if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
@@ -252,8 +250,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
                            http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
                            &ctx->compress_type);
     if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-        return Status::InternalError("unknown data format, format={}",
-                                     http_req->header(HTTP_FORMAT_KEY));
+        return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}",
+                                                              http_req->header(HTTP_FORMAT_KEY));
     }
 
     // check content length
@@ -271,16 +269,18 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
         // json max body size
         if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
             (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
-            return Status::InternalError(
-                    "The size of this batch exceed the max size [{}] of json type data "
-                    " data [ {} ]. Split the file, or use 'read_json_by_line'",
-                    json_max_body_bytes, ctx->body_bytes);
+            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                    "json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
+                    "it if you are sure this load is reasonable",
+                    ctx->body_bytes, json_max_body_bytes);
         }
         // csv max body size
         else if (ctx->body_bytes > csv_max_body_bytes) {
             LOG(WARNING) << "body exceed max size." << ctx->brief();
-            return Status::InternalError("body exceed max size: {}, data: {}",
-                                         csv_max_body_bytes, ctx->body_bytes);
+            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
         }
     } else {
 #ifndef BE_TEST
@@ -298,13 +298,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
                              !ctx->is_chunked_transfer))) {
         LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
                         "content_length or transfer-encoding=chunked";
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "content_length is empty and transfer-encoding!=chunked, please set content_length "
                 "or transfer-encoding=chunked");
     } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
                         ctx->is_chunked_transfer)) {
         LOG(WARNING) << "please do not set both content_length and transfer-encoding";
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "please do not set both content_length and transfer-encoding");
     }
@@ -428,7 +428,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
         request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
     }
-    if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
+    if (!http_req->header(HTTP_ENCLOSE).empty()) {
         const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
         if (enclose_str.length() != 1) {
             return Status::InvalidArgument("enclose must be single-char, actually is {}",
@@ -436,7 +436,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
     }
-    if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
+    if (!http_req->header(HTTP_ESCAPE).empty()) {
         const auto& escape_str = http_req->header(HTTP_ESCAPE);
         if (escape_str.length() != 1) {
             return Status::InvalidArgument("escape must be single-char, actually is {}",
@@ -717,7 +717,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
     if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
         !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        return Status::InternalError(
+        return Status::InvalidArgument(
                 "group_commit can only be [async_mode, sync_mode, off_mode]");
     }
     if (config::wait_internal_group_commit_finish) {
@@ -731,7 +731,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
         ss << "This stream load content length <0 (" << content_length
            << "), please check your content length.";
         LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
+        return Status::InvalidArgument(ss.str());
     }
     // allow chunked stream load in flink
     auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
@@ -752,8 +752,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     auto partitions = !req->header(HTTP_PARTITIONS).empty();
     if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
         if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
-            return Status::InternalError(
-                    "label and group_commit can't be set at the same time");
+            return Status::InvalidArgument("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = true;
         if (iequal(group_commit_mode, "async_mode")) {
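The enclose/escape handling above reduces to a single rule: an absent header keeps the default, and anything else must be exactly one character. A compact sketch under those assumptions (`SimpleStatus` and `parse_single_char_option` are illustrative helpers, not Doris APIs):

#include <cassert>
#include <string>
#include <utility>

// Stand-in status type; not the Doris Status.
struct SimpleStatus {
    bool ok = true;
    std::string msg;
    static SimpleStatus OK() { return {}; }
    static SimpleStatus InvalidArgument(std::string m) { return {false, std::move(m)}; }
};

// Hypothetical helper mirroring the enclose/escape checks: an absent header
// keeps the default, anything else must be exactly one character.
SimpleStatus parse_single_char_option(const std::string& header_value, char* out) {
    if (header_value.empty()) {
        return SimpleStatus::OK(); // option not set
    }
    if (header_value.length() != 1) {
        return SimpleStatus::InvalidArgument("must be single-char, actually is " + header_value);
    }
    *out = header_value[0];
    return SimpleStatus::OK();
}

int main() {
    char enclose = '\0';
    assert(parse_single_char_option("\"", &enclose).ok && enclose == '"');
    assert(!parse_single_char_option("ab", &enclose).ok);
    assert(parse_single_char_option("", &enclose).ok); // default preserved
}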
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
index 574e2e1466ed604..860ff10e14e5aa3 100644
--- a/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load.groovy
@@ -1648,7 +1648,7 @@ suite("test_stream_load", "p0") {
         log.info("test chunked transfer result: ${out}".toString())
         def json = parseJson(out)
         assertEquals("fail", json.Status.toLowerCase())
-        assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set both content_length and transfer-encoding"))
+        assertTrue(json.Message.contains("please do not set both content_length and transfer-encoding"))
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
     }
@@ -1678,7 +1678,7 @@ suite("test_stream_load", "p0") {
         log.info("test chunked transfer result: ${out}".toString())
         def json = parseJson(out)
         assertEquals("fail", json.Status.toLowerCase())
-        assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
+        assertTrue(json.Message.contains("content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
     } finally {
         sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
     }