fix bug in group join buffer

This commit is contained in:
obdev
2023-04-24 12:14:27 +00:00
committed by ob-robot
parent 43877d47d6
commit 349ecdf982
3 changed files with 182 additions and 243 deletions

View File

@ -168,8 +168,14 @@ int ObGroupJoinBufffer::init(ObOperator *op,
LOG_WARN("null memory entity", KR(ret));
} else if (OB_FAIL(left_store_.init(UINT64_MAX, tenant_id, ObCtxIds::WORK_AREA))) {
LOG_WARN("init row store failed", KR(ret));
} else {
left_store_.set_allocator(mem_context_->get_malloc_allocator());
} else if (FALSE_IT(left_store_.set_allocator(mem_context_->get_malloc_allocator()))) {
}
if (OB_SUCC(ret) && op_->is_vectorized()) {
if (OB_FAIL(last_batch_.init(&left_->get_spec().output_,
&ctx_->get_allocator(),
spec_->max_batch_size_))) {
LOG_WARN("init batch failed", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
@ -291,6 +297,42 @@ int ObGroupJoinBufffer::fill_cur_row_group_param()
return ret;
}
int ObGroupJoinBufffer::get_next_left_iter()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_multi_level_ || ((above_group_idx_for_expand_ + 1) >= above_group_size_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("left op does not have another iterator", KR(ret));
} else {
// for multi level group rescan, left_ may output more than 1 iterators,
// and we need to call left_->rescan() to switch to next iterator
above_group_idx_for_expand_++;
ObPhysicalPlanCtx *plan_ctx = ctx_->get_physical_plan_ctx();
for (int64_t i = 0; OB_SUCC(ret) && i < above_left_group_params_.count(); ++i) {
ObSqlArrayObj *array_obj = above_left_group_params_.at(i);
if (NULL == array_obj) {
// skip
} else {
const ObDynamicParamSetter &rescan_param = left_rescan_params_->at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr *dst = rescan_param.dst_;
ObDatum &param_datum = dst->locate_datum_for_write(*eval_ctx_);
if (OB_FAIL(param_datum.from_obj(array_obj->data_[above_group_idx_for_expand_], dst->obj_datum_map_))) {
LOG_WARN("cast datum failed", KR(ret));
} else {
plan_ctx->get_param_store_for_update().at(param_idx) = array_obj->data_[above_group_idx_for_expand_];
dst->set_evaluated_projected(*eval_ctx_);
}
}
}
if (OB_SUCC(ret) && OB_FAIL(left_->rescan())) {
ret = (OB_ITER_END == ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("rescan left failed", KR(ret));
}
}
return ret;
}
int ObGroupJoinBufffer::rescan_left()
{
int ret = OB_SUCCESS;
@ -326,6 +368,7 @@ int ObGroupJoinBufffer::rescan_left()
above_group_idx_for_read_ = 0;
}
} else {
is_left_end_ = false;
above_group_idx_for_read_++;
}
return ret;
@ -361,19 +404,15 @@ int ObGroupJoinBufffer::rescan_right()
int ObGroupJoinBufffer::fill_group_buffer()
{
int ret = OB_SUCCESS;
if (!need_fill_group_buffer()) {
// do nothing
} else if (OB_FAIL(init_group_params())) {
LOG_WARN("init group params failed", KR(ret));
} else if (is_left_end_) {
// we have fetched all rows from left
ret = OB_ITER_END;
} else {
common::ObSEArray<ObObjParam, 1> left_params_backup;
common::ObSEArray<ObObjParam, 1> right_params_backup;
if (!is_left_end_ && need_fill_group_buffer()) {
if (OB_FAIL(backup_above_params(left_params_backup, right_params_backup))) {
LOG_WARN("backup above params failed", KR(ret));
} else {
} else if (OB_FAIL(init_group_params())) {
LOG_WARN("init group params failed", KR(ret));
}
if (OB_SUCC(ret)) {
if (save_last_row_) {
if (OB_ISNULL(last_row_.get_store_row())) {
ret = OB_NOT_INIT;
@ -383,6 +422,7 @@ int ObGroupJoinBufffer::fill_group_buffer()
LOG_WARN("restore last row failed", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
reset_buffer_state();
if (OB_FAIL(last_row_.init(
@ -396,11 +436,17 @@ int ObGroupJoinBufffer::fill_group_buffer()
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
if (OB_FAIL(get_next_left_row())) {
if (OB_FAIL(left_->get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next left row failed", KR(ret));
} else {
is_left_end_ = true;
if (is_multi_level_ && ((above_group_idx_for_expand_ + 1) < above_group_size_)) {
ret = OB_SUCCESS;
if (OB_FAIL(get_next_left_iter())) {
LOG_WARN("get next iter failed", KR(ret));
}
}
}
} else if (OB_FAIL(add_row_to_store())) {
LOG_WARN("add row to store failed", KR(ret));
@ -432,9 +478,7 @@ int ObGroupJoinBufffer::fill_group_buffer()
} else if (OB_FAIL(bind_group_params_to_store())) {
LOG_WARN("bind group params to store failed", KR(ret));
} else if (OB_FAIL(rescan_right())) {
if (OB_ITER_END == ret) {
ret = OB_ERR_UNEXPECTED;
}
ret = (OB_ITER_END == ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("rescan right failed", KR(ret));
} else {
skip_rescan_right_ = true;
@ -449,7 +493,6 @@ int ObGroupJoinBufffer::fill_group_buffer()
ret = save_ret;
}
}
}
return ret;
}
@ -457,49 +500,34 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
const ObBatchRows *&batch_rows)
{
int ret = OB_SUCCESS;
if (!need_fill_group_buffer()) {
// do nothing
} else if (OB_FAIL(init_group_params())) {
common::ObSEArray<ObObjParam, 1> left_params_backup;
common::ObSEArray<ObObjParam, 1> right_params_backup;
if (!is_left_end_ && need_fill_group_buffer()) {
if (OB_FAIL(init_group_params())) {
LOG_WARN("init group params failed", KR(ret));
} else if (is_left_end_) {
ret = OB_ITER_END;
} else {
bool ignore_end = false;
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
if (save_last_batch_) {
if (!last_batch_.is_inited()) {
ret = OB_NOT_INIT;
LOG_WARN("last batch is not inited", KR(ret),
K(save_last_batch_), K(last_batch_.is_inited()));
} else {
last_batch_.to_exprs(*eval_ctx_);
save_last_batch_ = false;
}
} else if (OB_FAIL(backup_above_params(left_params_backup, right_params_backup))) {
LOG_WARN("backup above params failed", KR(ret));
}
if (OB_SUCC(ret)) {
reset_buffer_state();
if (OB_FAIL(last_batch_.init(&left_->get_spec().output_,
&mem_context_->get_arena_allocator(),
spec_->max_batch_size_))) {
LOG_WARN("init batch failed", KR(ret));
}
}
while (OB_SUCC(ret) && !is_full()) {
op_->clear_evaluated_flag();
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
if (save_last_batch_) {
last_batch_.to_exprs(*eval_ctx_);
save_last_batch_ = false;
}
batch_rows = &left_->get_brs();
reset_buffer_state();
while (OB_SUCC(ret) && !is_full() && !batch_rows->end_) {
op_->clear_evaluated_flag();
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
if (OB_FAIL(get_next_left_batch(max_row_cnt, batch_rows))) {
if (OB_ITER_END != ret) {
LOG_WARN("get next left batch failed", KR(ret));
if (OB_FAIL(left_->get_next_batch(max_row_cnt, batch_rows))) {
LOG_WARN("get next batch from left failed", KR(ret));
}
} else {
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < batch_rows->size_; l_idx++) {
if (batch_rows->skip_->exist(l_idx)) { continue; }
if (batch_rows->skip_->exist(l_idx)) {
// do nothing
} else {
batch_info_guard.set_batch_idx(l_idx);
batch_info_guard.set_batch_size(batch_rows->size_);
if (OB_FAIL(add_row_to_store())) {
@ -510,7 +538,18 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
LOG_WARN("deep copy dynamic obj failed", KR(ret));
}
}
ignore_end = true;
}
if (OB_SUCC(ret) && batch_rows->end_) {
is_left_end_ = true;
if (is_multi_level_) {
if ((above_group_idx_for_expand_ + 1) >= above_group_size_) {
// wait for parent op to fill next group params
} else if (FALSE_IT(const_cast<ObBatchRows *&>(batch_rows)->end_ = false)) {
} else if (OB_FAIL(get_next_left_iter())) {
LOG_WARN("get next iter failed", KR(ret));
}
}
}
}
}
if (OB_SUCC(ret)) {
@ -520,13 +559,10 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
if (batch_rows->size_ == 0 && batch_rows->end_) {
// do nothing
} else {
last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, batch_rows->size_);
last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, spec_->max_batch_size_);
save_last_batch_ = true;
}
op_->clear_evaluated_flag();
}
if (OB_SUCC(ret) || (ignore_end && OB_ITER_END == ret)) {
ret = OB_SUCCESS;
if (left_store_.get_row_cnt() <= 0) {
// this could happen if we have skipped all rows
ret = OB_ITER_END;
@ -537,14 +573,20 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
} else if (OB_FAIL(bind_group_params_to_store())) {
LOG_WARN("bind group params to store failed", KR(ret));
} else if (OB_FAIL(rescan_right())) {
if (OB_ITER_END == ret) {
ret = OB_ERR_UNEXPECTED;
}
ret = (OB_ITER_END == ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("rescan right failed", KR(ret));
} else {
skip_rescan_right_ = true;
}
}
int save_ret = ret;
ret = OB_SUCCESS;
if (OB_FAIL(restore_above_params(left_params_backup,
right_params_backup))) {
LOG_WARN("restore above params failed", KR(ret), KR(save_ret));
} else {
ret = save_ret;
}
}
return ret;
}
@ -644,6 +686,7 @@ void ObGroupJoinBufffer::destroy()
above_left_group_params_.destroy();
above_right_group_params_.destroy();
last_row_.reset();
last_batch_.reset();
if (NULL != mem_context_) {
DESTROY_CONTEXT(mem_context_);
mem_context_ = NULL;
@ -793,109 +836,6 @@ int ObGroupJoinBufffer::prepare_rescan_params()
return ret;
}
int ObGroupJoinBufffer::get_next_left_row()
{
int ret = OB_SUCCESS;
bool got_row = false;
while (OB_SUCC(ret) && !got_row) {
op_->clear_evaluated_flag();
if (OB_FAIL(left_->get_next_row())) {
if (OB_ITER_END != ret) {
LOG_WARN("get next row from left failed", KR(ret));
} else if (is_multi_level_) {
// for multi level group rescan, left_ may output more than 1 iterators,
// and we need to call left_->rescan() to switch to next iterator
ret = OB_SUCCESS;
if ((above_group_idx_for_expand_ + 1) >= above_group_size_) {
// wait for parent op to fill next group params
ret = OB_ITER_END;
} else {
above_group_idx_for_expand_++;
ObPhysicalPlanCtx *plan_ctx = ctx_->get_physical_plan_ctx();
for (int64_t i = 0; OB_SUCC(ret) && i < above_left_group_params_.count(); i++) {
ObSqlArrayObj *array_obj = above_left_group_params_.at(i);
if (NULL == array_obj) {
// skip
} else {
const ObDynamicParamSetter &rescan_param = left_rescan_params_->at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr *dst = rescan_param.dst_;
ObDatum &param_datum = dst->locate_datum_for_write(*eval_ctx_);
if (OB_FAIL(param_datum.from_obj(array_obj->data_[above_group_idx_for_expand_],
dst->obj_datum_map_))) {
LOG_WARN("cast datum failed", KR(ret));
} else {
plan_ctx->get_param_store_for_update().at(param_idx) =
array_obj->data_[above_group_idx_for_expand_];
dst->set_evaluated_projected(*eval_ctx_);
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(left_->rescan())) {
if (OB_ITER_END != ret) {
LOG_WARN("rescan left failed", KR(ret));
}
}
}
}
} else {
got_row = true;
}
}
return ret;
}
int ObGroupJoinBufffer::get_next_left_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows)
{
int ret = OB_SUCCESS;
bool got_row = false;
if (OB_FAIL(left_->get_next_batch(max_row_cnt, batch_rows))) {
LOG_WARN("get next batch from left failed", KR(ret));
} else if (batch_rows->end_) {
if (is_multi_level_) {
// for multi level group rescan, left_ may output more than 1 iterators,
// and we need to call left_->rescan() to switch to next iterator
if ((above_group_idx_for_expand_ + 1) >= above_group_size_) {
// wait for parent op to fill next group params
ret = OB_ITER_END;
} else {
above_group_idx_for_expand_++;
ObPhysicalPlanCtx *plan_ctx = ctx_->get_physical_plan_ctx();
for (int64_t i = 0; OB_SUCC(ret) && i < above_left_group_params_.count(); i++) {
ObSqlArrayObj *array_obj = above_left_group_params_.at(i);
if (NULL == array_obj) {
// skip
} else {
const ObDynamicParamSetter &rescan_param = left_rescan_params_->at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr *dst = rescan_param.dst_;
ObDatum &param_datum = dst->locate_datum_for_write(*eval_ctx_);
if (OB_FAIL(param_datum.from_obj(array_obj->data_[above_group_idx_for_expand_], dst->obj_datum_map_))) {
LOG_WARN("cast datum failed", KR(ret));
} else {
plan_ctx->get_param_store_for_update().at(param_idx) =
array_obj->data_[above_group_idx_for_expand_];
dst->set_evaluated_projected(*eval_ctx_);
}
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(left_->rescan())) {
if (OB_ITER_END != ret) {
LOG_WARN("rescan left failed", KR(ret));
}
}
}
} else {
ret = OB_ITER_END;
}
}
return ret;
}
int ObGroupJoinBufffer::add_row_to_store()
{
int ret = OB_SUCCESS;
@ -967,7 +907,7 @@ void ObGroupJoinBufffer::reset_buffer_state()
left_store_read_ = 0;
left_store_group_idx_.reuse();
last_row_.reset();
last_batch_.reset();
last_batch_.clear_saved_size();
save_last_row_ = false;
mem_context_->get_arena_allocator().reset();
}

View File

@ -35,6 +35,7 @@ struct ObBatchRowDatums
saved_size_ = 0;
is_inited_ = false;
}
void clear_saved_size() { saved_size_ = 0; }
void from_exprs(ObEvalCtx &ctx, ObBitVector *skip, int64_t size);
void extend_save(ObEvalCtx &ctx, int64_t size);
void to_exprs(ObEvalCtx &ctx);
@ -43,9 +44,11 @@ struct ObBatchRowDatums
{
return datums_[col_id * batch_size_ + row_id];
}
ObBitVector *get_skip() { return skip_; }
int32_t get_size() { return size_; }
bool is_inited() const { return is_inited_; }
public:
private:
common::ObIAllocator *alloc_;
const ObExprPtrIArray *exprs_;
int32_t batch_size_;
@ -53,7 +56,6 @@ public:
ObBitVector *skip_;
int32_t size_;
int32_t saved_size_; // record the saved size, include extend saved size
private:
bool is_inited_;
};
@ -88,8 +90,7 @@ private:
int deep_copy_dynamic_obj();
int bind_group_params_to_store();
int prepare_rescan_params();
int get_next_left_row();
int get_next_left_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows);
int get_next_left_iter();
int add_row_to_store();
int build_above_group_params(const common::ObIArray<ObDynamicParamSetter> &above_rescan_params,
common::ObIArray<ObSqlArrayObj *> &above_group_params,
@ -111,7 +112,7 @@ private:
const common::ObIArray<ObDynamicParamSetter> *rescan_params_;
const common::ObIArray<ObDynamicParamSetter> *left_rescan_params_;
const common::ObIArray<ObDynamicParamSetter> *right_rescan_params_;
lib::MemoryContext mem_context_;
lib::MemoryContext mem_context_; // for dynamic param copying, will reset after each group rescan
// buffer for rows read from left child
ObChunkDatumStore left_store_;
ObChunkDatumStore::Iterator left_store_iter_;

View File

@ -236,8 +236,8 @@ void ObNestedLoopJoinOp::reset_buf_state()
save_last_batch_ = false;
need_switch_iter_ = false;
iter_end_ = false;
left_batch_.saved_size_ = 0;
last_save_batch_.saved_size_ = 0;
left_batch_.clear_saved_size();
last_save_batch_.clear_saved_size();
match_left_batch_end_ = false;
match_right_batch_end_ = false;
l_idx_ = 0;
@ -509,9 +509,7 @@ int ObNestedLoopJoinOp::group_read_left_operate()
// das group rescan
bool has_next = false;
if (OB_FAIL(group_join_buffer_.fill_group_buffer())) {
if (OB_ITER_END != ret) {
LOG_WARN("fill group buffer failed", KR(ret));
}
} else if (OB_FAIL(group_join_buffer_.has_next_left_row(has_next))) {
LOG_WARN("check has next failed", KR(ret));
} else if (has_next) {
@ -919,18 +917,18 @@ int ObNestedLoopJoinOp::output()
if (IS_LEFT_SEMI_ANTI_JOIN(MY_SPEC.join_type_)) {
reset_batchrows();
if (LEFT_SEMI_JOIN == MY_SPEC.join_type_) {
brs_.skip_->bit_calculate(*left_batch_.skip_, *left_matched_, left_batch_.size_,
brs_.skip_->bit_calculate(*left_batch_.get_skip(), *left_matched_, left_batch_.get_size(),
[](const uint64_t l, const uint64_t r) { return (l | (~r)); });
} else if (LEFT_ANTI_JOIN == MY_SPEC.join_type_) {
brs_.skip_->bit_calculate(*left_batch_.skip_, *left_matched_, left_batch_.size_,
brs_.skip_->bit_calculate(*left_batch_.get_skip(), *left_matched_, left_batch_.get_size(),
[](const uint64_t l, const uint64_t r) { return (l | r); });
}
if (MY_SPEC.enable_px_batch_rescan_) {
last_save_batch_.extend_save(eval_ctx_, left_batch_.size_);
last_save_batch_.extend_save(eval_ctx_, left_batch_.get_size());
}
left_batch_.to_exprs(eval_ctx_);
brs_.size_ = left_batch_.size_;
left_matched_->reset(left_batch_.size_);
brs_.size_ = left_batch_.get_size();
left_matched_->reset(left_batch_.get_size());
} else {
// do nothing.
}