diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 6304c44462..bb5ce729df 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -378,9 +378,16 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req, if (config::wait_internal_group_commit_finish) { group_commit_mode = "sync_mode"; } - size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() - ? 0 - : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); + int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); + if (content_length < 0) { + std::stringstream ss; + ss << "This http load content length <0 (" << content_length + << "), please check your content length."; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { // off_mode and empty ctx->group_commit = false; @@ -399,7 +406,9 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req, if (iequal(group_commit_mode, "async_mode")) { if (!load_size_smaller_than_wal_limit(content_length)) { std::stringstream ss; - ss << "There is no space for group commit http load async WAL. WAL dir info: " + ss << "There is no space for group commit http load async WAL. This http load " + "size is " + << content_length << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); LOG(WARNING) << ss.str(); return Status::Error(ss.str()); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 9e9db1bd37..a447b8d4f9 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -710,9 +710,16 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, if (config::wait_internal_group_commit_finish) { group_commit_mode = "sync_mode"; } - size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() - ? 0 - : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); + int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() + ? 0 + : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); + if (content_length < 0) { + std::stringstream ss; + ss << "This stream load content length <0 (" << content_length + << "), please check your content length."; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { // off_mode and empty ctx->group_commit = false; @@ -731,7 +738,9 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, if (iequal(group_commit_mode, "async_mode")) { if (!load_size_smaller_than_wal_limit(content_length)) { std::stringstream ss; - ss << "There is no space for group commit stream load async WAL. WAL dir info: " + ss << "There is no space for group commit stream load async WAL. This stream load " + "size is " + << content_length << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); LOG(WARNING) << ss.str(); return Status::Error(ss.str()); diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 1e477530eb..49f9d2c499 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -193,7 +193,7 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) { HttpChannel::send_reply(req, result_str); } -bool load_size_smaller_than_wal_limit(size_t content_length) { +bool load_size_smaller_than_wal_limit(int64_t content_length) { // 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equels to 0, it means this streamload // is a chunked streamload and we are not sure its size. // 2. if streamload content length is too large, like larger than 80% of the WAL constrain. diff --git a/be/src/http/utils.h b/be/src/http/utils.h index e20e68c5b8..254d59cf13 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -43,5 +43,5 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req); std::string get_content_type(const std::string& file_name); -bool load_size_smaller_than_wal_limit(size_t content_length); +bool load_size_smaller_than_wal_limit(int64_t content_length); } // namespace doris diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 42c0e83f4b..032c5bc525 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -276,6 +276,9 @@ Status GroupCommitTable::_create_group_commit_load( client->streamLoadPut(result, request); }, 10000L); + if (!st.ok()) { + LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string(); + } RETURN_IF_ERROR(st); st = Status::create(result.status); if (!st.ok()) { diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 90b6cbe638..d797c081f4 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -64,7 +64,6 @@ TEST_F(StreamLoadTest, TestHeader) { auto* evhttp_req = evhttp_request_new(nullptr, nullptr); HttpRequest req(evhttp_req); EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true); - EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false); evhttp_request_free(evhttp_req); } @@ -80,7 +79,6 @@ TEST_F(StreamLoadTest, TestHeader) { HttpRequest req(evhttp_req); req.init_from_evhttp(); EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true); - EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false); evhttp_uri_free(evhttp_req->uri_elems); evhttp_req->uri = nullptr; evhttp_req->uri_elems = nullptr;