[FEAT MERGE] perf opt and support cs compaction to row

Co-authored-by: z404289981 <z404289981@163.com>
Co-authored-by: XIAO-HOU <372060054@qq.com>
Co-authored-by: wudidapaopao <664920313@qq.com>
This commit is contained in:
haitaoyang
2024-04-07 10:35:21 +00:00
committed by ob-robot
parent ef5a40009a
commit 44fc2f09f5
145 changed files with 5177 additions and 1816 deletions

View File

@ -43,6 +43,7 @@ constexpr const char ObTableScanIterator::LABEL[];
ObTableScanIterator::ObTableScanIterator()
: ObNewRowIterator(ObNewRowIterator::ObTableScanIterator),
is_inited_(false),
current_iter_type_(T_INVALID_ITER_TYPE),
single_merge_(NULL),
get_merge_(NULL),
scan_merge_(NULL),
@ -59,7 +60,9 @@ ObTableScanIterator::ObTableScanIterator()
scan_param_(NULL),
table_scan_range_(),
main_iter_(NULL),
sample_ranges_()
sample_ranges_(),
cached_iter_node_(NULL),
cached_iter_(NULL)
{
}
@ -77,9 +80,12 @@ void ObTableScanIterator::reset()
reset_scan_iter(multi_scan_merge_);
reset_scan_iter(skip_scan_merge_);
reset_scan_iter(memtable_row_sample_iterator_);
reset_scan_iter(row_sample_iterator_);
reset_scan_iter(block_sample_iterator_);
// reset_scan_iter(i_sample_iter_);
if (nullptr != cached_iter_node_) {
ObGlobalIteratorPool *iter_pool = MTL(ObGlobalIteratorPool*);
iter_pool->release(cached_iter_node_);
}
main_table_param_.reset();
main_table_ctx_.reset();
@ -90,14 +96,18 @@ void ObTableScanIterator::reset()
table_scan_range_.reset();
main_iter_ = NULL;
sample_ranges_.reset();
cached_iter_ = NULL;
current_iter_type_ = T_INVALID_ITER_TYPE;
is_inited_ = false;
}
template<typename T>
void ObTableScanIterator::reset_scan_iter(T *&iter)
{
if (NULL != iter) {
iter->~T();
if (nullptr != iter) {
if (nullptr == cached_iter_node_) {
iter->~T();
}
iter = NULL;
}
}
@ -115,7 +125,6 @@ void ObTableScanIterator::reuse_row_iters()
REUSE_SCAN_ITER(multi_scan_merge_);
REUSE_SCAN_ITER(skip_scan_merge_);
REUSE_SCAN_ITER(memtable_row_sample_iterator_);
REUSE_SCAN_ITER(row_sample_iterator_);
REUSE_SCAN_ITER(block_sample_iterator_);
// REUSE_SCAN_ITER(i_sample_iter_);
@ -130,10 +139,68 @@ int ObTableScanIterator::prepare_table_param(const ObTabletHandle &tablet_handle
STORAGE_LOG(WARN, "Invalid argument", K(ret), KP(scan_param_));
} else if (OB_FAIL(main_table_param_.init(*scan_param_, tablet_handle))) {
STORAGE_LOG(WARN, "failed to init main table param", K(ret));
} else if (nullptr != cached_iter_node_) {
main_table_param_.set_use_global_iter_pool();
main_table_param_.iter_param_.set_use_stmt_iter_pool();
STORAGE_LOG(TRACE, "use global iter pool", K(main_table_param_));
}
return ret;
}
bool ObTableScanIterator::can_use_global_iter_pool(const ObQRIterType iter_type) const
{
bool use_pool = false;
if (main_table_param_.iter_param_.tablet_id_.is_inner_tablet()) {
} else if (scan_param_->use_index_skip_scan() ||
!scan_param_->sample_info_.is_no_sample() ||
main_table_param_.iter_param_.is_use_column_store() ||
main_table_param_.iter_param_.enable_pd_aggregate() ||
main_table_param_.iter_param_.enable_pd_group_by() ||
main_table_param_.iter_param_.has_lob_column_out_) {
} else {
const int64_t table_cnt = get_table_param_.tablet_iter_.table_iter()->count();
const int64_t col_cnt = main_table_param_.get_max_out_col_cnt();
ObGlobalIteratorPool *iter_pool = MTL(ObGlobalIteratorPool*);
if (OB_NOT_NULL(iter_pool)) {
use_pool = iter_pool->can_use_iter_pool(table_cnt, col_cnt, iter_type);
}
}
return use_pool;
}
int ObTableScanIterator::prepare_cached_iter_node()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(nullptr != cached_iter_node_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected not null cached iter node", K(ret), KP(cached_iter_node_));
} else if (can_use_global_iter_pool(current_iter_type_)) {
ObGlobalIteratorPool *iter_pool = MTL(ObGlobalIteratorPool*);
if (OB_FAIL(iter_pool->get(current_iter_type_, cached_iter_node_))) {
STORAGE_LOG(WARN, "Failed to get from iter pool", K(ret));
} else if (nullptr != cached_iter_node_) {
main_table_param_.set_use_global_iter_pool();
main_table_param_.iter_param_.set_use_stmt_iter_pool();
STORAGE_LOG(TRACE, "use global iter pool", K(current_iter_type_), K(main_table_param_));
}
}
return ret;
}
void ObTableScanIterator::try_release_cached_iter_node(const ObQRIterType rescan_iter_type)
{
if (nullptr != cached_iter_node_ && current_iter_type_ != rescan_iter_type) {
main_table_param_.diable_use_global_iter_pool();
main_table_ctx_.reset_cached_iter_node();
MTL(ObGlobalIteratorPool*)->release(cached_iter_node_);
cached_iter_node_ = nullptr;
current_iter_type_ = T_INVALID_ITER_TYPE;
if (nullptr != cached_iter_) {
*cached_iter_ = nullptr;
}
}
}
int ObTableScanIterator::prepare_table_context()
{
int ret = OB_SUCCESS;
@ -148,7 +215,7 @@ int ObTableScanIterator::prepare_table_context()
if (OB_UNLIKELY(!trans_version_range.is_valid())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "trans version range is not valid", K(ret), K(trans_version_range));
} else if (OB_FAIL(main_table_ctx_.init(*scan_param_, ctx_guard_.get_store_ctx(), trans_version_range))) {
} else if (OB_FAIL(main_table_ctx_.init(*scan_param_, ctx_guard_.get_store_ctx(), trans_version_range, cached_iter_node_))) {
STORAGE_LOG(WARN, "failed to init main table ctx", K(ret));
}
}
@ -190,6 +257,7 @@ void ObTableScanIterator::reset_for_switch()
int ObTableScanIterator::rescan(ObTableScanParam &scan_param)
{
int ret = OB_SUCCESS;
ACTIVE_GLOBAL_ITERATOR_GUARD(ret, cached_iter_node_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The ObTableScanStoreRowIterator has not been inited, ", K(ret));
@ -201,12 +269,16 @@ int ObTableScanIterator::rescan(ObTableScanParam &scan_param)
STORAGE_LOG(DEBUG, "table scan iterate rescan", K_(is_inited), K(scan_param_));
// there's no need to reset main_table_param_ and table_ctx
// scan_param only reset query range fields in ObTableScan::rt_rescan()
ObQRIterType rescan_iter_type = T_INVALID_ITER_TYPE;
if (OB_FAIL(main_table_ctx_.rescan_reuse(scan_param))) {
LOG_WARN("Failed to rescan reuse", K(ret));
STORAGE_LOG(WARN, "Failed to rescan reuse", K(ret));
} else if (OB_FAIL(table_scan_range_.init(*scan_param_))) {
STORAGE_LOG(WARN, "Failed to init table scan range", K(ret));
} else if (OB_FAIL(rescan_for_iter())) {
STORAGE_LOG(WARN, "Failed to switch param for iter", K(ret), K(*this));
} else if (OB_FAIL(table_scan_range_.get_query_iter_type(rescan_iter_type))) {
STORAGE_LOG(WARN, "Failed to get query iter type", K(ret));
} else if (FALSE_IT(try_release_cached_iter_node(rescan_iter_type))) {
} else if (OB_FAIL(open_iter())) {
STORAGE_LOG(WARN, "fail to open iter", K(ret));
} else {
@ -219,6 +291,7 @@ int ObTableScanIterator::rescan(ObTableScanParam &scan_param)
int ObTableScanIterator::init(ObTableScanParam &scan_param, const ObTabletHandle &tablet_handle)
{
int ret = OB_SUCCESS;
ACTIVE_GLOBAL_ITERATOR_GUARD(ret, cached_iter_node_);
ObStoreCtx &store_ctx = ctx_guard_.get_store_ctx();
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
@ -231,12 +304,16 @@ int ObTableScanIterator::init(ObTableScanParam &scan_param, const ObTabletHandle
K(tablet_handle));
} else if (OB_FAIL(table_scan_range_.init(scan_param))) {
STORAGE_LOG(WARN, "Failed to init table scan range", K(ret), K(scan_param));
} else if (OB_FAIL(table_scan_range_.get_query_iter_type(current_iter_type_))) {
STORAGE_LOG(WARN, "Failed to get query iter type", K(ret));
} else {
scan_param_ = &scan_param;
if (OB_FAIL(get_table_param_.tablet_iter_.set_tablet_handle(tablet_handle))) {
STORAGE_LOG(WARN, "Fail to set tablet handle to iter", K(ret));
} else if (OB_FAIL(prepare_table_param(tablet_handle))) {
STORAGE_LOG(WARN, "Fail to prepare table param, ", K(ret));
} else if (OB_FAIL(prepare_cached_iter_node())) {
STORAGE_LOG(WARN, "Fail to prepare cached iter node", K(ret));
} else if (OB_FAIL(prepare_table_context())) {
STORAGE_LOG(WARN, "Fail to prepare table ctx, ", K(ret));
} else if (OB_FAIL(open_iter())) {
@ -252,7 +329,9 @@ int ObTableScanIterator::init(ObTableScanParam &scan_param, const ObTabletHandle
int ObTableScanIterator::switch_param(ObTableScanParam &scan_param, const ObTabletHandle &tablet_handle)
{
int ret = OB_SUCCESS;
ACTIVE_GLOBAL_ITERATOR_GUARD(ret, cached_iter_node_);
ObStoreCtx &store_ctx = ctx_guard_.get_store_ctx();
ObQRIterType rescan_iter_type = T_INVALID_ITER_TYPE;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "not inited", K(ret), K(*this));
@ -263,6 +342,9 @@ int ObTableScanIterator::switch_param(ObTableScanParam &scan_param, const ObTabl
STORAGE_LOG(WARN, "Invalid argument, ", K(ret), K(store_ctx), K(scan_param), K(tablet_handle));
} else if (OB_FAIL(table_scan_range_.init(scan_param))) {
STORAGE_LOG(WARN, "Failed to init table scan range", K(ret), K(scan_param));
} else if (OB_FAIL(table_scan_range_.get_query_iter_type(rescan_iter_type))) {
STORAGE_LOG(WARN, "Failed to get query iter type", K(ret));
} else if (FALSE_IT(try_release_cached_iter_node(rescan_iter_type))) {
} else {
scan_param_ = &scan_param;
if (OB_FAIL(get_table_param_.tablet_iter_.set_tablet_handle(tablet_handle))) {
@ -300,7 +382,6 @@ int ObTableScanIterator::rescan_for_iter()
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, multi_scan_merge_);
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, skip_scan_merge_);
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, memtable_row_sample_iterator_);
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, row_sample_iterator_);
RESET_NOT_REFRESHED_ITER(get_table_param_.refreshed_merge_, block_sample_iterator_);
get_table_param_.refreshed_merge_ = nullptr;
}
@ -333,27 +414,43 @@ template<typename T>
int ObTableScanIterator::init_scan_iter(T *&iter)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
if (OB_ISNULL(buf = scan_param_->allocator_->alloc(sizeof(T)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "Fail to allocate memory", K(ret));
} else {
iter = new (buf) T();
if (OB_FAIL(iter->init(main_table_param_, main_table_ctx_, get_table_param_))) {
STORAGE_LOG(WARN, "Failed to init multiple merge", K(ret));
} else if (!scan_param_->sample_info_.is_no_sample()
&& SampleInfo::SAMPLE_INCR_DATA == scan_param_->sample_info_.scope_) {
iter->disable_fill_default();
iter->disable_output_row_with_nop();
}
if (OB_FAIL(ret)) {
iter->~T();
scan_param_->allocator_->free(iter);
iter = nullptr;
ObQueryRowIterator *cached_iter = nullptr == cached_iter_node_ ? nullptr : cached_iter_node_->get_iter();
if (OB_NOT_NULL(cached_iter)) {
if (OB_UNLIKELY(cached_iter->get_type() != current_iter_type_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected cached iter type", K(ret), K(cached_iter->get_type()), K(current_iter_type_));
} else {
iter = static_cast<T*>(cached_iter);
cached_iter_ = reinterpret_cast<ObQueryRowIterator**>(&iter);
}
}
if (OB_FAIL(ret)) {
} else if (nullptr == iter) {
void *buf = nullptr;
if (OB_ISNULL(buf = main_table_ctx_.get_long_life_allocator()->alloc(sizeof(T)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "Fail to allocate memory", K(ret));
} else {
iter = new (buf) T();
if (OB_FAIL(iter->init(main_table_param_, main_table_ctx_, get_table_param_))) {
STORAGE_LOG(WARN, "Failed to init multiple merge", K(ret));
} else if (!scan_param_->sample_info_.is_no_sample()
&& SampleInfo::SAMPLE_INCR_DATA == scan_param_->sample_info_.scope_) {
iter->disable_fill_default();
iter->disable_output_row_with_nop();
}
if (OB_FAIL(ret)) {
iter->~T();
main_table_ctx_.get_long_life_allocator()->free(iter);
iter = nullptr;
} else if (nullptr != cached_iter_node_) {
cached_iter_node_->set_iter(iter);
cached_iter_ = reinterpret_cast<ObQueryRowIterator**>(&iter);
}
}
} else if (OB_FAIL(iter->switch_table(main_table_param_, main_table_ctx_, get_table_param_))) {
STORAGE_LOG(WARN, "Failed to switch table", K(ret), K(main_table_param_));
}
return ret;
}
@ -448,15 +545,13 @@ int ObTableScanIterator::init_and_open_scan_merge_iter_()
STORAGE_LOG(WARN, "check scan range count failed", KR(ret), KPC(scan_param_));
} else if (need_scan_multiple_range) {
// this branch means the sample is row(memtable row) sample
main_table_param_.iter_param_.disable_blockscan();
if (!scan_param_->sample_info_.is_row_sample()) {
main_table_param_.iter_param_.disable_blockscan();
}
INIT_AND_OPEN_ITER(multi_scan_merge_, sample_ranges_, false);
if (OB_FAIL(ret)) {
} else if (scan_param_->sample_info_.is_row_sample()) {
if (OB_FAIL(sample_iter_helper.get_sample_iter(row_sample_iterator_, main_iter_, multi_scan_merge_))) {
STORAGE_LOG(WARN, "get sample iter failed", KR(ret), K(scan_param_));
} else {
STORAGE_LOG(INFO, "finish init row sample iter", KP(row_sample_iterator_), KP(main_iter_));
}
// Row sample is scan, do not need extra iterator.
} else {
if (OB_FAIL(
sample_iter_helper.get_sample_iter(memtable_row_sample_iterator_, main_iter_, multi_scan_merge_))) {
@ -505,6 +600,7 @@ int ObTableScanIterator::get_next_row(blocksstable::ObDatumRow *&row)
{
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_read);
int ret = OB_SUCCESS;
ACTIVE_GLOBAL_ITERATOR_GUARD(ret, cached_iter_node_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The ObTableScanStoreRowIterator has not been inited, ", K(ret));
@ -530,7 +626,6 @@ int ObTableScanIterator::get_next_row(blocksstable::ObDatumRow *&row)
ret = tmp_ret;
}
}
return ret;
}
@ -538,6 +633,7 @@ int ObTableScanIterator::get_next_rows(int64_t &count, int64_t capacity)
{
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_storage_read);
int ret = OB_SUCCESS;
ACTIVE_GLOBAL_ITERATOR_GUARD(ret, cached_iter_node_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The ObTableScanStoreRowIterator has not been inited, ", K(ret));
@ -577,7 +673,6 @@ int ObTableScanIterator::check_ls_offline_after_read()
ret = OB_LS_OFFLINE;
STORAGE_LOG(WARN, "ls offline during the read operation", K(ret), K(acc_ctx.snapshot_));
}
return ret;
}