Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb committed Aug 6, 2024
1 parent e1c2da1 commit 9eafd85
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 62 deletions.
47 changes: 26 additions & 21 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ extern ErrorCodeState error_states[MAX_ERROR_CODE_DEFINE_NUM];
class ErrorCodeInitializer {
public:
ErrorCodeInitializer(int temp) : signal_value(temp) {
for (int i = 0; i < MAX_ERROR_CODE_DEFINE_NUM; ++i) {
error_states[i].error_code = 0;
for (auto& error_state : error_states) {
error_state.error_code = 0;
}
#define M(NAME, ENABLESTACKTRACE) \
error_states[TStatusCode::NAME].stacktrace = ENABLESTACKTRACE; \
Expand All @@ -338,7 +338,7 @@ class ErrorCodeInitializer {
#undef M
}

void check_init() {
void check_init() const {
// If the signal value is 0, it means the global error states were not initialized, which is a logical error.
// DO NOT use DCHECK here, because DCHECK depends on glog, and glog may not be initialized at this time.
if (signal_value == 0) {
Expand Down Expand Up @@ -441,41 +441,49 @@ class [[nodiscard]] Status {
return status;
}

static Status OK() { return Status(); }
static Status OK() { return {}; }

// By default these carry a stacktrace; it can be disabled manually.
#define ERROR_CTOR(name, code) \
template <bool stacktrace = true, typename... Args> \
static Status name(std::string_view msg, Args&&... args) { \
return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
}

// By default these carry no stacktrace; it can be enabled manually.
#define ERROR_CTOR_NOSTACK(name, code) \
template <bool stacktrace = false, typename... Args> \
static Status name(std::string_view msg, Args&&... args) { \
return Error<ErrorCode::code, stacktrace>(msg, std::forward<Args>(args)...); \
}

ERROR_CTOR(PublishTimeout, PUBLISH_TIMEOUT)
ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR_NOSTACK(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR_NOSTACK(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
ERROR_CTOR(Corruption, CORRUPTION)
ERROR_CTOR(IOError, IO_ERROR)
ERROR_CTOR(NotFound, NOT_FOUND)
ERROR_CTOR(AlreadyExist, ALREADY_EXIST)
ERROR_CTOR_NOSTACK(AlreadyExist, ALREADY_EXIST)
ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
ERROR_CTOR(EndOfFile, END_OF_FILE)
ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
ERROR_CTOR(InternalError, INTERNAL_ERROR)
ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
ERROR_CTOR(Cancelled, CANCELLED)
ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
ERROR_CTOR(RpcError, THRIFT_RPC_ERROR)
ERROR_CTOR(TimedOut, TIMEOUT)
ERROR_CTOR(TooManyTasks, TOO_MANY_TASKS)
ERROR_CTOR_NOSTACK(TooManyTasks, TOO_MANY_TASKS)
ERROR_CTOR(Uninitialized, UNINITIALIZED)
ERROR_CTOR(Aborted, ABORTED)
ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED)
ERROR_CTOR_NOSTACK(DataQualityError, DATA_QUALITY_ERROR)
ERROR_CTOR_NOSTACK(NotAuthorized, NOT_AUTHORIZED)
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
#undef ERROR_CTOR

template <int code>
Expand Down Expand Up @@ -584,15 +592,15 @@ class AtomicStatus {
return error_st_;
}

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;

private:
std::atomic_int16_t error_code_ = 0;
Status error_st_;
// mutex's lock() is not a const method, but we use this mutex inside
// some const methods, so it must be declared mutable.
mutable std::mutex mutex_;

AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;
};

inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
Expand Down Expand Up @@ -638,9 +646,6 @@ inline std::string Status::to_string_no_stack() const {
} \
} while (false)

#define RETURN_ERROR_IF_NON_VEC \
return Status::NotSupported("Non-vectorized engine is not supported since Doris 2.0.");

#define RETURN_IF_STATUS_ERROR(status, stmt) \
do { \
status = (stmt); \
Expand Down
24 changes: 10 additions & 14 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include "http/action/http_stream.h"

#include <cstddef>
#include <deque>
#include <future>
#include <shared_mutex>
#include <sstream>

// use string iequal
Expand All @@ -30,10 +28,8 @@
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/utils.h"
Expand All @@ -44,7 +40,6 @@
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "olap/storage_engine.h"
Expand All @@ -58,9 +53,7 @@
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/byte_buffer.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/load_util.h"
#include "util/metrics.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
Expand Down Expand Up @@ -133,7 +126,7 @@ Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLo
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}
RETURN_IF_ERROR(ctx->body_sink->finish());

Expand Down Expand Up @@ -196,7 +189,7 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InternalError("no valid Basic authorization");
return Status::NotAuthorized("no valid Basic authorization");
}

// TODO(zs): Need to request an FE to obtain information such as format
Expand All @@ -208,8 +201,10 @@ Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// csv max body size
if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError("body exceed max size: {}, data: {}", csv_max_body_bytes,
ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
"are sure this load is reasonable",
ctx->body_bytes, csv_max_body_bytes);
}
}

Expand Down Expand Up @@ -386,7 +381,8 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
Expand All @@ -399,7 +395,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
ss << "This http load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk =
Expand All @@ -421,7 +417,7 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError("label and group_commit can't be set at the same time");
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
Expand Down
52 changes: 25 additions & 27 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <stdint.h>
#include <stdlib.h>
#include <cstdint>
#include <cstdlib>
#include <sys/time.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>
#include <ctime>

#include <algorithm>
#include <future>
#include <map>
#include <sstream>
#include <stdexcept>
#include <utility>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
Expand Down Expand Up @@ -122,7 +119,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}
Expand All @@ -146,7 +143,7 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError<false>("receive body don't equal with body bytes");
return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
Expand Down Expand Up @@ -210,7 +207,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
auto str = ctx->to_json();
Expand All @@ -232,13 +229,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// auth information
if (!parse_basic_auth(*http_req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InternalError<false>("no valid Basic authorization");
return Status::NotAuthorized("no valid Basic authorization");
}

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError<false>("compress data of JSON format is not supported.");
return Status::NotSupported("compress data of JSON format is not supported.");
}
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
Expand All @@ -254,8 +251,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
&ctx->compress_type);
if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
return Status::InternalError<false>("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}",
http_req->header(HTTP_FORMAT_KEY));
}

