Bugfix: px merge sort coordinator returns invalid rows

Author: obdev
Date: 2023-02-09 17:13:30 +00:00
Committed by: ob-robot
Parent: 65769df1a6
Commit: 1d14ac17a9
7 changed files with 44 additions and 4 deletions


@@ -519,7 +519,7 @@ protected:
// try open operator
int try_open() { return opened_ ? common::OB_SUCCESS : open(); }
void do_clear_datum_eval_flag();
virtual void do_clear_datum_eval_flag();
void clear_batch_end_flag() { brs_.end_ = false; }
inline void reset_batchrows()
{


@@ -330,7 +330,14 @@ int ObPxMSCoordOp::inner_get_next_row()
// To achieve orderly receive, the TASKs-QC channels need to be added to the loop one by one
int64_t timeout_us = 0;
int64_t nth_channel = OB_INVALID_INDEX_INT64;
clear_evaluated_flag();
// Note:
// ObPxMSCoordOp::inner_get_next_row is invoked on two paths (batch vs
// non-batch), and the eval flags must be cleared separately under each
// invocation path. Therefore call the overriding API
// do_clear_datum_eval_flag() instead of clear_evaluated_flag().
// TODO qubin.qb: Implement a separate ObPxMSCoordOp::inner_get_next_batch
// to isolate the two paths.
do_clear_datum_eval_flag();
clear_dynamic_const_parent_flag();
if (row_heap_.capacity() > 0) {
int64_t idx = row_heap_.writable_channel_idx();
@@ -535,5 +542,6 @@ int ObPxMSCoordOp::next_row(ObReceiveRowReader &reader, bool &wait_next_msg)
return ret;
}
} // end namespace sql
} // end namespace oceanbase
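
To make the Note above concrete, here is a minimal standalone sketch of the failure mode as I read it from this diff; every type and name below is a hypothetical stand-in, not the OceanBase expression API. The idea: a batch-mode expression tracks evaluation per batch index and remembers whether its datum was already projected, so a clear routine that only resets a scalar evaluated flag leaves the previous datum looking valid, and the coordinator hands back a stale ("invalid") row.

#include <bitset>
#include <cstdint>
#include <cstdio>

// Hypothetical model of an expression's evaluation bookkeeping -- NOT the
// OceanBase API. Batch-mode expressions track evaluation per batch index
// and remember whether their datum was already projected to the output row.
struct EvalInfo {
  bool batch_result = false;       // expression runs in vectorized mode
  bool evaluated = false;          // scalar (non-batch) evaluated flag
  std::bitset<256> batch_flags;    // per-batch-index evaluated flags
  bool projected = false;          // datum already copied to the output row
};

// Non-batch style clear: resets only the scalar flag (roughly what the
// replaced clear_evaluated_flag() call amounted to in this model).
void clear_evaluated_flag(EvalInfo &e) { e.evaluated = false; }

// Batch-aware clear, mirroring the do_clear_datum_eval_flag() override in
// this commit: unset the current batch index and drop the projected flag.
void do_clear_datum_eval_flag(EvalInfo &e, int64_t batch_idx) {
  if (e.batch_result) {
    e.batch_flags.reset(batch_idx);
    e.projected = false;
  } else {
    e.evaluated = false;
  }
}

int main() {
  EvalInfo expr;
  expr.batch_result = true;
  const int64_t batch_idx = 0;

  // Row N: expression evaluated and projected at batch index 0.
  expr.batch_flags.set(batch_idx);
  expr.projected = true;

  // Insufficient clear: the batch bit and projected flag survive, so the
  // next iteration believes the old datum is still valid -> stale row.
  clear_evaluated_flag(expr);
  std::printf("after clear_evaluated_flag:     evaluated=%d projected=%d\n",
              (int)expr.batch_flags.test(batch_idx), (int)expr.projected);

  // Batch-aware clear forces re-evaluation and re-projection for row N+1.
  do_clear_datum_eval_flag(expr, batch_idx);
  std::printf("after do_clear_datum_eval_flag: evaluated=%d projected=%d\n",
              (int)expr.batch_flags.test(batch_idx), (int)expr.projected);
  return 0;
}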


@@ -176,6 +176,7 @@ private:
common::ObArenaAllocator alloc_;
};
} // end namespace sql
} // end namespace oceanbase


@@ -620,7 +620,14 @@ int ObPxMSReceiveOp::inner_get_next_row()
const ObChunkDatumStore::StoredRow *store_row = nullptr;
// (1) Add one or more elements to the heap until the heap is full
while (OB_SUCC(ret) && row_heap_.capacity() > row_heap_.count()) {
clear_evaluated_flag();
// Note:
// inner_get_next_row is invoked on two paths (batch vs non-batch), and
// the eval flags must be cleared separately under each invocation path.
// Therefore call the overriding API do_clear_datum_eval_flag() instead
// of clear_evaluated_flag().
// TODO qubin.qb: Implement a separate inner_get_next_batch to isolate the
// two paths.
do_clear_datum_eval_flag();
clear_dynamic_const_parent_flag();
if (OB_FAIL(get_one_row_from_channels(phy_plan_ctx,
row_heap_.writable_channel_idx(),


@@ -139,7 +139,15 @@ int ObPxOrderedCoordOp::inner_get_next_row()
// To achieve orderly receive, the TASKs-QC channels need to be added to the loop one by one
int64_t timeout_us = 0;
int64_t nth_channel = OB_INVALID_INDEX_INT64;
clear_evaluated_flag();
// Note:
// inner_get_next_row is invoked on two paths (batch vs non-batch), and
// the eval flags must be cleared separately under each invocation path.
// Therefore call the overriding API do_clear_datum_eval_flag() instead
// of clear_evaluated_flag().
// TODO qubin.qb: Implement a separate inner_get_next_batch to isolate the
// two paths.
do_clear_datum_eval_flag();
clear_dynamic_const_parent_flag();
if (channel_idx_ < task_ch_set_.count()) {
int64_t idx = channel_idx_;


@@ -790,6 +790,21 @@ int ObPxReceiveOp::try_send_bloom_filter()
return ret;
}
// Routine "do_clear_datum_eval_flag" behaves almost the same as
// "ObOperator::do_clear_datum_eval_flag", except that it also explicitly
// clears the projected flag under batch_result mode (vectorization).
void ObPxReceiveOp::do_clear_datum_eval_flag()
{
FOREACH_CNT(e, spec_.calc_exprs_) {
if ((*e)->is_batch_result()) {
(*e)->get_evaluated_flags(eval_ctx_).unset(eval_ctx_.get_batch_idx());
(*e)->get_eval_info(eval_ctx_).projected_ = 0;
} else {
(*e)->get_eval_info(eval_ctx_).clear_evaluated_flag();
}
}
}
int ObPxFifoReceiveOp::fetch_rows(const int64_t row_cnt)
{
int ret = OB_SUCCESS;
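
As a side note on the branch in the hunk above: for batch_result expressions the evaluated flags form a per-batch-index set and only the current index is unset, while non-batch expressions carry a single flag. A small standalone illustration of that per-index behavior, using a plain std::bitset rather than the real expression classes:

#include <bitset>
#include <cstdio>

int main() {
  // Hypothetical per-batch evaluated flags for one batch_result expression:
  // suppose rows 0..3 of the current batch have already been evaluated.
  std::bitset<16> evaluated_flags;
  for (int i = 0; i < 4; ++i) {
    evaluated_flags.set(i);
  }

  // The row-at-a-time path consumes a single row, so the clear routine
  // unsets exactly the current batch index ...
  const int batch_idx = 2;
  evaluated_flags.reset(batch_idx);

  // ... while the evaluation state of the other rows in the batch is kept.
  for (int i = 0; i < 4; ++i) {
    std::printf("row %d evaluated=%d\n", i, (int)evaluated_flags.test(i));
  }
  return 0;
}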


@@ -97,6 +97,7 @@ public:
virtual int inner_rescan() override;
virtual void destroy() override;
virtual int inner_close() override { return ObOperator::inner_close(); }
virtual void do_clear_datum_eval_flag() override;
ObPxTaskChSet &get_ch_set() { return task_ch_set_; };
virtual int try_link_channel() = 0;
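
Finally, the first hunk makes do_clear_datum_eval_flag() virtual in the operator base class, and this header declares the override in the PX receive operator. Below is a standalone sketch of what the virtual qualifier buys, with hypothetical stand-in classes (whether any base-class helper actually routes through this method is my assumption, not something this diff shows): a call made through the base interface now reaches the batch-aware override.

#include <cstdio>

// Hypothetical stand-ins for ObOperator / ObPxReceiveOp. The point is only
// that code holding a base-class reference and calling
// do_clear_datum_eval_flag() dispatches to the derived, batch-aware override
// once the method is declared virtual.
class Operator {
public:
  virtual ~Operator() = default;
  virtual void do_clear_datum_eval_flag() {
    std::puts("base clear: reset evaluated flags only");
  }
};

class PxReceiveOp : public Operator {
public:
  void do_clear_datum_eval_flag() override {
    std::puts("receive-op clear: reset evaluated flags AND projected flag");
  }
};

int main() {
  PxReceiveOp op;
  Operator &as_base = op;
  // Without `virtual` in the base class this call would bind statically to
  // the base implementation; with it, the override runs.
  as_base.do_clear_datum_eval_flag();
  return 0;
}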