[fix](multi-table-load) fix single stream multi table load cannot finish (#33816)

This commit is contained in:
HHoflittlefish777
2024-04-19 13:02:45 +08:00
committed by yiguolei
parent 659900040f
commit e38d844d40
3 changed files with 30 additions and 5 deletions

View File

@ -114,17 +114,25 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
} else {
pipe = iter->second;
}
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in unplanned kafka pipe");
// It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
// otherwise the following situation may occur:
// the pipe is full but still cannot trigger the request and exec plan condition,
// causing one stream multi table load can not finish
++_unplanned_row_cnt;
auto pipe_current_capacity = pipe->current_capacity();
auto pipe_max_capacity = pipe->max_capacity();
if (_unplanned_row_cnt >= _row_threshold ||
_unplanned_pipes.size() >= _wait_tables_threshold) {
_unplanned_pipes.size() >= _wait_tables_threshold ||
pipe_current_capacity + size > pipe_max_capacity) {
LOG(INFO) << fmt::format(
"unplanned row cnt={} reach row_threshold={} or "
"wait_plan_table_threshold={}, "
"wait_plan_table_threshold={}, or the sum of "
"pipe_current_capacity {} "
"and size {} is greater than pipe_max_capacity {}, "
"plan them",
_unplanned_row_cnt, _row_threshold, _wait_tables_threshold)
_unplanned_row_cnt, _row_threshold, _wait_tables_threshold,
pipe_current_capacity, size, pipe_max_capacity)
<< ", ctx: " << _ctx->brief();
Status st = request_and_exec_plans();
_unplanned_row_cnt = 0;
@ -132,7 +140,11 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
return st;
}
}
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in unplanned kafka pipe");
}
return Status::OK();
}

View File

@ -255,5 +255,14 @@ TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fr
return pipe_id;
}
size_t StreamLoadPipe::current_capacity() {
std::unique_lock<std::mutex> l(_lock);
if (_use_proto) {
return _proto_buffered_bytes;
} else {
return _buffered_bytes;
}
}
} // namespace io
} // namespace doris

View File

@ -83,6 +83,10 @@ public:
// used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);
size_t max_capacity() const { return _max_buffered_bytes; }
size_t current_capacity();
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;