fix: multi-level batch NLJ loses data
This commit is contained in:
@ -197,9 +197,13 @@ int ObGroupJoinBufffer::has_next_left_row(bool &has_next)
|
||||
LOG_WARN("left row read and group idx do not match", KR(ret),
|
||||
K(left_store_read_), K(left_store_group_idx_.count()));
|
||||
} else if (above_group_idx_for_read_ == left_store_group_idx_.at(left_store_read_)) {
|
||||
// above_group_idx_for_read_ < left_store_group_idx_.at(left_store_read_) means the rows of above_group_idx_ have ended,
|
||||
// need to return iter_end, and rescan current NLJ operator
|
||||
// we are still reading results for the current rescan param, need to rescan right child
|
||||
has_next = true;
|
||||
}
|
||||
} else {
|
||||
LOG_TRACE("Left child operator has no left rows for read,and the join buffer has no left row, needs to return iter_end_");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -345,6 +349,7 @@ int ObGroupJoinBufffer::drain_left()
|
||||
const ObChunkDatumStore::StoredRow *row = NULL;
|
||||
// drain old rows from left store
|
||||
for (int64_t i = left_store_read_; OB_SUCC(ret) && need_drain && i < left_store_group_idx_.count(); ++i) {
|
||||
// In addition to the rows of the current group (above_group_idx_for_read_), the buffer also caches rows of subsequent groups
|
||||
if (above_group_idx_for_read_ != left_store_group_idx_.at(i)) {
|
||||
need_drain = false;
|
||||
} else if (OB_FAIL(left_store_iter_.get_next_row(row))) {
|
||||
@ -354,6 +359,7 @@ int ObGroupJoinBufffer::drain_left()
|
||||
++left_store_read_;
|
||||
}
|
||||
}
|
||||
// Only rows of the current group are left in the buffer, and there are rows of the current group that have not been added to the buffer
|
||||
// discard unread rows from left op
|
||||
if (OB_SUCC(ret) && need_drain && !is_left_end_) {
|
||||
if (!spec_->is_vectorized()) {
|
||||
@ -584,6 +590,8 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
|
||||
} else if (OB_FAIL(backup_above_params(left_params_backup, right_params_backup))) {
|
||||
LOG_WARN("backup above params failed", KR(ret));
|
||||
}
|
||||
|
||||
// fill group join buffer of current op until join buffer is full
|
||||
if (OB_SUCC(ret)) {
|
||||
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
|
||||
if (save_last_batch_) {
|
||||
@ -615,6 +623,7 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
|
||||
}
|
||||
}
|
||||
}
|
||||
// rescan left op, switch to next iter of left child op
|
||||
if (OB_SUCC(ret) && batch_rows->end_) {
|
||||
is_left_end_ = true;
|
||||
if (is_multi_level_) {
|
||||
@ -628,6 +637,7 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!rescan_params_->empty()) {
|
||||
op_->set_pushdown_param_null(*rescan_params_);
|
||||
@ -635,9 +645,11 @@ int ObGroupJoinBufffer::batch_fill_group_buffer(const int64_t max_row_cnt,
|
||||
if (batch_rows->size_ == 0 && batch_rows->end_) {
|
||||
// do nothing
|
||||
} else {
|
||||
// if the buffer is full but the last batch of rows from the left op has not been added, save them to last batch
|
||||
last_batch_.from_exprs(*eval_ctx_, batch_rows->skip_, spec_->max_batch_size_);
|
||||
save_last_batch_ = true;
|
||||
}
|
||||
|
||||
op_->clear_evaluated_flag();
|
||||
if (left_store_.get_row_cnt() <= 0) {
|
||||
// this could happen if we have skipped all rows
|
||||
|
@ -647,7 +647,7 @@ int ObHashJoinVecOp::process_left(bool &need_not_read_right)
|
||||
|
||||
// copy ObOperator::drain_exch
|
||||
// It's the same as the base operator, except that it adds a sync to wait for exit for shared hash join
|
||||
int ObHashJoinVecOp::drain_exch()
|
||||
int ObHashJoinVecOp::do_drain_exch()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
@ -315,7 +315,7 @@ private:
|
||||
public:
|
||||
virtual int inner_open() override;
|
||||
virtual int inner_rescan() override;
|
||||
virtual int drain_exch() override;
|
||||
virtual int do_drain_exch() override;
|
||||
virtual int inner_get_next_batch(const int64_t max_row_cnt) override;
|
||||
// Row-at-a-time fetch is not implemented here: always returns OB_NOT_IMPLEMENT.
virtual int inner_get_next_row() { return common::OB_NOT_IMPLEMENT; };
|
||||
virtual void destroy() override;
|
||||
|
@ -187,6 +187,54 @@ int ObNestedLoopJoinOp::rescan()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObNestedLoopJoinOp::do_drain_exch_multi_lvel_bnlj()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(try_open())) {
|
||||
LOG_WARN("fail to open operator", K(ret));
|
||||
} else if (!exch_drained_) {
|
||||
// the drain request is triggered by current NLJ operator, and current NLJ is a multi level Batch NLJ
|
||||
// It will block rescan request for it's child operator, if the drain request is passed to it's child operator
|
||||
// The child operators will be marked as iter-end_, and will not get any row if rescan is blocked
|
||||
// So we block the drain request here; Only set current operator to end;
|
||||
int tmp_ret = inner_drain_exch();
|
||||
exch_drained_ = true;
|
||||
brs_.end_ = true;
|
||||
batch_reach_end_ = true;
|
||||
row_reach_end_ = true;
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = tmp_ret;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObNestedLoopJoinOp::do_drain_exch()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!MY_SPEC.group_rescan_) {
|
||||
if (OB_FAIL( ObOperator::do_drain_exch())) {
|
||||
LOG_WARN("failed to drain NLJ operator", K(ret));
|
||||
}
|
||||
} else if (!group_join_buffer_.is_multi_level()) {
|
||||
if (OB_FAIL( ObOperator::do_drain_exch())) {
|
||||
LOG_WARN("failed to drain NLJ operator", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (!is_operator_end()) {
|
||||
// the drain request is triggered by parent operator
|
||||
// NLJ needs to pass the drain request to it's child operator
|
||||
LOG_TRACE("The drain request is passed by parent operator");
|
||||
if (OB_FAIL( ObOperator::do_drain_exch())) {
|
||||
LOG_WARN("failed to drain normal NLJ operator", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(do_drain_exch_multi_lvel_bnlj())) {
|
||||
LOG_WARN("failed to drain multi level NLJ operator", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObNestedLoopJoinOp::inner_rescan()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -94,6 +94,8 @@ public:
|
||||
virtual int switch_iterator() override;
|
||||
virtual int rescan() override;
|
||||
virtual int inner_rescan() override;
|
||||
|
||||
|
||||
virtual int inner_get_next_row() override;
|
||||
virtual void destroy() override
|
||||
{
|
||||
@ -121,7 +123,7 @@ public:
|
||||
// Returns a mutable reference to the batch_rescan_ctl_ member.
ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; }
|
||||
int fill_cur_row_rescan_param();
|
||||
int calc_other_conds(bool &is_match);
|
||||
|
||||
int do_drain_exch_multi_lvel_bnlj();
|
||||
private:
|
||||
// state operation and transfer function type.
|
||||
typedef int (ObNestedLoopJoinOp::*state_operation_func_type)();
|
||||
@ -167,6 +169,8 @@ private:
|
||||
// for refactor vectorized end
|
||||
|
||||
// True while left_brs_ has not reached its end and is_full() reports false.
bool continue_fetching() { return !(left_brs_->end_ || is_full());}
|
||||
virtual int do_drain_exch() override;
|
||||
// Drain hook for this operator: does no work and reports OB_SUCCESS.
virtual int inner_drain_exch() { return OB_SUCCESS; }
|
||||
public:
|
||||
ObJoinState state_;
|
||||
// for bnl join
|
||||
|
@ -478,11 +478,14 @@ public:
|
||||
bool &filtered);
|
||||
// Returns a mutable reference to the brs_ member.
ObBatchRows &get_brs() { return brs_; }
|
||||
// Drain exchange in data for PX, or producer DFO will be blocked.
|
||||
virtual int drain_exch();
|
||||
int drain_exch();
|
||||
void set_pushdown_param_null(const common::ObIArray<ObDynamicParamSetter> &rescan_params);
|
||||
// Stores idx in fb_node_idx_.
void set_feedback_node_idx(int64_t idx)
|
||||
{ fb_node_idx_ = idx; }
|
||||
|
||||
// True once either batch_reach_end_ or row_reach_end_ has been set.
bool is_operator_end() { return batch_reach_end_ || row_reach_end_ ; }
|
||||
protected:
|
||||
virtual int do_drain_exch();
|
||||
int init_skip_vector();
|
||||
// Execute filter
|
||||
// Calc buffer does not reset internally, you need to reset it appropriately.
|
||||
@ -565,7 +568,6 @@ private:
|
||||
int output_expr_decint_datum_len_check();
|
||||
int output_expr_decint_datum_len_check_batch();
|
||||
int setup_op_feedback_info();
|
||||
int do_drain_exch();
|
||||
// child can implement this interface, but can't call this directly
|
||||
// Default implementation does no work and returns OB_SUCCESS.
virtual int inner_drain_exch() { return common::OB_SUCCESS; };
|
||||
protected:
|
||||
|
Reference in New Issue
Block a user