[CP] [CP] fix: fix the core induced by NLJ batch rescan not reset the ptr of ObDatum
This commit is contained in:
@ -2367,7 +2367,7 @@ static struct VarsInit{
|
|||||||
}();
|
}();
|
||||||
|
|
||||||
[&] (){
|
[&] (){
|
||||||
ObSysVars[166].default_value_ = "1" ;
|
ObSysVars[166].default_value_ = "0" ;
|
||||||
ObSysVars[166].info_ = "enable batching of the RHS IO in NLJ" ;
|
ObSysVars[166].info_ = "enable batching of the RHS IO in NLJ" ;
|
||||||
ObSysVars[166].name_ = "_nlj_batching_enabled" ;
|
ObSysVars[166].name_ = "_nlj_batching_enabled" ;
|
||||||
ObSysVars[166].data_type_ = ObIntType ;
|
ObSysVars[166].data_type_ = ObIntType ;
|
||||||
|
|||||||
@ -2379,7 +2379,7 @@
|
|||||||
"_nlj_batching_enabled": {
|
"_nlj_batching_enabled": {
|
||||||
"id": 10080,
|
"id": 10080,
|
||||||
"name": "_nlj_batching_enabled",
|
"name": "_nlj_batching_enabled",
|
||||||
"default_value": "1",
|
"default_value": "0",
|
||||||
"base_value": "1",
|
"base_value": "1",
|
||||||
"data_type": "bool",
|
"data_type": "bool",
|
||||||
"info": "enable batching of the RHS IO in NLJ",
|
"info": "enable batching of the RHS IO in NLJ",
|
||||||
|
|||||||
@ -98,6 +98,7 @@ int ObDASGroupScanOp::open_op()
|
|||||||
iter_.init_group_range(cur_group_idx_, group_size_);
|
iter_.init_group_range(cur_group_idx_, group_size_);
|
||||||
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
||||||
*scan_rtdef_->eval_ctx_,
|
*scan_rtdef_->eval_ctx_,
|
||||||
|
scan_ctdef_->pd_expr_spec_.access_exprs_,
|
||||||
scan_rtdef_->stmt_allocator_,
|
scan_rtdef_->stmt_allocator_,
|
||||||
max_size,
|
max_size,
|
||||||
scan_ctdef_->group_id_expr_,
|
scan_ctdef_->group_id_expr_,
|
||||||
@ -241,6 +242,7 @@ int ObDASGroupScanOp::decode_task_result(ObIDASTaskResult *task_result)
|
|||||||
iter_.init_group_range(cur_group_idx_, group_size_);
|
iter_.init_group_range(cur_group_idx_, group_size_);
|
||||||
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
if (OB_FAIL(iter_.init_row_store(scan_ctdef_->result_output_,
|
||||||
*scan_rtdef_->eval_ctx_,
|
*scan_rtdef_->eval_ctx_,
|
||||||
|
scan_ctdef_->pd_expr_spec_.access_exprs_,
|
||||||
scan_rtdef_->stmt_allocator_,
|
scan_rtdef_->stmt_allocator_,
|
||||||
max_size,
|
max_size,
|
||||||
scan_ctdef_->group_id_expr_,
|
scan_ctdef_->group_id_expr_,
|
||||||
@ -296,6 +298,7 @@ int ObGroupLookupOp::init_group_scan_iter(int64_t cur_group_idx,
|
|||||||
group_iter_.init_group_range(cur_group_idx, group_size);
|
group_iter_.init_group_range(cur_group_idx, group_size);
|
||||||
OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_,
|
OZ(group_iter_.init_row_store(lookup_ctdef_->result_output_,
|
||||||
*lookup_rtdef_->eval_ctx_,
|
*lookup_rtdef_->eval_ctx_,
|
||||||
|
lookup_ctdef_->pd_expr_spec_.access_exprs_,
|
||||||
lookup_rtdef_->stmt_allocator_,
|
lookup_rtdef_->stmt_allocator_,
|
||||||
max_row_store_size,
|
max_row_store_size,
|
||||||
group_id_expr,
|
group_id_expr,
|
||||||
|
|||||||
@ -21,6 +21,7 @@ namespace sql
|
|||||||
{
|
{
|
||||||
int ObGroupResultRows::init(const common::ObIArray<ObExpr *> &exprs,
|
int ObGroupResultRows::init(const common::ObIArray<ObExpr *> &exprs,
|
||||||
ObEvalCtx &eval_ctx,
|
ObEvalCtx &eval_ctx,
|
||||||
|
const ExprFixedArray &access_exprs,
|
||||||
ObIAllocator &das_op_allocator,
|
ObIAllocator &das_op_allocator,
|
||||||
int64_t max_size,
|
int64_t max_size,
|
||||||
ObExpr *group_id_expr,
|
ObExpr *group_id_expr,
|
||||||
@ -50,6 +51,7 @@ int ObGroupResultRows::init(const common::ObIArray<ObExpr *> &exprs,
|
|||||||
}
|
}
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
exprs_ = &exprs;
|
exprs_ = &exprs;
|
||||||
|
access_exprs_ = &access_exprs;
|
||||||
eval_ctx_ = &eval_ctx;
|
eval_ctx_ = &eval_ctx;
|
||||||
max_size_ = max_size;
|
max_size_ = max_size;
|
||||||
group_id_expr_pos_ = OB_INVALID_INDEX;
|
group_id_expr_pos_ = OB_INVALID_INDEX;
|
||||||
@ -349,10 +351,12 @@ int ObGroupScanIter::get_next_rows(int64_t &count, int64_t capacity)
|
|||||||
|
|
||||||
void ObGroupScanIter::reset_expr_datum_ptr()
|
void ObGroupScanIter::reset_expr_datum_ptr()
|
||||||
{
|
{
|
||||||
FOREACH_CNT(e, *row_store_.exprs_) {
|
if (OB_NOT_NULL(row_store_.access_exprs_)) {
|
||||||
(*e)->locate_datums_for_update(*row_store_.eval_ctx_, row_store_.max_size_);
|
FOREACH_CNT(e, *row_store_.access_exprs_) {
|
||||||
ObEvalInfo &info = (*e)->get_eval_info(*row_store_.eval_ctx_);
|
(*e)->locate_datums_for_update(*row_store_.eval_ctx_, row_store_.max_size_);
|
||||||
info.point_to_frame_ = true;
|
ObEvalInfo &info = (*e)->get_eval_info(*row_store_.eval_ctx_);
|
||||||
|
info.point_to_frame_ = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -23,7 +23,7 @@ class ObGroupResultRows
|
|||||||
public:
|
public:
|
||||||
ObGroupResultRows() : inited_(false), exprs_(NULL), eval_ctx_(NULL),
|
ObGroupResultRows() : inited_(false), exprs_(NULL), eval_ctx_(NULL),
|
||||||
saved_size_(0), max_size_(1), start_pos_(0), group_id_expr_pos_(0),
|
saved_size_(0), max_size_(1), start_pos_(0), group_id_expr_pos_(0),
|
||||||
rows_(NULL), need_check_output_datum_(false),reuse_alloc_(nullptr)
|
rows_(NULL), need_check_output_datum_(false),reuse_alloc_(nullptr),access_exprs_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,6 +31,7 @@ public:
|
|||||||
|
|
||||||
int init(const common::ObIArray<ObExpr *> &exprs,
|
int init(const common::ObIArray<ObExpr *> &exprs,
|
||||||
ObEvalCtx &eval_ctx,
|
ObEvalCtx &eval_ctx,
|
||||||
|
const ExprFixedArray &access_exprs,
|
||||||
common::ObIAllocator &das_op_allocator,
|
common::ObIAllocator &das_op_allocator,
|
||||||
int64_t max_size,
|
int64_t max_size,
|
||||||
ObExpr *group_id_expr,
|
ObExpr *group_id_expr,
|
||||||
@ -57,6 +58,7 @@ public:
|
|||||||
reuse_alloc_->~ObArenaAllocator();
|
reuse_alloc_->~ObArenaAllocator();
|
||||||
reuse_alloc_ = nullptr;
|
reuse_alloc_ = nullptr;
|
||||||
}
|
}
|
||||||
|
access_exprs_ = nullptr;
|
||||||
}
|
}
|
||||||
TO_STRING_KV(K_(saved_size),
|
TO_STRING_KV(K_(saved_size),
|
||||||
K_(start_pos),
|
K_(start_pos),
|
||||||
@ -86,6 +88,7 @@ public:
|
|||||||
//be move from das into Table scan op, every thing will be easy.
|
//be move from das into Table scan op, every thing will be easy.
|
||||||
common::ObArenaAllocator *reuse_alloc_;
|
common::ObArenaAllocator *reuse_alloc_;
|
||||||
char reuse_alloc_buf_[sizeof(common::ObArenaAllocator)];
|
char reuse_alloc_buf_[sizeof(common::ObArenaAllocator)];
|
||||||
|
const ExprFixedArray *access_exprs_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObGroupScanIter : public ObNewRowIterator
|
class ObGroupScanIter : public ObNewRowIterator
|
||||||
@ -116,6 +119,7 @@ public:
|
|||||||
ObExpr *get_group_id_expr() { return group_id_expr_; }
|
ObExpr *get_group_id_expr() { return group_id_expr_; }
|
||||||
int init_row_store(const common::ObIArray<ObExpr *> &exprs,
|
int init_row_store(const common::ObIArray<ObExpr *> &exprs,
|
||||||
ObEvalCtx &eval_ctx,
|
ObEvalCtx &eval_ctx,
|
||||||
|
const ExprFixedArray &access_exprs,
|
||||||
common::ObIAllocator &das_op_allocator,
|
common::ObIAllocator &das_op_allocator,
|
||||||
int64_t max_size,
|
int64_t max_size,
|
||||||
ObExpr *group_id_expr,
|
ObExpr *group_id_expr,
|
||||||
@ -127,6 +131,7 @@ public:
|
|||||||
iter_ = iter;
|
iter_ = iter;
|
||||||
return row_store_.init(exprs,
|
return row_store_.init(exprs,
|
||||||
eval_ctx,
|
eval_ctx,
|
||||||
|
access_exprs,
|
||||||
das_op_allocator,
|
das_op_allocator,
|
||||||
max_size,
|
max_size,
|
||||||
group_id_expr,
|
group_id_expr,
|
||||||
|
|||||||
@ -28,6 +28,8 @@ alter tenant sys set variables ob_enable_truncate_flashback = 'on';
|
|||||||
alter tenant mysql set variables ob_tcp_invited_nodes='%';
|
alter tenant mysql set variables ob_tcp_invited_nodes='%';
|
||||||
alter tenant mysql set variables recyclebin = 'on';
|
alter tenant mysql set variables recyclebin = 'on';
|
||||||
alter tenant mysql set variables ob_enable_truncate_flashback = 'on';
|
alter tenant mysql set variables ob_enable_truncate_flashback = 'on';
|
||||||
|
alter tenant sys set variables _nlj_batching_enabled = true;
|
||||||
|
alter tenant mysql set variables _nlj_batching_enabled = true;
|
||||||
alter system set ob_compaction_schedule_interval = '10s' tenant sys;
|
alter system set ob_compaction_schedule_interval = '10s' tenant sys;
|
||||||
alter system set ob_compaction_schedule_interval = '10s' tenant all_user;
|
alter system set ob_compaction_schedule_interval = '10s' tenant all_user;
|
||||||
alter system set ob_compaction_schedule_interval = '10s' tenant all_meta;
|
alter system set ob_compaction_schedule_interval = '10s' tenant all_meta;
|
||||||
|
|||||||
Reference in New Issue
Block a user