diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.cpp b/src/sql/engine/aggregate/ob_aggregate_processor.cpp index f7d57a4faa..a0cda983a0 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.cpp +++ b/src/sql/engine/aggregate/ob_aggregate_processor.cpp @@ -759,7 +759,8 @@ ObAggregateProcessor::DllUdfExtra::~DllUdfExtra() ObAggregateProcessor::ObAggregateProcessor(ObEvalCtx &eval_ctx, ObIArray &aggr_infos, const lib::ObLabel &label, - ObMonitorNode &op_monitor_info) + ObMonitorNode &op_monitor_info, + const int64_t tenant_id) : has_distinct_(false), has_order_by_(false), has_group_concat_(false), @@ -768,7 +769,7 @@ ObAggregateProcessor::ObAggregateProcessor(ObEvalCtx &eval_ctx, eval_ctx_(eval_ctx), aggr_alloc_(label, common::OB_MALLOC_MIDDLE_BLOCK_SIZE, - OB_SERVER_TENANT_ID, + tenant_id, ObCtxIds::WORK_AREA), cur_batch_group_idx_(0), cur_batch_group_buf_(nullptr), diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.h b/src/sql/engine/aggregate/ob_aggregate_processor.h index 13d48f93e3..fda5a452b9 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.h +++ b/src/sql/engine/aggregate/ob_aggregate_processor.h @@ -624,7 +624,8 @@ public: ObAggregateProcessor(ObEvalCtx &eval_ctx, ObIArray &aggr_infos, const lib::ObLabel &label, - ObMonitorNode &op_monitor_info); + ObMonitorNode &op_monitor_info, + const int64_t tenant_id); ~ObAggregateProcessor() { destroy(); }; int init(); diff --git a/src/sql/engine/aggregate/ob_groupby_op.h b/src/sql/engine/aggregate/ob_groupby_op.h index 1259af1f66..4bedf8e952 100644 --- a/src/sql/engine/aggregate/ob_groupby_op.h +++ b/src/sql/engine/aggregate/ob_groupby_op.h @@ -69,7 +69,8 @@ public: aggr_processor_(eval_ctx_, (static_cast(const_cast(spec))).aggr_infos_, ObModIds::OB_SQL_AGGR_FUNC_ROW, - op_monitor_info_) + op_monitor_info_, + exec_ctx.get_my_session()->get_effective_tenant_id()) { } inline ObAggregateProcessor &get_aggr_processor() { return aggr_processor_; } diff --git a/src/sql/engine/aggregate/ob_merge_distinct_op.cpp b/src/sql/engine/aggregate/ob_merge_distinct_op.cpp index 502b63e4a2..68fc2fed3e 100644 --- a/src/sql/engine/aggregate/ob_merge_distinct_op.cpp +++ b/src/sql/engine/aggregate/ob_merge_distinct_op.cpp @@ -33,7 +33,7 @@ ObMergeDistinctOp::ObMergeDistinctOp(ObExecContext &exec_ctx, const ObOpSpec &sp : ObOperator(exec_ctx, spec, input), first_got_row_(true), alloc_(ObModIds::OB_SQL_MERGE_GROUPBY, - OB_MALLOC_NORMAL_BLOCK_SIZE, OB_SERVER_TENANT_ID, ObCtxIds::WORK_AREA), + OB_MALLOC_NORMAL_BLOCK_SIZE, exec_ctx.get_my_session()->get_effective_tenant_id(), ObCtxIds::WORK_AREA), last_row_(alloc_) { } diff --git a/src/sql/engine/set/ob_merge_except_op.cpp b/src/sql/engine/set/ob_merge_except_op.cpp index 2ef31eab59..0b1abd31ac 100644 --- a/src/sql/engine/set/ob_merge_except_op.cpp +++ b/src/sql/engine/set/ob_merge_except_op.cpp @@ -160,7 +160,6 @@ int ObMergeExceptOp::inner_get_next_batch(const int64_t max_row_cnt) first_got_left_row_))) { //when we succ locate a row in left batch, store row is out of date, //we will compare inside a batch - last_row_.store_row_ = nullptr; brs_.skip_->unset(curr_left_idx); last_left_idx = curr_left_idx; if (right_iter_end_) { diff --git a/src/sql/engine/set/ob_merge_intersect_op.cpp b/src/sql/engine/set/ob_merge_intersect_op.cpp index e2aab75a17..9c03929c59 100644 --- a/src/sql/engine/set/ob_merge_intersect_op.cpp +++ b/src/sql/engine/set/ob_merge_intersect_op.cpp @@ -158,7 +158,6 @@ int ObMergeIntersectOp::inner_get_next_batch(const int64_t max_row_cnt) first_got_left_row_))) { //when we succ locate a row in left batch, store row is out of date, //we will compare inside a batch - last_row_.store_row_ = nullptr; while (OB_SUCC(ret) && !right_iter_end_) { if (!first_got_right_row_) { if (OB_FAIL(cmp_(right_->get_spec().output_, left_->get_spec().output_, diff --git a/src/sql/engine/set/ob_merge_set_op.cpp b/src/sql/engine/set/ob_merge_set_op.cpp index 37b7328f27..7f24109ae2 100644 --- a/src/sql/engine/set/ob_merge_set_op.cpp +++ b/src/sql/engine/set/ob_merge_set_op.cpp @@ -13,6 +13,7 @@ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/set/ob_merge_set_op.h" +#include "sql/engine/ob_exec_context.h" namespace oceanbase { @@ -30,7 +31,7 @@ OB_SERIALIZE_MEMBER((ObMergeSetSpec, ObSetSpec)); ObMergeSetOp::ObMergeSetOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input) : ObOperator(exec_ctx, spec, input), alloc_(ObModIds::OB_SQL_MERGE_GROUPBY, - OB_MALLOC_NORMAL_BLOCK_SIZE, OB_SERVER_TENANT_ID, ObCtxIds::WORK_AREA), + OB_MALLOC_NORMAL_BLOCK_SIZE, exec_ctx.get_my_session()->get_effective_tenant_id(), ObCtxIds::WORK_AREA), last_row_(alloc_), cmp_(), need_skip_init_row_(false), diff --git a/src/sql/engine/set/ob_merge_union_op.cpp b/src/sql/engine/set/ob_merge_union_op.cpp index 442da7bdb6..e563a6f5ff 100644 --- a/src/sql/engine/set/ob_merge_union_op.cpp +++ b/src/sql/engine/set/ob_merge_union_op.cpp @@ -430,7 +430,6 @@ int ObMergeUnionOp::distinct_get_next_batch(const int64_t batch_size) } else { curr_info_->op_added_ = true; //in this branch, we must got a valid row, so reset store_row and record last_valid_idx_ - last_row_.store_row_ = nullptr; last_valid_idx_ = got_cnt; candidate_child_err = do_strict_distinct_vectorize(*candidate_child_op_, last_row_.store_row_, @@ -451,7 +450,6 @@ int ObMergeUnionOp::distinct_get_next_batch(const int64_t batch_size) } } else if (!found_valid_row) { //we got 1 row inside a batch, record its idx and got from expr - last_row_.store_row_ = nullptr; last_valid_idx_ = got_cnt; got_cnt++; //candidate move to end, need to get_next_batch after return batch @@ -483,7 +481,6 @@ int ObMergeUnionOp::distinct_get_next_batch(const int64_t batch_size) } if (OB_SUCC(ret)) { //we got 1 row inside a batch, record its idx and got from expr - last_row_.store_row_ = nullptr; last_valid_idx_ = got_cnt; got_cnt++; } @@ -504,7 +501,6 @@ int ObMergeUnionOp::distinct_get_next_batch(const int64_t batch_size) } else { //we got 1st row, record its idx and set last_row.store_row_ to nullptr curr_info_->op_added_ = true; - last_row_.store_row_ = nullptr; last_valid_idx_ = got_cnt; got_cnt++; } @@ -648,11 +644,11 @@ int ObMergeUnionOp::do_strict_distinct_vectorize(ObOperator &child_op, if (op_info.op_brs_->skip_->at(i)) { continue; } - if (OB_UNLIKELY(nullptr != compare_row) + if (OB_UNLIKELY(compare_idx < 0) && OB_FAIL(cmp_(*compare_row, child_op.get_spec().output_, op_info.op_idx_, eval_ctx_, cmp))) { LOG_WARN("strict compare with last row failed", K(ret)); - } else if (OB_LIKELY(nullptr == compare_row) + } else if (OB_LIKELY(compare_idx >= 0) && OB_FAIL(cmp_(compare_expr, child_op.get_spec().output_, compare_idx, op_info.op_idx_, eval_ctx_, cmp))) { LOG_WARN("strict compare with last expr failed", K(ret)); @@ -677,11 +673,11 @@ int ObMergeUnionOp::do_strict_distinct_vectorize(ObOperator &child_op, if (op_info.op_brs_->skip_->at(i)) { continue; } - if (OB_UNLIKELY(nullptr != compare_row) + if (OB_UNLIKELY(compare_idx < 0) && OB_FAIL(cmp_(*compare_row, child_op.get_spec().output_, op_info.op_idx_, eval_ctx_, cmp))) { LOG_WARN("strict compare with last row failed", K(ret)); - } else if (OB_LIKELY(nullptr == compare_row) + } else if (OB_LIKELY(compare_idx >= 0) && OB_FAIL(cmp_(compare_expr, child_op.get_spec().output_, compare_idx, op_info.op_idx_, eval_ctx_, cmp))) { LOG_WARN("strict compare with last row failed ", K(ret)); diff --git a/src/sql/engine/window_function/ob_window_function_op.cpp b/src/sql/engine/window_function/ob_window_function_op.cpp index 9f7fd5425c..5b89b38ade 100644 --- a/src/sql/engine/window_function/ob_window_function_op.cpp +++ b/src/sql/engine/window_function/ob_window_function_op.cpp @@ -1221,7 +1221,7 @@ int ObWindowFunctionOp::init() } else if (OB_FAIL(aggr_infos->push_back(wf_info.aggr_info_))) { LOG_WARN("failed to push_back", K(wf_info.aggr_info_), K(ret)); } else { - AggrCell *aggr_func = new (tmp_ptr) AggrCell(wf_info, *this, *aggr_infos); + AggrCell *aggr_func = new (tmp_ptr) AggrCell(wf_info, *this, *aggr_infos, tenant_id); aggr_func->aggr_processor_.set_in_window_func(); if (OB_FAIL(aggr_func->aggr_processor_.init())) { LOG_WARN("failed to initialize init_group_rows", K(ret)); diff --git a/src/sql/engine/window_function/ob_window_function_op.h b/src/sql/engine/window_function/ob_window_function_op.h index 79bd7e3bbd..3857d4095e 100644 --- a/src/sql/engine/window_function/ob_window_function_op.h +++ b/src/sql/engine/window_function/ob_window_function_op.h @@ -557,10 +557,10 @@ public: class AggrCell : public WinFuncCell { public: - AggrCell(WinFuncInfo &wf_info, ObWindowFunctionOp &op, ObIArray &aggr_infos) + AggrCell(WinFuncInfo &wf_info, ObWindowFunctionOp &op, ObIArray &aggr_infos, const int64_t tenant_id) : WinFuncCell(wf_info, op), finish_prepared_(false), - aggr_processor_(op_.eval_ctx_, aggr_infos, "WindowAggProc", op.get_monitor_info()), + aggr_processor_(op_.eval_ctx_, aggr_infos, "WindowAggProc", op.get_monitor_info(), tenant_id), result_(), got_result_(false), remove_type_(wf_info.remove_type_)