Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](errmsg) Make stream load error messages clearer and change some errors' stacktrace behavior #38978

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ extern ErrorCodeState error_states[MAX_ERROR_CODE_DEFINE_NUM];
class ErrorCodeInitializer {
public:
ErrorCodeInitializer(int temp) : signal_value(temp) {
for (int i = 0; i < MAX_ERROR_CODE_DEFINE_NUM; ++i) {
error_states[i].error_code = 0;
for (auto& error_state : error_states) {
error_state.error_code = 0;
}
#define M(NAME, ENABLESTACKTRACE) \
error_states[TStatusCode::NAME].stacktrace = ENABLESTACKTRACE; \
Expand All @@ -338,7 +338,7 @@ class ErrorCodeInitializer {
#undef M
}

void check_init() {
void check_init() const {
//If the signal value is 0, the global error states were not initialized, which is a logic error.
// DO NOT use DCHECK here, because DCHECK depends on glog, and glog may not be initialized at this time.
if (signal_value == 0) {
Expand Down Expand Up @@ -441,41 +441,49 @@ class [[nodiscard]] Status {
return status;
}

static Status OK() { return Status(); }
static Status OK() { return {}; }

// By default these carry a stacktrace; it can be disabled manually.
#define ERROR_CTOR(name, code) \
template <bool stacktrace = true, typename... Args> \
static Status name(std::string_view msg, Args&&... args) { \
return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
}

// By default these carry no stacktrace; it can be enabled manually.
#define ERROR_CTOR_NOSTACK(name, code) \
template <bool stacktrace = false, typename... Args> \
static Status name(std::string_view msg, Args&&... args) { \
return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
}

ERROR_CTOR(PublishTimeout, PUBLISH_TIMEOUT)
ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR_NOSTACK(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR_NOSTACK(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
ERROR_CTOR(Corruption, CORRUPTION)
ERROR_CTOR(IOError, IO_ERROR)
ERROR_CTOR(NotFound, NOT_FOUND)
ERROR_CTOR(AlreadyExist, ALREADY_EXIST)
ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
ERROR_CTOR(EndOfFile, END_OF_FILE)
ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
ERROR_CTOR(Cancelled, CANCELLED)
ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
ERROR_CTOR(RpcError, THRIFT_RPC_ERROR)
ERROR_CTOR(TimedOut, TIMEOUT)
ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS)
ERROR_CTOR_NOSTACK(TooManyTasks, TOO_MANY_TASKS)
ERROR_CTOR(Uninitialized, UNINITIALIZED)
ERROR_CTOR(Aborted, ABORTED)
ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED)
ERROR_CTOR_NOSTACK(DataQualityError, DATA_QUALITY_ERROR)
ERROR_CTOR_NOSTACK(NotAuthorized, NOT_AUTHORIZED)
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
#undef ERROR_CTOR

template <int code>
Expand Down Expand Up @@ -584,15 +592,15 @@ class AtomicStatus {
return error_st_;
}

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;

private:
std::atomic_int16_t error_code_ = 0;
Status error_st_;
// mutex's lock() is not a const method, but this mutex is used in
// some const methods, so it must be mutable.
mutable std::mutex mutex_;

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;
};

inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
Expand Down Expand Up @@ -638,9 +646,6 @@ inline std::string Status::to_string_no_stack() const {
} \
} while (false)

#define RETURN_ERROR_IF_NON_VEC \
return Status::NotSupported("Non-vectorized engine is not supported since Doris 2.0.");

#define RETURN_IF_STATUS_ERROR(status, stmt) \
do { \
status = (stmt); \
Expand Down
24 changes: 10 additions & 14 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include "http/action/http_stream.h"

#include <cstddef>
#include <deque>
#include <future>
#include <shared_mutex>
#include <sstream>

// use string iequal
Expand All @@ -30,10 +28,8 @@
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
Expand All @@ -44,7 +40,6 @@
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
Expand All @@ -58,9 +53,7 @@
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/byte_buffer.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/load_util.h"
#include "util/metrics.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
Expand Down Expand Up @@ -133,7 +126,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}
RETURN_IF_ERROR(ctx->body_sink->finish());

Expand Down Expand Up @@ -196,7 +189,7 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InternalError("no valid Basic authorization");
return Status::NotAuthorized("no valid Basic authorization");
}

// TODO(zs): Need to request an FE to obtain information such as format
Expand All @@ -208,8 +201,10 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// csv max body size
if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
"are sure this load is reasonable",
ctx->body_bytes, csv_max_body_bytes);
}
}

