Fix ObMultipleMerge not refreshed during ObTableScanIterator::rescan
This commit is contained in:
@ -1239,6 +1239,7 @@ int ObMultipleMerge::refresh_tablet_iter()
|
|||||||
false/*allow_not_ready*/))) {
|
false/*allow_not_ready*/))) {
|
||||||
LOG_WARN("failed to refresh tablet iterator", K(ret), K_(get_table_param), KP_(access_param));
|
LOG_WARN("failed to refresh tablet iterator", K(ret), K_(get_table_param), KP_(access_param));
|
||||||
} else {
|
} else {
|
||||||
|
get_table_param_->refreshed_merge_ = this;
|
||||||
access_param_->iter_param_.rowkey_read_info_ =
|
access_param_->iter_param_.rowkey_read_info_ =
|
||||||
&(get_table_param_->tablet_iter_.get_tablet_handle().get_obj()->get_rowkey_read_info());
|
&(get_table_param_->tablet_iter_.get_tablet_handle().get_obj()->get_rowkey_read_info());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -205,6 +205,8 @@ int ObTableScanIterator::rescan(ObTableScanParam &scan_param)
|
|||||||
LOG_WARN("Failed to rescan reuse", K(ret));
|
LOG_WARN("Failed to rescan reuse", K(ret));
|
||||||
} else if (OB_FAIL(table_scan_range_.init(*scan_param_))) {
|
} else if (OB_FAIL(table_scan_range_.init(*scan_param_))) {
|
||||||
STORAGE_LOG(WARN, "Failed to init table scan range", K(ret));
|
STORAGE_LOG(WARN, "Failed to init table scan range", K(ret));
|
||||||
|
} else if (OB_FAIL(rescan_for_iter())) {
|
||||||
|
STORAGE_LOG(WARN, "Failed to switch param for iter", K(ret), K(*this));
|
||||||
} else if (OB_FAIL(open_iter())) {
|
} else if (OB_FAIL(open_iter())) {
|
||||||
STORAGE_LOG(WARN, "fail to open iter", K(ret));
|
STORAGE_LOG(WARN, "fail to open iter", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -281,6 +283,31 @@ int ObTableScanIterator::switch_param(ObTableScanParam &scan_param, const ObTabl
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTableScanIterator::rescan_for_iter()
|
||||||
|
{
|
||||||
|
#define RESET_NOT_REFRESHED_ITER(refreshed_iter, iter) \
|
||||||
|
if (refreshed_iter != (void*)iter) { \
|
||||||
|
reset_scan_iter(iter); \
|
||||||
|
} \
|
||||||
|
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_LIKELY(nullptr == get_table_param_.refreshed_merge_)) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, single_merge_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, get_merge_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, scan_merge_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, multi_scan_merge_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, skip_scan_merge_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, memtable_row_sample_iterator_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, row_sample_iterator_);
|
||||||
|
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, block_sample_iterator_);
|
||||||
|
get_table_param_.refreshed_merge_ = nullptr;
|
||||||
|
}
|
||||||
|
#undef RESET_NOT_REFRESHED_ITER
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTableScanIterator::switch_param_for_iter()
|
int ObTableScanIterator::switch_param_for_iter()
|
||||||
{
|
{
|
||||||
#define SWITCH_PARAM_FOR_ITER(iter, ret) \
|
#define SWITCH_PARAM_FOR_ITER(iter, ret) \
|
||||||
|
|||||||
@ -75,6 +75,7 @@ private:
|
|||||||
template<typename T> void reset_scan_iter(T *&iter);
|
template<typename T> void reset_scan_iter(T *&iter);
|
||||||
int switch_scan_param(ObMultipleMerge &iter);
|
int switch_scan_param(ObMultipleMerge &iter);
|
||||||
void reuse_row_iters();
|
void reuse_row_iters();
|
||||||
|
int rescan_for_iter();
|
||||||
int switch_param_for_iter();
|
int switch_param_for_iter();
|
||||||
int open_iter();
|
int open_iter();
|
||||||
|
|
||||||
|
|||||||
@ -103,10 +103,10 @@ private:
|
|||||||
ObTabletHandle *transfer_src_handle_;
|
ObTabletHandle *transfer_src_handle_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObGetTableParam
|
struct ObGetTableParam final
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObGetTableParam() : frozen_version_(-1), sample_info_(), tablet_iter_() {}
|
ObGetTableParam() : frozen_version_(-1), sample_info_(), tablet_iter_(), refreshed_merge_(nullptr) {}
|
||||||
~ObGetTableParam() { reset(); }
|
~ObGetTableParam() { reset(); }
|
||||||
bool is_valid() const { return tablet_iter_.is_valid(); }
|
bool is_valid() const { return tablet_iter_.is_valid(); }
|
||||||
void reset()
|
void reset()
|
||||||
@ -114,12 +114,17 @@ public:
|
|||||||
frozen_version_ = -1;
|
frozen_version_ = -1;
|
||||||
sample_info_.reset();
|
sample_info_.reset();
|
||||||
tablet_iter_.reset();
|
tablet_iter_.reset();
|
||||||
|
refreshed_merge_ = nullptr;
|
||||||
}
|
}
|
||||||
TO_STRING_KV(K_(frozen_version), K_(sample_info), K_(tablet_iter));
|
TO_STRING_KV(K_(frozen_version), K_(sample_info), K_(tablet_iter));
|
||||||
public:
|
public:
|
||||||
int64_t frozen_version_;
|
int64_t frozen_version_;
|
||||||
common::SampleInfo sample_info_;
|
common::SampleInfo sample_info_;
|
||||||
ObTabletTableIterator tablet_iter_;
|
ObTabletTableIterator tablet_iter_;
|
||||||
|
|
||||||
|
// when tablet has been refreshed, to notify other ObMultipleMerge in ObTableScanIterator re-inited
|
||||||
|
// before rescan.
|
||||||
|
void *refreshed_merge_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObGetTableParam);
|
DISALLOW_COPY_AND_ASSIGN(ObGetTableParam);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user