// check content length
Expand All @@ -273,16 +270,18 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
// json max body size
if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
(ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
return Status::InternalError<false>(
"The size of this batch exceed the max size [{}] of json type data "
" data [ {} ]. Split the file, or use 'read_json_by_line'",
json_max_body_bytes, ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
"it if you are sure this load is reasonable",
ctx->body_bytes, json_max_body_bytes);
}
// csv max body size
else if (ctx->body_bytes > csv_max_body_bytes) {
LOG(WARNING) << "body exceed max size." << ctx->brief();
return Status::InternalError<false>("body exceed max size: {}, data: {}",
csv_max_body_bytes, ctx->body_bytes);
return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
"body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
"are sure this load is reasonable",
ctx->body_bytes, csv_max_body_bytes);
}
} else {
#ifndef BE_TEST
Expand All @@ -300,13 +299,13 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
!ctx->is_chunked_transfer))) {
LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
"content_length or transfer-encoding=chunked";
return Status::InternalError<false>(
return Status::InvalidArgument(
"content_length is empty and transfer-encoding!=chunked, please set content_length "
"or transfer-encoding=chunked");
} else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
ctx->is_chunked_transfer)) {
LOG(WARNING) << "please do not set both content_length and transfer-encoding";
return Status::InternalError<false>(
return Status::InvalidArgument(
"please do not set both content_length and transfer-encoding");
}

Expand Down Expand Up @@ -430,15 +429,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) {
const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
if (enclose_str.length() != 1) {
return Status::InvalidArgument("enclose must be single-char, actually is {}",
enclose_str);
}
request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
}
if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) {
const auto& escape_str = http_req->header(HTTP_ESCAPE);
if (escape_str.length() != 1) {
return Status::InvalidArgument("escape must be single-char, actually is {}",
Expand Down Expand Up @@ -728,7 +727,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
!iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
return Status::InternalError<false>(
return Status::InvalidArgument(
"group_commit can only be [async_mode, sync_mode, off_mode]");
}
if (config::wait_internal_group_commit_finish) {
Expand All @@ -742,7 +741,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
ss << "This stream load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError<false>(ss.str());
return Status::InvalidArgument(ss.str());
}
// allow chunked stream load in flink
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
Expand All @@ -763,8 +762,7 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
auto partitions = !req->header(HTTP_PARTITIONS).empty();
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InternalError<false>(
"label and group_commit can't be set at the same time");
return Status::InvalidArgument("label and group_commit can't be set at the same time");
}
ctx->group_commit = true;
if (iequal(group_commit_mode, "async_mode")) {
Expand Down

0 comments on commit 9eafd85

Please sign in to comment.