Fix the bug of lookup iter failing to switch partition correctly
This commit is contained in:
@ -593,11 +593,12 @@ int ObDASScanOp::reuse_iter()
|
|||||||
int &ret = errcode_;
|
int &ret = errcode_;
|
||||||
ObITabletScan &tsc_service = get_tsc_service();
|
ObITabletScan &tsc_service = get_tsc_service();
|
||||||
ObLocalIndexLookupOp *lookup_op = get_lookup_op();
|
ObLocalIndexLookupOp *lookup_op = get_lookup_op();
|
||||||
scan_param_.need_switch_param_ = need_switch_param();
|
const ObTabletID &storage_tablet_id = scan_param_.tablet_id_;
|
||||||
if (OB_FAIL(tsc_service.reuse_scan_iter(need_switch_param(), get_storage_scan_iter()))) {
|
scan_param_.need_switch_param_ = (storage_tablet_id.is_valid() && storage_tablet_id != tablet_id_ ? true : false);
|
||||||
|
if (OB_FAIL(tsc_service.reuse_scan_iter(scan_param_.need_switch_param_, get_storage_scan_iter()))) {
|
||||||
LOG_WARN("reuse scan iterator failed", K(ret));
|
LOG_WARN("reuse scan iterator failed", K(ret));
|
||||||
} else if (lookup_op != nullptr
|
} else if (lookup_op != nullptr
|
||||||
&& OB_FAIL(lookup_op->reset_lookup_state(need_switch_param()))) {
|
&& OB_FAIL(lookup_op->reset_lookup_state())) {
|
||||||
LOG_WARN("reuse lookup iterator failed", K(ret));
|
LOG_WARN("reuse lookup iterator failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
scan_param_.key_ranges_.reuse();
|
scan_param_.key_ranges_.reuse();
|
||||||
@ -933,6 +934,8 @@ int ObLocalIndexLookupOp::do_index_lookup()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
const ObTabletID &storage_tablet_id = scan_param_.tablet_id_;
|
||||||
|
scan_param_.need_switch_param_ = (storage_tablet_id.is_valid() && storage_tablet_id != tablet_id_ ? true : false);
|
||||||
scan_param_.tablet_id_ = tablet_id_;
|
scan_param_.tablet_id_ = tablet_id_;
|
||||||
scan_param_.ls_id_ = ls_id_;
|
scan_param_.ls_id_ = ls_id_;
|
||||||
if (OB_FAIL(reuse_iter())) {
|
if (OB_FAIL(reuse_iter())) {
|
||||||
@ -982,7 +985,7 @@ int ObLocalIndexLookupOp::process_next_index_batch_for_row()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (need_next_index_batch()) {
|
if (need_next_index_batch()) {
|
||||||
reset_lookup_state(false);
|
reset_lookup_state();
|
||||||
index_end_ = false;
|
index_end_ = false;
|
||||||
state_ = INDEX_SCAN;
|
state_ = INDEX_SCAN;
|
||||||
} else {
|
} else {
|
||||||
@ -998,7 +1001,7 @@ int ObLocalIndexLookupOp::process_next_index_batch_for_rows(int64_t &count)
|
|||||||
if (OB_FAIL(check_lookup_row_cnt())) {
|
if (OB_FAIL(check_lookup_row_cnt())) {
|
||||||
LOG_WARN("check lookup row cnt failed", K(ret));
|
LOG_WARN("check lookup row cnt failed", K(ret));
|
||||||
} else if (need_next_index_batch()) {
|
} else if (need_next_index_batch()) {
|
||||||
reset_lookup_state(false);
|
reset_lookup_state();
|
||||||
index_end_ = false;
|
index_end_ = false;
|
||||||
state_ = INDEX_SCAN;
|
state_ = INDEX_SCAN;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
@ -1163,7 +1166,7 @@ int ObLocalIndexLookupOp::reuse_iter()
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
int ObLocalIndexLookupOp::reset_lookup_state(bool need_switch_param)
|
int ObLocalIndexLookupOp::reset_lookup_state()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
state_ = INDEX_SCAN;
|
state_ = INDEX_SCAN;
|
||||||
@ -1172,7 +1175,6 @@ int ObLocalIndexLookupOp::reset_lookup_state(bool need_switch_param)
|
|||||||
// Keep lookup_rtdef_->stmt_allocator_.alloc_ consistent with index_rtdef_->stmt_allocator_.alloc_
|
// Keep lookup_rtdef_->stmt_allocator_.alloc_ consistent with index_rtdef_->stmt_allocator_.alloc_
|
||||||
// to avoid memory expansion
|
// to avoid memory expansion
|
||||||
if (lookup_iter_ != nullptr) {
|
if (lookup_iter_ != nullptr) {
|
||||||
scan_param_.need_switch_param_ = need_switch_param;
|
|
||||||
scan_param_.key_ranges_.reuse();
|
scan_param_.key_ranges_.reuse();
|
||||||
scan_param_.ss_key_ranges_.reuse();
|
scan_param_.ss_key_ranges_.reuse();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -329,7 +329,7 @@ public:
|
|||||||
void set_rowkey_iter(common::ObNewRowIterator *rowkey_iter) {rowkey_iter_ = rowkey_iter;}
|
void set_rowkey_iter(common::ObNewRowIterator *rowkey_iter) {rowkey_iter_ = rowkey_iter;}
|
||||||
common::ObNewRowIterator *get_rowkey_iter() { return rowkey_iter_; }
|
common::ObNewRowIterator *get_rowkey_iter() { return rowkey_iter_; }
|
||||||
int reuse_iter();
|
int reuse_iter();
|
||||||
virtual int reset_lookup_state(bool need_switch_param);
|
virtual int reset_lookup_state();
|
||||||
int revert_iter();
|
int revert_iter();
|
||||||
VIRTUAL_TO_STRING_KV(KPC_(lookup_ctdef),
|
VIRTUAL_TO_STRING_KV(KPC_(lookup_ctdef),
|
||||||
KPC_(lookup_rtdef),
|
KPC_(lookup_rtdef),
|
||||||
|
|||||||
@ -63,10 +63,10 @@ ObSpatialIndexLookupOp::~ObSpatialIndexLookupOp()
|
|||||||
sorter_.~ObExternalSort();
|
sorter_.~ObExternalSort();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObSpatialIndexLookupOp::reset_lookup_state(bool need_switch_param)
|
int ObSpatialIndexLookupOp::reset_lookup_state()
|
||||||
{
|
{
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
return ObLocalIndexLookupOp::reset_lookup_state(need_switch_param);
|
return ObLocalIndexLookupOp::reset_lookup_state();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObSpatialIndexLookupOp::filter_by_mbr(const ObObj &mbr_obj, bool &pass_through)
|
int ObSpatialIndexLookupOp::filter_by_mbr(const ObObj &mbr_obj, bool &pass_through)
|
||||||
@ -209,7 +209,7 @@ int ObSpatialIndexLookupOp::get_next_row()
|
|||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
if (need_next_index_batch()) {
|
if (need_next_index_batch()) {
|
||||||
// reuse lookup_iter_ only
|
// reuse lookup_iter_ only
|
||||||
ObLocalIndexLookupOp::reset_lookup_state(false);
|
ObLocalIndexLookupOp::reset_lookup_state();
|
||||||
index_end_ = false;
|
index_end_ = false;
|
||||||
state_ = INDEX_SCAN;
|
state_ = INDEX_SCAN;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -47,7 +47,7 @@ public:
|
|||||||
transaction::ObTxDesc *tx_desc,
|
transaction::ObTxDesc *tx_desc,
|
||||||
transaction::ObTxReadSnapshot *snapshot,
|
transaction::ObTxReadSnapshot *snapshot,
|
||||||
const ObMbrFilterArray *mbr_filters);
|
const ObMbrFilterArray *mbr_filters);
|
||||||
int reset_lookup_state(bool need_switch_param);
|
int reset_lookup_state();
|
||||||
int filter_by_mbr(const ObObj &mbr_obj, bool &pass_through);
|
int filter_by_mbr(const ObObj &mbr_obj, bool &pass_through);
|
||||||
int get_next_row();
|
int get_next_row();
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -183,8 +183,6 @@ public:
|
|||||||
void set_can_part_retry(const bool flag) { can_part_retry_ = flag; }
|
void set_can_part_retry(const bool flag) { can_part_retry_ = flag; }
|
||||||
bool can_part_retry() const { return can_part_retry_; }
|
bool can_part_retry() const { return can_part_retry_; }
|
||||||
bool is_in_retry() const { return in_part_retry_ || in_stmt_retry_; }
|
bool is_in_retry() const { return in_part_retry_ || in_stmt_retry_; }
|
||||||
void set_need_switch_param(bool v) { need_switch_param_ = v; }
|
|
||||||
bool need_switch_param() const { return need_switch_param_; }
|
|
||||||
void set_task_status(ObDasTaskStatus status);
|
void set_task_status(ObDasTaskStatus status);
|
||||||
ObDasTaskStatus get_task_status() const { return task_status_; };
|
ObDasTaskStatus get_task_status() const { return task_status_; };
|
||||||
const ObDasAggregatedTasks *get_agg_tasks() const { return agg_tasks_; };
|
const ObDasAggregatedTasks *get_agg_tasks() const { return agg_tasks_; };
|
||||||
@ -223,7 +221,7 @@ protected:
|
|||||||
uint16_t task_started_ : 1;
|
uint16_t task_started_ : 1;
|
||||||
uint16_t in_part_retry_ : 1;
|
uint16_t in_part_retry_ : 1;
|
||||||
uint16_t in_stmt_retry_ : 1;
|
uint16_t in_stmt_retry_ : 1;
|
||||||
uint16_t need_switch_param_ : 1; //need to switch param in gi table rescan
|
uint16_t need_switch_param_ : 1; //need to switch param in gi table rescan, this parameter has been deprecated
|
||||||
uint16_t status_reserved_ : 12;
|
uint16_t status_reserved_ : 12;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1517,10 +1517,6 @@ int ObTableScanOp::local_iter_rescan()
|
|||||||
for (; OB_SUCC(ret) && !task_iter.is_end(); ++task_iter) {
|
for (; OB_SUCC(ret) && !task_iter.is_end(); ++task_iter) {
|
||||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
||||||
if (MY_SPEC.gi_above_) {
|
if (MY_SPEC.gi_above_) {
|
||||||
ObTableScanParam &scan_param = scan_op->get_scan_param();
|
|
||||||
scan_op->set_tablet_id(MY_INPUT.tablet_loc_->tablet_id_);
|
|
||||||
scan_op->set_ls_id(MY_INPUT.tablet_loc_->ls_id_);
|
|
||||||
scan_op->set_tablet_loc(MY_INPUT.tablet_loc_);
|
|
||||||
if (!MY_SPEC.is_index_global_ && MY_CTDEF.lookup_ctdef_ != nullptr) {
|
if (!MY_SPEC.is_index_global_ && MY_CTDEF.lookup_ctdef_ != nullptr) {
|
||||||
//is local index lookup, need to set the lookup ctdef to the das scan op
|
//is local index lookup, need to set the lookup ctdef to the das scan op
|
||||||
ObDASTableLoc *lookup_table_loc = tsc_rtdef_.lookup_rtdef_->table_loc_;
|
ObDASTableLoc *lookup_table_loc = tsc_rtdef_.lookup_rtdef_->table_loc_;
|
||||||
@ -1563,7 +1559,11 @@ int ObTableScanOp::local_iter_reuse()
|
|||||||
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
ObDASScanOp *scan_op = DAS_SCAN_OP(*task_iter);
|
||||||
bool need_switch_param = (scan_op->get_tablet_loc() != MY_INPUT.tablet_loc_ &&
|
bool need_switch_param = (scan_op->get_tablet_loc() != MY_INPUT.tablet_loc_ &&
|
||||||
MY_INPUT.tablet_loc_ != nullptr);
|
MY_INPUT.tablet_loc_ != nullptr);
|
||||||
scan_op->set_need_switch_param(need_switch_param);
|
if (MY_INPUT.tablet_loc_ != nullptr) {
|
||||||
|
scan_op->set_tablet_id(MY_INPUT.tablet_loc_->tablet_id_);
|
||||||
|
scan_op->set_ls_id(MY_INPUT.tablet_loc_->ls_id_);
|
||||||
|
scan_op->set_tablet_loc(MY_INPUT.tablet_loc_);
|
||||||
|
}
|
||||||
scan_op->reuse_iter();
|
scan_op->reuse_iter();
|
||||||
}
|
}
|
||||||
if (OB_FAIL(reuse_table_rescan_allocator())) {
|
if (OB_FAIL(reuse_table_rescan_allocator())) {
|
||||||
|
|||||||
Reference in New Issue
Block a user