Fix memory expansion when das task retry.
This commit is contained in:
		@ -81,6 +81,8 @@ int ObDASGroupScanOp::open_op()
 | 
				
			|||||||
  int64_t max_size = scan_rtdef_->eval_ctx_->is_vectorized()
 | 
					  int64_t max_size = scan_rtdef_->eval_ctx_->is_vectorized()
 | 
				
			||||||
                     ? scan_rtdef_->eval_ctx_->max_batch_size_
 | 
					                     ? scan_rtdef_->eval_ctx_->max_batch_size_
 | 
				
			||||||
                     :1;
 | 
					                     :1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr();
 | 
				
			||||||
  iter_.init_group_range(cur_group_idx_, group_size_);
 | 
					  iter_.init_group_range(cur_group_idx_, group_size_);
 | 
				
			||||||
  if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
 | 
					  if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
 | 
				
			||||||
                                   *scan_rtdef_->eval_ctx_,
 | 
					                                   *scan_rtdef_->eval_ctx_,
 | 
				
			||||||
@ -88,7 +90,8 @@ int ObDASGroupScanOp::open_op()
 | 
				
			|||||||
                                   max_size,
 | 
					                                   max_size,
 | 
				
			||||||
                                   scan_ctdef_->group_id_expr_,
 | 
					                                   scan_ctdef_->group_id_expr_,
 | 
				
			||||||
                                   &this->get_scan_result(),
 | 
					                                   &this->get_scan_result(),
 | 
				
			||||||
                                   scan_rtdef_->need_check_output_datum_))) {
 | 
					                                   scan_rtdef_->need_check_output_datum_,
 | 
				
			||||||
 | 
					                                   attr))) {
 | 
				
			||||||
    LOG_WARN("fail to init iter", K(ret));
 | 
					    LOG_WARN("fail to init iter", K(ret));
 | 
				
			||||||
  } else if (OB_FAIL(ObDASScanOp::open_op())) {
 | 
					  } else if (OB_FAIL(ObDASScanOp::open_op())) {
 | 
				
			||||||
    LOG_WARN("fail to open op", K(ret));
 | 
					    LOG_WARN("fail to open op", K(ret));
 | 
				
			||||||
@ -222,6 +225,7 @@ int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result)
 | 
				
			|||||||
                       ? scan_rtdef_->eval_ctx_->max_batch_size_
 | 
					                       ? scan_rtdef_->eval_ctx_->max_batch_size_
 | 
				
			||||||
                       :1;
 | 
					                       :1;
 | 
				
			||||||
    if (OB_SUCC(ret) && NULL == iter_.get_group_id_expr()) {
 | 
					    if (OB_SUCC(ret) && NULL == iter_.get_group_id_expr()) {
 | 
				
			||||||
 | 
					      ObMemAttr attr = scan_rtdef_->stmt_allocator_.get_attr();
 | 
				
			||||||
      iter_.init_group_range(cur_group_idx_, group_size_);
 | 
					      iter_.init_group_range(cur_group_idx_, group_size_);
 | 
				
			||||||
      if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
 | 
					      if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
 | 
				
			||||||
                                       *scan_rtdef_->eval_ctx_,
 | 
					                                       *scan_rtdef_->eval_ctx_,
 | 
				
			||||||
@ -229,7 +233,8 @@ int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result)
 | 
				
			|||||||
                                       max_size,
 | 
					                                       max_size,
 | 
				
			||||||
                                       scan_ctdef_->group_id_expr_,
 | 
					                                       scan_ctdef_->group_id_expr_,
 | 
				
			||||||
                                       &this->get_scan_result(),
 | 
					                                       &this->get_scan_result(),
 | 
				
			||||||
                                       scan_rtdef_->need_check_output_datum_))) {
 | 
					                                       scan_rtdef_->need_check_output_datum_,
 | 
				
			||||||
 | 
					                                       attr))) {
 | 
				
			||||||
        LOG_WARN("fail to init iter", K(ret));
 | 
					        LOG_WARN("fail to init iter", K(ret));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -287,6 +292,7 @@ int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx,
 | 
				
			|||||||
  bool is_vectorized = lookup_rtdef_->p_pd_expr_op_->is_vectorized();
 | 
					  bool is_vectorized = lookup_rtdef_->p_pd_expr_op_->is_vectorized();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  int64_t max_row_store_size = is_vectorized ? lookup_rtdef_->eval_ctx_->max_batch_size_: 1;
 | 
					  int64_t max_row_store_size = is_vectorized ? lookup_rtdef_->eval_ctx_->max_batch_size_: 1;
 | 
				
			||||||
 | 
					  ObMemAttr attr = lookup_rtdef_->stmt_allocator_.get_attr();
 | 
				
			||||||
  group_iter_.init_group_range(cur_group_idx, group_size);
 | 
					  group_iter_.init_group_range(cur_group_idx, group_size);
 | 
				
			||||||
  OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_,
 | 
					  OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_,
 | 
				
			||||||
                                *lookup_rtdef_->eval_ctx_,
 | 
					                                *lookup_rtdef_->eval_ctx_,
 | 
				
			||||||
@ -294,7 +300,8 @@ int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx,
 | 
				
			|||||||
                                max_row_store_size,
 | 
					                                max_row_store_size,
 | 
				
			||||||
                                group_id_expr,
 | 
					                                group_id_expr,
 | 
				
			||||||
                                &group_iter_.get_result_tmp_iter(),
 | 
					                                &group_iter_.get_result_tmp_iter(),
 | 
				
			||||||
                                lookup_rtdef_->need_check_output_datum_));
 | 
					                                lookup_rtdef_->need_check_output_datum_,
 | 
				
			||||||
 | 
					                                attr));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -24,15 +24,22 @@ int ObGroupResultRows::init(const common::ObIArray<ObExpr *> &exprs,
 | 
				
			|||||||
                            ObIAllocator &das_op_allocator,
 | 
					                            ObIAllocator &das_op_allocator,
 | 
				
			||||||
                            int64_t max_size,
 | 
					                            int64_t max_size,
 | 
				
			||||||
                            ObExpr *group_id_expr,
 | 
					                            ObExpr *group_id_expr,
 | 
				
			||||||
                            bool need_check_output_datum)
 | 
					                            bool need_check_output_datum,
 | 
				
			||||||
 | 
					                            ObMemAttr& attr)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  if (inited_) {
 | 
					  //Temp fix see the comment in the ob_group_scan_iter.cpp
 | 
				
			||||||
 | 
					  if (inited_ || nullptr != reuse_alloc_) {
 | 
				
			||||||
    ret = OB_INIT_TWICE;
 | 
					    ret = OB_INIT_TWICE;
 | 
				
			||||||
    LOG_WARN("init twice", K(ret));
 | 
					    LOG_WARN("init twice", K(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    need_check_output_datum_ = need_check_output_datum;
 | 
					    need_check_output_datum_ = need_check_output_datum;
 | 
				
			||||||
    rows_ = static_cast<LastDASStoreRow *>(das_op_allocator.alloc(max_size * sizeof(LastDASStoreRow)));
 | 
					    //Temp fix see the comment in the ob_group_scan_iter.cpp
 | 
				
			||||||
 | 
					    if (nullptr == reuse_alloc_) {
 | 
				
			||||||
 | 
					      reuse_alloc_ = new(reuse_alloc_buf_) common::ObArenaAllocator();
 | 
				
			||||||
 | 
					      reuse_alloc_->set_attr(attr);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    rows_ = static_cast<LastDASStoreRow *>(reuse_alloc_->alloc(max_size * sizeof(LastDASStoreRow)));
 | 
				
			||||||
    if (NULL == rows_) {
 | 
					    if (NULL == rows_) {
 | 
				
			||||||
      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
					      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
				
			||||||
      LOG_WARN("fail to alloc memory", K(max_size), K(ret));
 | 
					      LOG_WARN("fail to alloc memory", K(max_size), K(ret));
 | 
				
			||||||
 | 
				
			|||||||
@ -23,7 +23,7 @@ class ObGroupResultRows
 | 
				
			|||||||
public:
 | 
					public:
 | 
				
			||||||
  ObGroupResultRows() : inited_(false), exprs_(NULL), eval_ctx_(NULL),
 | 
					  ObGroupResultRows() : inited_(false), exprs_(NULL), eval_ctx_(NULL),
 | 
				
			||||||
                        saved_size_(0), max_size_(1), start_pos_(0), group_id_expr_pos_(0),
 | 
					                        saved_size_(0), max_size_(1), start_pos_(0), group_id_expr_pos_(0),
 | 
				
			||||||
                        rows_(NULL), need_check_output_datum_(false)
 | 
					                        rows_(NULL), need_check_output_datum_(false),reuse_alloc_(nullptr)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -32,7 +32,8 @@ public:
 | 
				
			|||||||
           common::ObIAllocator &das_op_allocator,
 | 
					           common::ObIAllocator &das_op_allocator,
 | 
				
			||||||
           int64_t max_size,
 | 
					           int64_t max_size,
 | 
				
			||||||
           ObExpr *group_id_expr,
 | 
					           ObExpr *group_id_expr,
 | 
				
			||||||
           bool need_check_output_datum);
 | 
					           bool need_check_output_datum,
 | 
				
			||||||
 | 
					           ObMemAttr& attr);
 | 
				
			||||||
  int save(bool is_vectorized, int64_t start_pos, int64_t size);
 | 
					  int save(bool is_vectorized, int64_t start_pos, int64_t size);
 | 
				
			||||||
  int to_expr(bool is_vectorized, int64_t start_pos, int64_t size);
 | 
					  int to_expr(bool is_vectorized, int64_t start_pos, int64_t size);
 | 
				
			||||||
  int64_t cur_group_idx();
 | 
					  int64_t cur_group_idx();
 | 
				
			||||||
@ -48,6 +49,12 @@ public:
 | 
				
			|||||||
    group_id_expr_pos_ = 0;
 | 
					    group_id_expr_pos_ = 0;
 | 
				
			||||||
    rows_ = NULL;
 | 
					    rows_ = NULL;
 | 
				
			||||||
    need_check_output_datum_ = false;
 | 
					    need_check_output_datum_ = false;
 | 
				
			||||||
 | 
					    //Temp fix
 | 
				
			||||||
 | 
					    if (reuse_alloc_ != nullptr) {
 | 
				
			||||||
 | 
					      reuse_alloc_->reset();
 | 
				
			||||||
 | 
					      reuse_alloc_->~ObArenaAllocator();
 | 
				
			||||||
 | 
					      reuse_alloc_ = nullptr;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  TO_STRING_KV(K_(saved_size),
 | 
					  TO_STRING_KV(K_(saved_size),
 | 
				
			||||||
               K_(start_pos),
 | 
					               K_(start_pos),
 | 
				
			||||||
@ -66,6 +73,17 @@ public:
 | 
				
			|||||||
  int64_t group_id_expr_pos_;
 | 
					  int64_t group_id_expr_pos_;
 | 
				
			||||||
  LastDASStoreRow *rows_;
 | 
					  LastDASStoreRow *rows_;
 | 
				
			||||||
  bool need_check_output_datum_;
 | 
					  bool need_check_output_datum_;
 | 
				
			||||||
 | 
					  //Temp fix
 | 
				
			||||||
 | 
					  //Current implement group iter is eval in das task context.
 | 
				
			||||||
 | 
					  //Whe Das task retry we hard to ues allocator pass from eval ctx, we can not free or
 | 
				
			||||||
 | 
					  //reuse LastDASStoreRow memory because when das task is remote the alloctor is change,
 | 
				
			||||||
 | 
					  //only memory from eval ctx can be reuse.
 | 
				
			||||||
 | 
					  //So we just introduce this temp fix use a new alloctor make LastDASStoreRow have a same
 | 
				
			||||||
 | 
					  //life cycle with ObGroupResultRows.
 | 
				
			||||||
 | 
					  //After next version @xiyang.gjc will refactor group rescan, then ObGroupResultRows will
 | 
				
			||||||
 | 
					  //be move from das into Table scan op, every thing will be easy.
 | 
				
			||||||
 | 
					  common::ObArenaAllocator *reuse_alloc_;
 | 
				
			||||||
 | 
					  char reuse_alloc_buf_[sizeof(common::ObArenaAllocator)];
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ObGroupScanIter : public ObNewRowIterator
 | 
					class ObGroupScanIter : public ObNewRowIterator
 | 
				
			||||||
@ -99,7 +117,8 @@ public:
 | 
				
			|||||||
                     int64_t max_size,
 | 
					                     int64_t max_size,
 | 
				
			||||||
                     ObExpr *group_id_expr,
 | 
					                     ObExpr *group_id_expr,
 | 
				
			||||||
                     ObNewRowIterator **iter,
 | 
					                     ObNewRowIterator **iter,
 | 
				
			||||||
                     bool need_check_output_datum)
 | 
					                     bool need_check_output_datum,
 | 
				
			||||||
 | 
					                     ObMemAttr& attr)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    group_id_expr_ = group_id_expr;
 | 
					    group_id_expr_ = group_id_expr;
 | 
				
			||||||
    iter_ = iter;
 | 
					    iter_ = iter;
 | 
				
			||||||
@ -108,7 +127,8 @@ public:
 | 
				
			|||||||
                           das_op_allocator,
 | 
					                           das_op_allocator,
 | 
				
			||||||
                           max_size,
 | 
					                           max_size,
 | 
				
			||||||
                           group_id_expr,
 | 
					                           group_id_expr,
 | 
				
			||||||
                           need_check_output_datum);
 | 
					                           need_check_output_datum,
 | 
				
			||||||
 | 
					                           attr);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  ObNewRowIterator *&get_result_tmp_iter() { return result_tmp_iter_; }
 | 
					  ObNewRowIterator *&get_result_tmp_iter() { return result_tmp_iter_; }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user