[FEAT MERGE] merge NLJ/SPF group rescan

This commit is contained in:
obdev
2023-01-12 15:17:04 +00:00
committed by OB-robot
parent 4799992a7f
commit 263a44af96
36 changed files with 3945 additions and 859 deletions

View File

@ -53,7 +53,10 @@ ObSubQueryIterator::ObSubQueryIterator(ObOperator &op)
iter_brs_(NULL),
batch_size_(0),
batch_row_pos_(0),
iter_end_(false)
iter_end_(false),
is_new_batch_(false),
current_group_(0),
das_batch_params_recovery_()
{
}
@ -82,16 +85,79 @@ int ObSubQueryIterator::rewind(const bool reset_onetime_plan /* = false */)
LOG_WARN("failed to rewind iterator", K(ret));
}
} else {
if (OB_FAIL(op_.rescan())) {
LOG_WARN("failed to do rescan", K(ret));
if (parent_->enable_left_das_batch()) {
if (OB_FAIL(alloc_das_batch_store())) {
LOG_WARN("Alloc DAS batch parameter store fail.", K(ret));
} else {
//We use GroupParamBackupGuard to save and resume data in param store.
//1.For SPF operator, we have multiple right child, every time rescan we
//need switch the param in param store, But all of param store index in
//the same array, So we switch all of the param store, but resume them
//after we rescan.
//2.For SPF operator, we nedd to support jump read. Difference child params
//may in the difference group id, current child rescan should not influence
//other child's param store stage.
//3.For nesting SPF with other SPF, parent and child SPF may in difference
//stage rescan or get_next_row, the expr may reuse, So child SPF rescan
//should not change the param store stage.
//So we implement the GroupParamBackupGuard to make Paramstore like a stack,
//protect every time change the Paramstore will be resume.
bool new_group = is_new_batch_;
if (OB_SUCC(ret) && is_new_batch_) {
GroupParamBackupGuard guard(eval_ctx_,
das_batch_params_recovery_,
parent_->get_spec().rescan_params_,
parent_->get_spec().rescan_params_.count());
ret = parent_->bind_das_batch_params_to_store();
if (OB_SUCC(ret) && OB_FAIL(op_.rescan())) {
if(OB_ITER_END == ret) {
ret = OB_ERR_UNEXPECTED;
}
LOG_WARN("failed to do rescan", K(ret));
}
current_group_ = 0;
is_new_batch_ = 0;
}
uint64_t parent_spf_group = 0;
if(OB_SUCC(ret)) {
parent_->get_current_group(parent_spf_group);
}
if (OB_SUCC(ret) && current_group_ < parent_spf_group) {
if (new_group) {
//If in lookup op, we need to call get next row to init all of iter.
op_.get_next_row();
}
int64_t old_jump_read_group_id;
old_jump_read_group_id = op_.get_exec_ctx().get_das_ctx().jump_read_group_id_;
op_.get_exec_ctx().get_das_ctx().jump_read_group_id_ = parent_spf_group;
if (OB_FAIL(op_.rescan())) {
if(OB_ITER_END == ret) {
ret = OB_ERR_UNEXPECTED;
}
LOG_WARN("Das jump read rescan fail.", K(ret));
}
op_.get_exec_ctx().get_das_ctx().jump_read_group_id_ = old_jump_read_group_id;
current_group_ = parent_spf_group;
}
}
} else {
//No batch branch
if (OB_SUCC(ret) && OB_FAIL(op_.rescan())) {
LOG_WARN("failed to do rescan", K(ret));
}
}
}
iter_end_ = false;
//for vectorize mode, SPF iter may have a stored batch to process
//should reset them in rewind()
iter_brs_ = NULL;
batch_size_ = 0;
batch_row_pos_ = 0;
if (OB_SUCC(ret)) {
iter_end_ = false;
//for vectorize mode, SPF iter may have a stored batch to process
//should reset them in rewind()
iter_brs_ = NULL;
batch_size_ = 0;
batch_row_pos_ = 0;
}
return ret;
}
@ -104,6 +170,9 @@ void ObSubQueryIterator::reuse()
batch_size_ = 0;
batch_row_pos_ = 0;
iter_end_ = false;
is_new_batch_ = false;
current_group_ = 0;
das_batch_params_recovery_.reset();
}
//TODO 移到对应的expr, 设置一个标记确保只计算一次
@ -248,6 +317,55 @@ int ObSubQueryIterator::reset_hash_map()
return ret;
}
int ObSubQueryIterator::alloc_das_batch_store()
{
int ret = OB_SUCCESS;
int64_t params_count = 0;
params_count = parent_->get_spec().rescan_params_.count();
if (!das_batch_params_recovery_.empty()) {
//Do nothing
OB_ASSERT(params_count == das_batch_params_recovery_.count());
} else {
ObIAllocator& alloc = op_.get_exec_ctx().get_allocator();
if (OB_FAIL(das_batch_params_recovery_.allocate_array(alloc, params_count))) {
LOG_WARN("Alloc das batch params fail." , K(ret));
}
}
return ret;
}
void GroupParamBackupGuard::save_das_batch_store()
{
//int64_t params_count = 0;
//params_count = parent_->get_spec().rescan_params_.count();
OB_ASSERT(!das_batch_params_recovery_.empty());
OB_ASSERT(das_batch_params_recovery_.count() == params_count_);
ObPhysicalPlanCtx *phy_ctx = eval_ctx_.exec_ctx_.get_physical_plan_ctx();
ParamStore &param_store = phy_ctx->get_param_store_for_update();
for (int64_t i = 0; i < params_count_; ++i) {
const ObDynamicParamSetter &rescan_param = rescan_params_.at(i);
//Always shallow copy for state save.
das_batch_params_recovery_.at(i) = param_store.at(rescan_param.param_idx_);
}
}
void GroupParamBackupGuard::resume_das_batch_store()
{
OB_ASSERT(!das_batch_params_recovery_.empty());
OB_ASSERT(das_batch_params_recovery_.count() == params_count_);
ObPhysicalPlanCtx *phy_ctx = eval_ctx_.exec_ctx_.get_physical_plan_ctx();
ParamStore &param_store = phy_ctx->get_param_store_for_update();
for (int64_t i = 0; i < params_count_; ++i) {
const ObDynamicParamSetter &rescan_param = rescan_params_.at(i);
//Always shallow copy for state resume.
param_store.at(rescan_param.param_idx_) = das_batch_params_recovery_.at(i);
ObExpr *dst = rescan_param.dst_;
ObDatum &param_datum = dst->locate_datum_for_write(eval_ctx_);
param_datum.from_obj(das_batch_params_recovery_.at(i), dst->obj_datum_map_);
}
}
ObSubPlanFilterSpec::ObSubPlanFilterSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
: ObOpSpec(alloc, type),
rescan_params_(alloc),
@ -260,7 +378,9 @@ ObSubPlanFilterSpec::ObSubPlanFilterSpec(ObIAllocator &alloc, const ObPhyOperato
enable_px_batch_rescans_(alloc),
enable_das_batch_rescans_(false),
filter_exprs_(alloc),
output_exprs_(alloc)
output_exprs_(alloc),
left_rescan_params_(alloc),
right_rescan_params_(alloc)
{
}
@ -275,7 +395,9 @@ OB_SERIALIZE_MEMBER((ObSubPlanFilterSpec, ObOpSpec),
enable_px_batch_rescans_,
enable_das_batch_rescans_,
filter_exprs_,
output_exprs_);
output_exprs_,
left_rescan_params_,
right_rescan_params_);
DEF_TO_STRING(ObSubPlanFilterSpec)
{
@ -301,12 +423,15 @@ ObSubPlanFilterOp::ObSubPlanFilterOp(
update_set_mem_(NULL),
iter_end_(false),
enable_left_px_batch_(false),
enable_left_das_batch_(false),
max_group_size_(0),
current_group_(0),
das_batch_params_(),
left_rows_(),
left_rows_iter_(),
last_store_row_(),
save_last_row_(false),
is_left_end_(false),
left_row_idx_(0),
batch_rescan_ctl_(),
cur_params_(),
cur_param_idxs_(),
@ -366,23 +491,35 @@ int ObSubPlanFilterOp::rescan()
LOG_WARN("failed to inner rescan", K(ret));
}
if (OB_SUCC(ret) && enable_left_px_batch_) {
if (OB_SUCC(ret) &&
(MY_SPEC.enable_das_batch_rescans_ || enable_left_px_batch_)) {
left_rows_.reset();
left_rows_iter_.reset();
batch_rescan_ctl_.reuse();
is_left_end_ = false;
save_last_row_ = false;
last_store_row_.reset();
}
if (OB_SUCC(ret) && MY_SPEC.enable_das_batch_rescans_) {
//We do not need alloc memory again in rescan.
//das_batch_params_.reset();
current_group_ = 0;
}
if (OB_SUCC(ret) && enable_left_px_batch_) {
batch_rescan_ctl_.reuse();
cur_params_.reset();
cur_param_idxs_.reset();
cur_param_expr_idxs_.reset();
save_last_row_ = false;
last_store_row_.reset();
brs_holder_.reset();
}
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
if (OB_FAIL(children_[i]->rescan())) {
LOG_WARN("rescan child operator failed", K(ret),
"op", op_name(), "child", children_[i]->op_name());
if (!MY_SPEC.enable_das_batch_rescans_) {
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
if (OB_FAIL(children_[i]->rescan())) {
LOG_WARN("rescan child operator failed", K(ret),
"op", op_name(), "child", children_[i]->op_name());
}
}
}
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
@ -495,7 +632,6 @@ int ObSubPlanFilterOp::inner_open()
MY_SPEC.enable_px_batch_rescans_.at(i)) {
enable_left_px_batch_ = true;
}
enable_left_das_batch_ = MY_SPEC.enable_das_batch_rescans_;
if (!MY_SPEC.exec_param_idxs_inited_) {
//unittest or old version, do not init hashmap
} else if (OB_FAIL(iter->init_mem_entity())) {
@ -514,7 +650,19 @@ int ObSubPlanFilterOp::inner_open()
}
}
}
if (enable_left_px_batch_ && OB_ISNULL(last_store_row_mem_)) {
//BATCH SUBPLAN FILTER {
if (OB_SUCC(ret) && MY_SPEC.enable_das_batch_rescans_) {
max_group_size_ = OB_MAX_BULK_JOIN_ROWS;
if(OB_FAIL(alloc_das_batch_params(max_group_size_+MY_SPEC.max_batch_size_))) {
LOG_WARN("Fail to alloc das batch params.", K(ret));
}
}
//} BATCH SUBPLAN FILTER END
//left_rows used by px_batch and das batch.
if (OB_SUCC(ret) &&
(enable_left_px_batch_ || MY_SPEC.enable_das_batch_rescans_) &&
OB_ISNULL(last_store_row_mem_)) {
ObSQLSessionInfo *session = ctx_.get_my_session();
uint64_t tenant_id =session->get_effective_tenant_id();
lib::ContextParam param;
@ -545,6 +693,9 @@ int ObSubPlanFilterOp::inner_close()
{
destroy_subplan_iters();
destroy_update_set_mem();
if (MY_SPEC.enable_das_batch_rescans_) {
das_batch_params_.reset();
}
return OB_SUCCESS;
}
@ -575,15 +726,36 @@ int ObSubPlanFilterOp::handle_next_row()
OZ(prepare_onetime_exprs());
}
if (OB_FAIL(ret)) {
} else if (enable_left_px_batch_) {
} else if (enable_left_px_batch_ || MY_SPEC.enable_das_batch_rescans_) {
//DAS batch spf is conflict with PX batch spf
OB_ASSERT(!(enable_left_px_batch_ && MY_SPEC.enable_das_batch_rescans_));
bool has_row = false;
int batch_count = PX_RESCAN_BATCH_ROW_COUNT;
int batch_count = 0;
batch_count = MY_SPEC.enable_das_batch_rescans_ ? max_group_size_ : PX_RESCAN_BATCH_ROW_COUNT;
if (left_rows_iter_.is_valid() && left_rows_iter_.has_next()) {
batch_rescan_ctl_.cur_idx_++;
if(MY_SPEC.enable_das_batch_rescans_) {
//das batch branch
//Consume the remaining batch data in left store.
current_group_++;
} else {
OB_ASSERT(enable_left_px_batch_);
//px batch branch
batch_rescan_ctl_.cur_idx_++;
}
} else if (is_left_end_) {
ret = OB_ITER_END;
} else {
batch_rescan_ctl_.reuse();
//Accumulate a new batch into left store.
if(enable_left_px_batch_) {
batch_rescan_ctl_.reuse();
}
if (MY_SPEC.enable_das_batch_rescans_) {
current_group_ = 0;
//Always OB_SUCCESS in current implement.
if(OB_FAIL(init_das_batch_params())) {
LOG_WARN("Failed to init das batch params", K(ret));
}
}
left_rows_iter_.reset();
left_rows_.reset();
last_store_row_mem_->get_arena_allocator().reset();
@ -614,8 +786,10 @@ int ObSubPlanFilterOp::handle_next_row()
}
} else if (OB_FAIL(left_rows_.add_row(child_->get_spec().output_, &eval_ctx_))) {
LOG_WARN("fail to add row", K(ret));
} else if (OB_FAIL(prepare_rescan_params(true))) {
} else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true))) {
LOG_WARN("fail to prepare rescan params", K(ret));
} else if (MY_SPEC.enable_das_batch_rescans_ && OB_FAIL(deep_copy_dynamic_obj())) {
LOG_WARN("fail to deep copy dynamic obj", K(ret));
} else {
has_row = true;
}
@ -633,15 +807,31 @@ int ObSubPlanFilterOp::handle_next_row()
ret = OB_SUCCESS;
OZ(left_rows_.finish_add_row(false));
OZ(left_rows_.begin(left_rows_iter_));
if (MY_SPEC.enable_das_batch_rescans_) {
//Lazy batch rescan right iterator.
//Just set the flag, do the rescan when call the iter->rewind().
for(int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *&iter = subplan_iters_.at(i - 1);
iter->set_new_batch(true);
}
}
}
}
//After accumulate a new batch or previous have remaining row.
if (OB_SUCC(ret)) {
clear_evaluated_flag();
// fetch datum from left_row_iter_ instead of child operator
if (OB_FAIL(left_rows_iter_.get_next_row(child_->get_spec().output_, eval_ctx_))) {
LOG_WARN("Failed to get next row", K(ret));
} else {
} else if (enable_left_px_batch_) {
//px batch spf branch
OZ(fill_cur_row_rescan_param());
} else {
//das batch spf branch
OB_ASSERT(MY_SPEC.enable_das_batch_rescans_);
if (OB_FAIL(fill_cur_row_das_batch_param(eval_ctx_, current_group_))) {
LOG_WARN("Filed to prepare das batch rescan params", K(ret));
}
}
}
} else if (FALSE_IT(clear_evaluated_flag())) {
@ -790,6 +980,144 @@ int ObSubPlanFilterOp::handle_next_batch_with_px_rescan(const int64_t op_max_bat
return ret;
}
int ObSubPlanFilterOp::handle_next_batch_with_group_rescan(const int64_t op_max_batch_size)
{
int ret = OB_SUCCESS;
const ObBatchRows *child_brs = NULL;
bool stop_fetch = false;
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
uint64_t left_rows_total_cnt = 0;
if (left_rows_iter_.is_valid() && left_rows_iter_.has_next()) {
// fetch data from left store
} else {
// 1. material data from child into left_rows_
// 2. prepare batch rescan params
left_rows_.reset();
left_rows_iter_.reset();
(void) brs_holder_.restore();
current_group_ = 0;
if(OB_FAIL(init_das_batch_params())) {
LOG_WARN("Failed to init das batch params", K(ret));
}
while (OB_SUCC(ret) && continue_fetching(left_rows_total_cnt, stop_fetch, true)) {
set_param_null();
clear_evaluated_flag();
int64_t store_row_cnt = -1;
if (OB_FAIL(child_->get_next_batch(op_max_batch_size, child_brs))) {
LOG_WARN("fail to get next batch", K(ret));
} else if (OB_FAIL(left_rows_.add_batch(child_->get_spec().output_, eval_ctx_,
*child_brs->skip_, child_brs->size_, store_row_cnt))) {
LOG_WARN("fail to add expr datums to left_rows_", K(ret));
} else {
stop_fetch = child_brs->end_;
left_rows_total_cnt += store_row_cnt;
guard.set_batch_size(child_brs->size_);
clear_evaluated_flag();
// prepare group batch rescan parameter
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_; l_idx++) {
if (child_brs->skip_->exist(l_idx)) { continue; }
guard.set_batch_idx(l_idx);
if (OB_FAIL(deep_copy_dynamic_obj())) {
LOG_WARN("deep_copy_dynamic_obj", K(ret));
}
}
}
}
if (OB_SUCC(ret)) {
if (!child_brs->end_) {
// backup child datums into brs_holder_
OZ(brs_holder_.save(MY_SPEC.max_batch_size_));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(left_rows_.finish_add_row(false))) {
LOG_WARN("prepare rescan params failed", K(ret));
} else if (OB_FAIL(left_rows_.begin(left_rows_iter_))) {
LOG_WARN("prepare rescan params failed", K(ret));
} else if (left_rows_total_cnt != left_rows_.get_row_cnt()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("left_rows row cnt is unexpectd", K(ret));
}
}
if (OB_SUCC(ret)) {
//Lazy batch rescan right iterator.
//Just set the flag, do the rescan when call the iter->rewind().
for(int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *&iter = subplan_iters_.at(i - 1);
iter->set_new_batch(true);
}
}
}
}
// fetch data from masterized left_rows(ChunkDatumStore) and do filtering
if (OB_SUCC(ret)) {
int64_t rows_fetched = 0;
clear_evaluated_flag();
if (OB_FAIL(left_rows_iter_.get_next_batch(child_->get_spec().output_,
eval_ctx_, op_max_batch_size, rows_fetched))) {
if (OB_ITER_END == ret) {
brs_.size_ = rows_fetched;
brs_.end_ = true;
iter_end_ = true;
OB_ASSERT(0 == brs_.size_);
OB_ASSERT(0 == left_rows_total_cnt);
ret = OB_SUCCESS;
} else {
LOG_WARN("left_rows_iter_.get_next_batch failed", K(ret));
}
} else {
// Note: rows are fetched from left_rows(ChunkDatumStore), so there is no
// skip row.
// Do not change brs_.skip_
brs_.size_ = rows_fetched;
left_rows_total_cnt -= rows_fetched; // debug only
guard.set_batch_size(brs_.size_);
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < brs_.size_; l_idx++) {
guard.set_batch_idx(l_idx);
if (OB_FAIL(fill_cur_row_das_batch_param(eval_ctx_, current_group_))) {
LOG_WARN("fill_cur_row_das_batch_param failed", K(ret));
} else {
if (need_init_before_get_row_) {
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *&iter = subplan_iters_.at(i - 1);
if (MY_SPEC.init_plan_idxs_.has_member(i)) {
OZ(iter->prepare_init_plan());
}
}
need_init_before_get_row_ = false;
}
}
if (OB_SUCC(ret)) {
bool filtered = false;
if (OB_FAIL(filter_row(eval_ctx_, MY_SPEC.filter_exprs_, filtered))) {
LOG_WARN("fail to filter row", K(ret));
} else if (filtered) {
brs_.skip_->set(l_idx);
} else {
ObDatum *datum = NULL;
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval(eval_ctx_, datum))) {
LOG_WARN("expr evaluate failed", K(ret), K(*e));
}
}
}
}
current_group_++;
} // for end
LOG_DEBUG("show batch_rescan_ctl_ info ", K(batch_rescan_ctl_),
K(rows_fetched), K(left_rows_total_cnt));
}
}
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
return ret;
}
int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
@ -799,7 +1127,13 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
}
//从主表中获取一行数据
clear_evaluated_flag();
if (enable_left_px_batch_) {
if(OB_FAIL(ret)) {
LOG_WARN("prepare_onetime_expr fail.", K(ret));
} else if (MY_SPEC.enable_das_batch_rescans_) {
if (OB_FAIL(handle_next_batch_with_group_rescan(op_max_batch_size))) {
LOG_WARN("handle_next_batch_with_group_rescan failed", K(ret));
}
} else if (enable_left_px_batch_) {
if (OB_FAIL(handle_next_batch_with_px_rescan(op_max_batch_size))) {
LOG_WARN("handle_next_batch_with_px_rescan failed", K(ret));
}
@ -1031,5 +1365,127 @@ int ObSubPlanFilterOp::handle_update_set()
return ret;
}
int ObSubPlanFilterOp::alloc_das_batch_params(uint64_t group_size)
{
int ret = OB_SUCCESS;
if (das_batch_params_.empty()) {
ret = das_batch_params_.allocate_array(ctx_.get_allocator(),
MY_SPEC.rescan_params_.count());
if(OB_SUCC(ret)) {
uint64_t obj_buf_size = sizeof(ObObjParam) * group_size;
for (int64_t i = 0; OB_SUCC(ret) && i < das_batch_params_.count(); ++i) {
void *buf = ctx_.get_allocator().alloc(obj_buf_size);
if (NULL == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate das params array buf failed", K(ret), K(i), K(obj_buf_size));
} else {
das_batch_params_.at(i).data_ = reinterpret_cast<ObObjParam*>(buf);
das_batch_params_.at(i).count_ = 0;
ObExpr *dst_expr = MY_SPEC.rescan_params_.at(i).dst_;
das_batch_params_.at(i).element_.set_meta_type(dst_expr->obj_meta_);
}
}
} else {
LOG_WARN("allocate das params failed", KR(ret), K(MY_SPEC.rescan_params_.count()));
}
}
return ret;
}
int ObSubPlanFilterOp::init_das_batch_params()
{
OB_ASSERT(!das_batch_params_.empty());
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < das_batch_params_.count(); ++i) {
das_batch_params_.at(i).count_ = 0;
}
return ret;
}
int ObSubPlanFilterOp::deep_copy_dynamic_obj()
{
int ret = OB_SUCCESS;
ObObjParam *param = NULL;
int64_t param_cnt = MY_SPEC.rescan_params_.count();
if (OB_ISNULL(last_store_row_mem_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("mem entity not init", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
const ObDynamicParamSetter &rescan_param = MY_SPEC.rescan_params_.at(i);
if (OB_FAIL(rescan_param.set_dynamic_param(eval_ctx_, param))) {
LOG_WARN("fail to set dynamic param", K(ret));
} else if (OB_ISNULL(param)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("param is null", K(ret));
} else if (OB_FAIL(ob_write_obj(last_store_row_mem_->get_arena_allocator(),
*param,
das_batch_params_.at(i).data_[das_batch_params_.at(i).count_]))) {
LOG_WARN("deep copy dynamic param", KR(ret));
} else {
++das_batch_params_.at(i).count_;
}
}
return ret;
}
int ObSubPlanFilterOp::fill_cur_row_das_batch_param(ObEvalCtx& eval_ctx, uint64_t current_group) const
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = ctx_.get_physical_plan_ctx();
if (das_batch_params_.empty() || current_group >= das_batch_params_.at(0).count_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row idx is unexpected", K(ret),
K(current_group), K(das_batch_params_.at(0).count_));
} else {
int64_t param_cnt = das_batch_params_.count();
if (unlikely(MY_SPEC.rescan_params_.count() != param_cnt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("das params count is invalid", KR(ret), K(param_cnt), K(das_batch_params_.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
const ObDynamicParamSetter &rescan_param = MY_SPEC.rescan_params_.at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr *dst = rescan_param.dst_;
ObDatum &param_datum = dst->locate_datum_for_write(eval_ctx);
const ObSqlArrayObj &arr = das_batch_params_.at(i);
if (OB_FAIL(param_datum.from_obj(arr.data_[current_group], dst->obj_datum_map_))) {
LOG_WARN("fail to cast datum", K(ret));
} else {
plan_ctx->get_param_store_for_update().at(param_idx) = arr.data_[current_group];
dst->set_evaluated_projected(eval_ctx);
}
}
}
return ret;
}
int ObSubPlanFilterOp::bind_das_batch_params_to_store() const
{
int ret = OB_SUCCESS;
int64_t param_cnt = MY_SPEC.rescan_params_.count();
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
ParamStore &param_store = plan_ctx->get_param_store_for_update();
if (OB_UNLIKELY(param_cnt != das_batch_params_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("das params count is invalid", KR(ret), K(param_cnt), K(das_batch_params_.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
const ObDynamicParamSetter &rescan_param = MY_SPEC.rescan_params_.at(i);
int64_t param_idx = rescan_param.param_idx_;
int64_t array_obj_addr = reinterpret_cast<int64_t>(&das_batch_params_.at(i));
param_store.at(param_idx).set_extend(array_obj_addr, T_EXT_SQL_ARRAY);
}
if (OB_SUCC(ret)) {
LOG_DEBUG("bind das param to store", K(das_batch_params_), K(plan_ctx->get_param_store()));
}
return ret;
}
void ObSubPlanFilterOp::get_current_group(uint64_t& current_group) const
{
current_group = current_group_;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -93,6 +93,8 @@ public:
int get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows);
//for vectorized end
bool is_onetime_plan() const { return onetime_plan_; }
void set_new_batch(bool new_batch) { is_new_batch_ = new_batch;};
TO_STRING_KV(K(onetime_plan_), K(init_plan_), K(inited_));
//a row cache for hash optimizer to use
@ -102,6 +104,11 @@ public:
private:
// for das batch spf
int alloc_das_batch_store();
int save_das_batch_store();
int resume_das_batch_store();
// for das batch spf end
ObOperator &op_;
bool onetime_plan_;
bool init_plan_;
@ -124,6 +131,12 @@ private:
int64_t batch_row_pos_;
bool iter_end_;
// for vectorized end
// for das batch spf
bool is_new_batch_;
uint64_t current_group_;
common::ObArrayWrap<ObObjParam> das_batch_params_recovery_;
// for das batch spf end
};
class ObSubPlanFilterSpec : public ObOpSpec
@ -154,6 +167,8 @@ public:
bool enable_das_batch_rescans_;
ExprFixedArray filter_exprs_;
ExprFixedArray output_exprs_;
common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator> left_rescan_params_;
common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator> right_rescan_params_;
};
class ObSubPlanFilterOp : public ObOperator
@ -181,17 +196,28 @@ public:
}
int handle_next_row();
bool enable_px_batch_rescan() { return enable_left_px_batch_; }
bool enable_das_batch_rescan() { return enable_left_das_batch_; }
//for vectorized
int inner_get_next_batch(const int64_t max_row_cnt);
// for vectorized end
int init_left_cur_row(const int64_t column_cnt, ObExecContext &ctx);
int fill_cur_row_rescan_param();
//for DAS batch SPF
int fill_cur_row_das_batch_param(ObEvalCtx& eval_ctx, uint64_t current_group) const;
int bind_das_batch_params_to_store() const;
void get_current_group(uint64_t& current_group) const;
bool enable_left_das_batch() const {return MY_SPEC.enable_das_batch_rescans_;}
//for DAS batch SPF end
const ObSubPlanFilterSpec &get_spec() const
{ return static_cast<const ObSubPlanFilterSpec &>(spec_); }
public:
ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; }
static const int64_t PX_RESCAN_BATCH_ROW_COUNT = 8192;
int handle_next_batch_with_px_rescan(const int64_t op_max_batch_size);
int handle_next_batch_with_group_rescan(const int64_t op_max_batch_size);
private:
void set_param_null() { set_pushdown_param_null(MY_SPEC.rescan_params_); };
void destroy_subplan_iters();
@ -208,19 +234,31 @@ private:
int prepare_onetime_exprs();
int prepare_onetime_exprs_inner();
int handle_update_set();
bool continue_fetching(uint64_t left_rows_total_cnt, bool stop)
bool continue_fetching(uint64_t left_rows_total_cnt, bool stop, bool use_group = false)
{
return (!stop && (left_rows_total_cnt < PX_RESCAN_BATCH_ROW_COUNT));
return use_group?
(!stop && (left_rows_total_cnt < max_group_size_))
:(!stop && (left_rows_total_cnt < PX_RESCAN_BATCH_ROW_COUNT));
}
// for das batch spf
int alloc_das_batch_params(uint64_t group_size);
int init_das_batch_params();
int deep_copy_dynamic_obj();
// for das batch spf end
private:
common::ObSEArray<Iterator *, 16> subplan_iters_;
lib::MemoryContext update_set_mem_;
bool iter_end_;
// for px batch rescan
bool enable_left_px_batch_;
// for px batch rescan end
// for das batch rescan
bool enable_left_das_batch_;
uint64_t max_group_size_; //Das batch rescan size;
uint64_t current_group_; //The group id in this time right iter rescan;
common::ObArrayWrap<ObSqlArrayObj> das_batch_params_;
// for das batch rescan end
ObChunkDatumStore left_rows_;
ObChunkDatumStore::Iterator left_rows_iter_;
ObChunkDatumStore::ShadowStoredRow last_store_row_;
@ -237,6 +275,35 @@ private:
ObBatchResultHolder brs_holder_;
};
class GroupParamBackupGuard
{
public:
GroupParamBackupGuard(ObEvalCtx& eval_ctx,
common::ObArrayWrap<ObObjParam>& das_batch_params_recovery,
const common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator>& rescan_params,
int64_t params_count)
: eval_ctx_(eval_ctx),
das_batch_params_recovery_(das_batch_params_recovery),
rescan_params_(rescan_params),
params_count_(params_count)
{
save_das_batch_store();
}
~GroupParamBackupGuard()
{
resume_das_batch_store();
}
private:
void save_das_batch_store();
void resume_das_batch_store();
private:
ObEvalCtx& eval_ctx_;
common::ObArrayWrap<ObObjParam>& das_batch_params_recovery_;
const common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator>& rescan_params_;
int64_t params_count_;
};
} // end namespace sql
} // end namespace oceanbase