[Enhancement](group commit) optimize some group commit code (#31392)

This PR optimizes some of the logic related to group commit:
1. Improved the error handling when there is insufficient WAL space during import.
2. Accounted for cases where the content length is negative during import.
3. Added missing error log printing in `group_commit_mgr.cpp`.
This commit is contained in:
abmdocrt
2024-02-27 17:31:40 +08:00
committed by yiguolei
parent 2f6251ccde
commit 747faeed17
6 changed files with 31 additions and 12 deletions

View File

@ -378,9 +378,16 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
}
size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
if (content_length < 0) {
std::stringstream ss;
ss << "This http load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) {
// off_mode and empty
ctx->group_commit = false;
@ -399,7 +406,9 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
if (iequal(group_commit_mode, "async_mode")) {
if (!load_size_smaller_than_wal_limit(content_length)) {
std::stringstream ss;
ss << "There is no space for group commit http load async WAL. WAL dir info: "
ss << "There is no space for group commit http load async WAL. This http load "
"size is "
<< content_length << ". WAL dir info: "
<< ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
LOG(WARNING) << ss.str();
return Status::Error<EXCEEDED_LIMIT>(ss.str());

View File

@ -710,9 +710,16 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
if (config::wait_internal_group_commit_finish) {
group_commit_mode = "sync_mode";
}
size_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0
: std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
if (content_length < 0) {
std::stringstream ss;
ss << "This stream load content length <0 (" << content_length
<< "), please check your content length.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) {
// off_mode and empty
ctx->group_commit = false;
@ -731,7 +738,9 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
if (iequal(group_commit_mode, "async_mode")) {
if (!load_size_smaller_than_wal_limit(content_length)) {
std::stringstream ss;
ss << "There is no space for group commit stream load async WAL. WAL dir info: "
ss << "There is no space for group commit stream load async WAL. This stream load "
"size is "
<< content_length << ". WAL dir info: "
<< ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
LOG(WARNING) << ss.str();
return Status::Error<EXCEEDED_LIMIT>(ss.str());

View File

@ -193,7 +193,7 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req) {
HttpChannel::send_reply(req, result_str);
}
bool load_size_smaller_than_wal_limit(size_t content_length) {
bool load_size_smaller_than_wal_limit(int64_t content_length) {
// 1. req->header(HttpHeaders::CONTENT_LENGTH) will return streamload content length. If it is empty or equels to 0, it means this streamload
// is a chunked streamload and we are not sure its size.
// 2. if streamload content length is too large, like larger than 80% of the WAL constrain.

View File

@ -43,5 +43,5 @@ void do_dir_response(const std::string& dir_path, HttpRequest* req);
std::string get_content_type(const std::string& file_name);
bool load_size_smaller_than_wal_limit(size_t content_length);
bool load_size_smaller_than_wal_limit(int64_t content_length);
} // namespace doris

View File

@ -276,6 +276,9 @@ Status GroupCommitTable::_create_group_commit_load(
client->streamLoadPut(result, request);
},
10000L);
if (!st.ok()) {
LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
}
RETURN_IF_ERROR(st);
st = Status::create<false>(result.status);
if (!st.ok()) {

View File

@ -64,7 +64,6 @@ TEST_F(StreamLoadTest, TestHeader) {
auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
HttpRequest req(evhttp_req);
EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true);
EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false);
evhttp_request_free(evhttp_req);
}
@ -80,7 +79,6 @@ TEST_F(StreamLoadTest, TestHeader) {
HttpRequest req(evhttp_req);
req.init_from_evhttp();
EXPECT_EQ(req.header(HttpHeaders::CONTENT_LENGTH).empty(), true);
EXPECT_EQ(load_size_smaller_than_wal_limit(-1), false);
evhttp_uri_free(evhttp_req->uri_elems);
evhttp_req->uri = nullptr;
evhttp_req->uri_elems = nullptr;