Expand Down Expand Up @@ -386,7 +381,8 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
Expand All @@ -399,7 +395,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
ss << "This http load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk =
Expand All @@ -421,7 +417,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError("label and group_commit can't be set at the same time");
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
Expand Down
52 changes: 25 additions & 27 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <future>
#include <map>
#include <sstream>
#include <stdexcept>
#include <utility>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -122,7 +119,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}
Expand All @@ -146,7 +143,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError<false>("receive body don't equal with body bytes");
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
Expand Down Expand Up @@ -210,7 +207,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
auto str = ctx->to_json();
Expand All @@ -232,13 +229,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InternalError<false>("no valid Basic authorization");
return Status::NotAuthorized("no valid Basic authorization");
}

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError<false>("compress data of JSON format is not supported.");
return Status::NotSupported("compress data of JSON format is not supported.");
}
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
Expand All @@ -254,8 +251,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
&ctx->compress_type);
if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
return Status::InternalError<false>("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
}

// check content length
Expand All @@ -273,16 +270,18 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// json max body size
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
(ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
return Status::InternalError<false>(
"The size of this batch exceed the max size [{}] of json type data "
" data [ {} ]. Split the file, or use 'read_json_by_line'",
json_max_body_bytes, ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
"it if you are sure this load is reasonable",
ctx->body_bytes, json_max_body_bytes);
}
// csv max body size
else if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError<false>("body exceed max size: {}, data: {}",
csv_max_body_bytes, ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
"are sure this load is reasonable",
ctx->body_bytes, csv_max_body_bytes);
}
} else {
#ifndef BE_TEST
Expand All @@ -300,13 +299,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
!ctx->is_chunked_transfer))) {
LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
"content_length or transfer-encoding=chunked";
return Status::InternalError<false>(
return Status::InvalidArgument(
"content_length is empty and transfer-encoding!=chunked, please set content_length "
"or transfer-encoding=chunked");
} else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
ctx->is_chunked_transfer)) {
LOG(WARNING) << "please do not set both content_length and transfer-encoding";
return Status::InternalError<false>(
return Status::InvalidArgument(
"please do not set both content_length and transfer-encoding");
}

Expand Down Expand Up @@ -430,15 +429,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) {
const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
if (enclose_str.length() != 1) {
return Status::InvalidArgument("enclose must be single-char, actually is {}",
enclose_str);
}
request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
}
if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) {
const auto& escape_str = http_req->header(HTTP_ESCAPE);
if (escape_str.length() != 1) {
return Status::InvalidArgument("escape must be single-char, actually is {}",
Expand Down Expand Up @@ -728,7 +727,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError<false>(
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
Expand All @@ -742,7 +741,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
ss << "This stream load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError<false>(ss.str());
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
Expand All @@ -763,8 +762,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError<false>(
"label and group_commit can't be set at the same time");
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ suite("test_stream_load", "p0") {
log.info("test chunked transfer result: ${out}".toString())
def json = parseJson(out)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set both content_length and transfer-encoding"))
assertTrue(json.Message.contains("please do not set both content_length and transfer-encoding"))
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}
Expand Down Expand Up @@ -1687,7 +1687,7 @@ suite("test_stream_load", "p0") {
log.info("test chunked transfer result: ${out}".toString())
def json = parseJson(out)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
assertTrue(json.Message.contains("content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
} finally {
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
}
Expand Down
Loading