diff --git a/src/sql/das/ob_das_group_scan_op.cpp b/src/sql/das/ob_das_group_scan_op.cpp index 38f196cf5..1df48f16d 100644 --- a/src/sql/das/ob_das_group_scan_op.cpp +++ b/src/sql/das/ob_das_group_scan_op.cpp @@ -81,6 +81,8 @@ int ObDASGroupScanOp::open_op() int64_t max_size = scan_rtdef_->eval_ctx_->is_vectorized() ? scan_rtdef_->eval_ctx_->max_batch_size_ :1; + + ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr(); iter_.init_group_range(cur_group_idx_, group_size_); if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_, *scan_rtdef_->eval_ctx_, @@ -88,7 +90,8 @@ int ObDASGroupScanOp::open_op() max_size, scan_ctdef_->group_id_expr_, &this->get_scan_result(), - scan_rtdef_->need_check_output_datum_))) { + scan_rtdef_->need_check_output_datum_, + attr))) { LOG_WARN("fail to init iter", K(ret)); } else if (OB_FAIL(ObDASScanOp::open_op())) { LOG_WARN("fail to open op", K(ret)); @@ -222,6 +225,7 @@ int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result) ? scan_rtdef_->eval_ctx_->max_batch_size_ :1; if (OB_SUCC(ret) && NULL == iter_.get_group_id_expr()) { + ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr(); iter_.init_group_range(cur_group_idx_, group_size_); if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_, *scan_rtdef_->eval_ctx_, @@ -229,7 +233,8 @@ int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result) max_size, scan_ctdef_->group_id_expr_, &this->get_scan_result(), - scan_rtdef_->need_check_output_datum_))) { + scan_rtdef_->need_check_output_datum_, + attr))) { LOG_WARN("fail to init iter", K(ret)); } } @@ -287,6 +292,7 @@ int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx, bool is_vectorized = lookup_rtdef_->p_pd_expr_op_->is_vectorized(); int64_t max_row_store_size = is_vectorized ? lookup_rtdef_->eval_ctx_->max_batch_size_: 1; + ObMemAttr attr = lookup_rtdef_->stmt_allocator_.get_attr(); group_iter_.init_group_range(cur_group_idx, group_size); OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_, *lookup_rtdef_->eval_ctx_, @@ -294,7 +300,8 @@ int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx, max_row_store_size, group_id_expr, &group_iter_.get_result_tmp_iter(), - lookup_rtdef_->need_check_output_datum_)); + lookup_rtdef_->need_check_output_datum_, + attr)); return ret; } diff --git a/src/sql/das/ob_group_scan_iter.cpp b/src/sql/das/ob_group_scan_iter.cpp index 704d544c1..e6ff704cd 100644 --- a/src/sql/das/ob_group_scan_iter.cpp +++ b/src/sql/das/ob_group_scan_iter.cpp @@ -24,15 +24,22 @@ int ObGroupResultRows::init(const common::ObIArray &exprs, ObIAllocator &das_op_allocator, int64_t max_size, ObExpr *group_id_expr, - bool need_check_output_datum) + bool need_check_output_datum, + ObMemAttr& attr) { int ret = OB_SUCCESS; - if (inited_) { + //Temp fix see the comment in the ob_group_scan_iter.cpp + if (inited_ || nullptr != reuse_alloc_) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); } else { need_check_output_datum_ = need_check_output_datum; - rows_ = static_cast(das_op_allocator.alloc(max_size * sizeof(LastDASStoreRow))); + //Temp fix see the comment in the ob_group_scan_iter.cpp + if (nullptr == reuse_alloc_) { + reuse_alloc_ = new(reuse_alloc_buf_) common::ObArenaAllocator(); + reuse_alloc_->set_attr(attr); + } + rows_ = static_cast(reuse_alloc_->alloc(max_size * sizeof(LastDASStoreRow))); if (NULL == rows_) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(max_size), K(ret)); diff --git a/src/sql/das/ob_group_scan_iter.h b/src/sql/das/ob_group_scan_iter.h index a26833437..58ee99561 100644 --- a/src/sql/das/ob_group_scan_iter.h +++ b/src/sql/das/ob_group_scan_iter.h @@ -23,7 +23,7 @@ class ObGroupResultRows public: ObGroupResultRows() : inited_(false), exprs_(NULL), eval_ctx_(NULL), saved_size_(0), max_size_(1), start_pos_(0), group_id_expr_pos_(0), - rows_(NULL), need_check_output_datum_(false) + rows_(NULL), need_check_output_datum_(false),reuse_alloc_(nullptr) { } @@ -32,7 +32,8 @@ public: common::ObIAllocator &das_op_allocator, int64_t max_size, ObExpr *group_id_expr, - bool need_check_output_datum); + bool need_check_output_datum, + ObMemAttr& attr); int save(bool is_vectorized, int64_t start_pos, int64_t size); int to_expr(bool is_vectorized, int64_t start_pos, int64_t size); int64_t cur_group_idx(); @@ -48,6 +49,12 @@ public: group_id_expr_pos_ = 0; rows_ = NULL; need_check_output_datum_ = false; + //Temp fix + if (reuse_alloc_ != nullptr) { + reuse_alloc_->reset(); + reuse_alloc_->~ObArenaAllocator(); + reuse_alloc_ = nullptr; + } } TO_STRING_KV(K_(saved_size), K_(start_pos), @@ -66,6 +73,17 @@ public: int64_t group_id_expr_pos_; LastDASStoreRow *rows_; bool need_check_output_datum_; + //Temp fix + //Current implement group iter is eval in das task context. + //Whe Das task retry we hard to ues allocator pass from eval ctx, we can not free or + //reuse LastDASStoreRow memory because when das task is remote the alloctor is change, + //only memory from eval ctx can be reuse. + //So we just introduce this temp fix use a new alloctor make LastDASStoreRow have a same + //life cycle with ObGroupResultRows. + //After next version @xiyang.gjc will refactor group rescan, then ObGroupResultRows will + //be move from das into Table scan op, every thing will be easy. + common::ObArenaAllocator *reuse_alloc_; + char reuse_alloc_buf_[sizeof(common::ObArenaAllocator)]; }; class ObGroupScanIter : public ObNewRowIterator @@ -99,7 +117,8 @@ public: int64_t max_size, ObExpr *group_id_expr, ObNewRowIterator **iter, - bool need_check_output_datum) + bool need_check_output_datum, + ObMemAttr& attr) { group_id_expr_ = group_id_expr; iter_ = iter; @@ -108,7 +127,8 @@ public: das_op_allocator, max_size, group_id_expr, - need_check_output_datum); + need_check_output_datum, + attr); } ObNewRowIterator *&get_result_tmp_iter() { return result_tmp_iter_; }