[enhancement] Refactor to improve the usability of MemTracker (step2) (#10823)
This commit is contained in:
@ -19,7 +19,7 @@
|
||||
|
||||
#include "common/config.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "runtime/mem_tracker.h"
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/runtime_filter_mgr.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/string_value.h"
|
||||
@ -80,7 +80,7 @@ Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
Status FileScanNode::prepare(RuntimeState* state) {
|
||||
VLOG_QUERY << "FileScanNode prepare";
|
||||
RETURN_IF_ERROR(ScanNode::prepare(state));
|
||||
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
// get tuple desc
|
||||
_runtime_state = state;
|
||||
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
||||
@ -111,8 +111,8 @@ Status FileScanNode::prepare(RuntimeState* state) {
|
||||
|
||||
Status FileScanNode::open(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state));
|
||||
@ -147,7 +147,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
|
||||
// std::list<ExprContext*> expr_context;
|
||||
// RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
|
||||
// for (auto ctx : expr_context) {
|
||||
// ctx->prepare(state, row_desc(), _expr_mem_tracker);
|
||||
// ctx->prepare(state, row_desc());
|
||||
// ctx->open(state);
|
||||
// int index = _conjunct_ctxs.size();
|
||||
// _conjunct_ctxs.push_back(ctx);
|
||||
@ -164,7 +164,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
|
||||
}
|
||||
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
|
||||
std::vector<VExpr*> vexprs;
|
||||
runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
|
||||
runtime_filter->get_prepared_vexprs(&vexprs, row_desc());
|
||||
if (vexprs.empty()) {
|
||||
continue;
|
||||
}
|
||||
@ -180,7 +180,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
|
||||
last_expr = new_node;
|
||||
}
|
||||
auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
|
||||
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(), expr_mem_tracker());
|
||||
auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc());
|
||||
if (UNLIKELY(!expr_status.OK())) {
|
||||
LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
|
||||
vexprs.clear();
|
||||
@ -403,7 +403,10 @@ Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCount
|
||||
// 1. too many batches in queue, or
|
||||
// 2. at least one batch in queue and memory exceed limit.
|
||||
(_block_queue.size() >= _max_buffered_batches ||
|
||||
(mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
|
||||
(thread_context()
|
||||
->_thread_mem_tracker_mgr->limiter_mem_tracker()
|
||||
->any_limit_exceeded() &&
|
||||
!_block_queue.empty()))) {
|
||||
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
|
||||
}
|
||||
// Process already set failed, so we just return OK
|
||||
|
||||
Reference in New Issue
Block a user