Fix FORWARD_NULL
This commit is contained in:
@ -1100,30 +1100,30 @@ int ObTenantCGReadInfoMgr::construct_normal_cg_read_infos()
|
||||
read_info_array = new(buf) ObCGReadInfo[array_cnt];
|
||||
basic_info_array = new((char *)buf + (sizeof(ObCGReadInfo) * array_cnt)) ObReadInfoStruct[array_cnt];
|
||||
alloc_buf_ = buf;
|
||||
ObColExtend tmp_col_extend;
|
||||
tmp_col_extend.skip_index_attr_.set_min_max();
|
||||
int64_t idx = 0;
|
||||
for (int64_t i = 0 ; OB_SUCC(ret) && i < ObObjType::ObMaxType ; ++i) {
|
||||
ObObjType type = static_cast<ObObjType>(i);
|
||||
ObColDesc tmp_desc;
|
||||
ObCGReadInfo &tmp_read_info = read_info_array[idx];
|
||||
normal_cg_read_infos_.at(i) = &tmp_read_info;
|
||||
if (not_in_normal_cg_array(type)) {
|
||||
} else if (FALSE_IT(set_col_desc(type, tmp_desc))) {
|
||||
} else if (FALSE_IT(tmp_read_info.cg_basic_info_ = &basic_info_array[idx])) {
|
||||
} else if (OB_FAIL(tmp_read_info.cg_basic_info_->generate_for_column_store(allocator_, tmp_desc, index_read_info_.is_oracle_mode()))) {
|
||||
STORAGE_LOG(WARN, "Fail to generate column group read info", K(ret));
|
||||
} else if (OB_FAIL(tmp_read_info.cols_extend_.init(1, allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to init columns extend", K(ret));
|
||||
} else if (OB_FAIL(tmp_read_info.cols_extend_.push_back(tmp_col_extend))) {
|
||||
STORAGE_LOG(WARN, "Fail to push col extend", K(ret));
|
||||
} else {
|
||||
tmp_read_info.need_release_ = false;
|
||||
idx++;
|
||||
}
|
||||
} // end of for
|
||||
// if failed, will call destroy outside
|
||||
}
|
||||
ObColExtend tmp_col_extend;
|
||||
tmp_col_extend.skip_index_attr_.set_min_max();
|
||||
int64_t idx = 0;
|
||||
for (int64_t i = 0 ; OB_SUCC(ret) && i < ObObjType::ObMaxType ; ++i) {
|
||||
ObObjType type = static_cast<ObObjType>(i);
|
||||
ObColDesc tmp_desc;
|
||||
ObCGReadInfo &tmp_read_info = read_info_array[idx];
|
||||
normal_cg_read_infos_.at(i) = &tmp_read_info;
|
||||
if (not_in_normal_cg_array(type)) {
|
||||
} else if (FALSE_IT(set_col_desc(type, tmp_desc))) {
|
||||
} else if (FALSE_IT(tmp_read_info.cg_basic_info_ = &basic_info_array[idx])) {
|
||||
} else if (OB_FAIL(tmp_read_info.cg_basic_info_->generate_for_column_store(allocator_, tmp_desc, index_read_info_.is_oracle_mode()))) {
|
||||
STORAGE_LOG(WARN, "Fail to generate column group read info", K(ret));
|
||||
} else if (OB_FAIL(tmp_read_info.cols_extend_.init(1, allocator_))) {
|
||||
STORAGE_LOG(WARN, "Fail to init columns extend", K(ret));
|
||||
} else if (OB_FAIL(tmp_read_info.cols_extend_.push_back(tmp_col_extend))) {
|
||||
STORAGE_LOG(WARN, "Fail to push col extend", K(ret));
|
||||
} else {
|
||||
tmp_read_info.need_release_ = false;
|
||||
idx++;
|
||||
}
|
||||
} // end of for
|
||||
// if failed, will call destroy outside
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -491,6 +491,12 @@ int ObCOMergeBatchExeDag::create_first_task()
|
||||
LOG_WARN("execute task is unexpected null", KR(ret), KP(execute_task));
|
||||
} else if (OB_FAIL(create_task(execute_task/*parent*/, finish_task, *ctx, *dag_net))) {
|
||||
LOG_WARN("fail to create finish task", K(ret), KPC(dag_net));
|
||||
} else { // fill compaction param
|
||||
// the dag_net has been set, and the dag hasn't been added to the scheduler now
|
||||
param_.compaction_param_.sstable_cnt_ = ctx->get_tables_handle().get_count();
|
||||
param_.compaction_param_.estimate_concurrent_cnt_ = ctx->get_concurrent_cnt();
|
||||
param_.compaction_param_.add_time_ = common::ObTimeUtility::fast_current_time();
|
||||
param_.compaction_param_.batch_size_ = end_cg_idx_ - start_cg_idx_;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -500,17 +506,9 @@ int ObCOMergeBatchExeDag::create_first_task()
|
||||
}
|
||||
if (OB_NOT_NULL(finish_task)) {
|
||||
remove_task(*finish_task);
|
||||
execute_task = nullptr;
|
||||
finish_task = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// the dag_net has been set, and the dag hasn't been added to the scheduler now
|
||||
if (OB_SUCC(ret)) { // fill compaction param
|
||||
param_.compaction_param_.sstable_cnt_ = ctx->get_tables_handle().get_count();
|
||||
param_.compaction_param_.estimate_concurrent_cnt_ = ctx->get_concurrent_cnt();
|
||||
param_.compaction_param_.add_time_ = common::ObTimeUtility::fast_current_time();
|
||||
param_.compaction_param_.batch_size_ = end_cg_idx_ - start_cg_idx_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -942,44 +942,49 @@ int ObCOSSTableRowScanner::filter_group_by_rows()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObCGBitmap *result_bitmap = nullptr;
|
||||
ObICGIterator *group_by_iter = nullptr;
|
||||
ObICGGroupByProcessor *group_by_processor = group_by_iters_.at(0);
|
||||
ObICGIterator *group_by_iter = dynamic_cast<ObICGIterator*>(group_by_processor);
|
||||
while(OB_SUCC(ret)) {
|
||||
if (can_forward_row_scanner() &&
|
||||
OB_FAIL(row_scanner_->forward_blockscan(end_, blockscan_state_, current_))) {
|
||||
LOG_WARN("Fail to forward blockscan border", K(ret));
|
||||
} else if (end_of_scan()) {
|
||||
LOG_DEBUG("cur scan finished, update state", K(blockscan_state_), K(state_), KPC(this));
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(group_by_iter->locate(ObCSRange(current_, end_ - current_ + 1)))) {
|
||||
LOG_WARN("Failed to locate", K(ret));
|
||||
} else if (OB_FAIL(group_by_processor->decide_group_size(group_size_))) {
|
||||
LOG_WARN("Failed to decide group size", K(ret));
|
||||
} else if (nullptr != rows_filter_) {
|
||||
if (OB_FAIL(rows_filter_->apply(ObCSRange(current_, group_size_)))) {
|
||||
LOG_WARN("Fail to apply rows filter", K(ret), K(current_), K(group_size_));
|
||||
} else if (OB_ISNULL(result_bitmap = rows_filter_->get_result_bitmap())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected result bitmap", K(ret), KPC(rows_filter_));
|
||||
} else {
|
||||
EVENT_ADD(ObStatEventIds::PUSHDOWN_STORAGE_FILTER_ROW_CNT, result_bitmap->popcnt());
|
||||
access_ctx_->table_store_stat_.pushdown_row_select_cnt_ += result_bitmap->popcnt();
|
||||
if (result_bitmap->is_all_false()) {
|
||||
update_current(group_size_);
|
||||
continue;
|
||||
if (OB_ISNULL(group_by_iter = dynamic_cast<ObICGIterator*>(group_by_processor))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null group_by_iter", K(ret), KPC(group_by_processor));
|
||||
} else {
|
||||
while(OB_SUCC(ret)) {
|
||||
if (can_forward_row_scanner() &&
|
||||
OB_FAIL(row_scanner_->forward_blockscan(end_, blockscan_state_, current_))) {
|
||||
LOG_WARN("Fail to forward blockscan border", K(ret));
|
||||
} else if (end_of_scan()) {
|
||||
LOG_DEBUG("cur scan finished, update state", K(blockscan_state_), K(state_), KPC(this));
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(group_by_iter->locate(ObCSRange(current_, end_ - current_ + 1)))) {
|
||||
LOG_WARN("Failed to locate", K(ret));
|
||||
} else if (OB_FAIL(group_by_processor->decide_group_size(group_size_))) {
|
||||
LOG_WARN("Failed to decide group size", K(ret));
|
||||
} else if (nullptr != rows_filter_) {
|
||||
if (OB_FAIL(rows_filter_->apply(ObCSRange(current_, group_size_)))) {
|
||||
LOG_WARN("Fail to apply rows filter", K(ret), K(current_), K(group_size_));
|
||||
} else if (OB_ISNULL(result_bitmap = rows_filter_->get_result_bitmap())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected result bitmap", K(ret), KPC(rows_filter_));
|
||||
} else {
|
||||
EVENT_ADD(ObStatEventIds::PUSHDOWN_STORAGE_FILTER_ROW_CNT, result_bitmap->popcnt());
|
||||
access_ctx_->table_store_stat_.pushdown_row_select_cnt_ += result_bitmap->popcnt();
|
||||
if (result_bitmap->is_all_false()) {
|
||||
update_current(group_size_);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
EVENT_ADD(ObStatEventIds::PUSHDOWN_STORAGE_FILTER_ROW_CNT, group_size_);
|
||||
access_ctx_->table_store_stat_.pushdown_row_select_cnt_ += group_size_;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
EVENT_ADD(ObStatEventIds::PUSHDOWN_STORAGE_FILTER_ROW_CNT, group_size_);
|
||||
access_ctx_->table_store_stat_.pushdown_row_select_cnt_ += group_size_;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_FAIL(project_iter_->locate(
|
||||
ObCSRange(current_, group_size_),
|
||||
result_bitmap))) {
|
||||
ObCSRange(current_, group_size_),
|
||||
result_bitmap))) {
|
||||
LOG_WARN("Fail to locate", K(ret), K(current_), K(group_size_), KP(result_bitmap));
|
||||
} else {
|
||||
is_new_group_ = true;
|
||||
@ -989,7 +994,7 @@ int ObCOSSTableRowScanner::filter_group_by_rows()
|
||||
access_ctx_->table_store_stat_.physical_read_cnt_ += group_size_;
|
||||
}
|
||||
LOG_TRACE("[GROUP BY PUSHDOWN]", K(ret), K(current_), K(end_), K(blockscan_state_),
|
||||
"popcnt", nullptr == result_bitmap ? group_size_ : result_bitmap->popcnt());
|
||||
"popcnt", nullptr == result_bitmap ? group_size_ : result_bitmap->popcnt());
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1135,19 +1140,20 @@ int ObCOSSTableRowScanner::init_group_by_info(ObTableAccessContext &context)
|
||||
int ObCOSSTableRowScanner::push_group_by_processor(ObICGIterator *cg_iterator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObICGGroupByProcessor *group_by_processor = nullptr;
|
||||
if (OB_ISNULL(cg_iterator)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid argument", K(ret));
|
||||
} else if (OB_UNLIKELY(!ObICGIterator::is_valid_group_by_cg_scanner(cg_iterator->get_type()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected cg scanner", K(ret), K(cg_iterator->get_type()));
|
||||
} else {
|
||||
ObICGGroupByProcessor *group_by_processor = dynamic_cast<ObICGGroupByProcessor*>(cg_iterator);
|
||||
if (OB_FAIL(group_by_processor->init_group_by_info())) {
|
||||
LOG_WARN("Failed to init group by info", K(ret));
|
||||
} else if (OB_FAIL(group_by_iters_.push_back(group_by_processor))) {
|
||||
LOG_WARN("Failed to push back", K(ret));
|
||||
}
|
||||
} else if (OB_ISNULL(group_by_processor = dynamic_cast<ObICGGroupByProcessor*>(cg_iterator))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null group_by_processor", K(ret), KPC(cg_iterator));
|
||||
} else if (OB_FAIL(group_by_processor->init_group_by_info())) {
|
||||
LOG_WARN("Failed to init group by info", K(ret));
|
||||
} else if (OB_FAIL(group_by_iters_.push_back(group_by_processor))) {
|
||||
LOG_WARN("Failed to push back", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -530,11 +530,16 @@ int ObCOSSTableRowsFilter::transform_filter_tree(
|
||||
LOG_WARN("Failed to pull up common node", K(ret), K(tmp_filter_indexes));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (1 < tmp_filter_indexes.count() &&
|
||||
OB_FAIL(common_filter_executor->set_cg_param(*common_col_group_ids, common_col_exprs))) {
|
||||
LOG_WARN("Failed to set cg param to filter", K(ret), KPC(common_filter_executor),
|
||||
KP(common_col_group_ids), KP(common_col_exprs));
|
||||
} else {
|
||||
if (1 < tmp_filter_indexes.count()) {
|
||||
if (OB_ISNULL(common_filter_executor)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null common_filter_executor", K(ret));
|
||||
} else if (OB_FAIL(common_filter_executor->set_cg_param(*common_col_group_ids, common_col_exprs))) {
|
||||
LOG_WARN("Failed to set cg param to filter", K(ret), KPC(common_filter_executor),
|
||||
KP(common_col_group_ids), KP(common_col_exprs));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
++base_filter_idx;
|
||||
if (common_filter_executor == &filter || base_filter_idx >= filter.get_child_count()) {
|
||||
break;
|
||||
|
@ -91,6 +91,9 @@ int ObCOMerger::inner_prepare_merge(ObBasicTabletMergeCtx &ctx, const int64_t id
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected null table", K(ret));
|
||||
} else if (FALSE_IT(sstable = static_cast<ObSSTable *>(table))) {
|
||||
} else if (OB_FAIL(init_merge_iters(sstable))) {
|
||||
STORAGE_LOG(WARN, "failed to init_merge_iters", K(ret));
|
||||
|
@ -450,18 +450,18 @@ int ObCOSSTableV2::deep_copy(
|
||||
LOG_WARN("failed to deep copy co sstable", K(ret));
|
||||
} else {
|
||||
new_co_table = static_cast<ObCOSSTableV2 *>(meta_obj);
|
||||
}
|
||||
|
||||
// set cg sstable addr
|
||||
ObSSTableArray &new_cg_sstables = new_co_table->meta_->get_cg_sstables();
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < new_cg_sstables.count(); ++idx) {
|
||||
ObSSTable *cg_table = new_cg_sstables[idx];
|
||||
const ObMetaDiskAddr &cg_addr = cg_addrs.at(idx);
|
||||
if (OB_ISNULL(cg_table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null cg table", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(cg_table->set_addr(cg_addr))) {
|
||||
LOG_WARN("failed to set cg addr", K(ret));
|
||||
// set cg sstable addr
|
||||
ObSSTableArray &new_cg_sstables = new_co_table->meta_->get_cg_sstables();
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < new_cg_sstables.count(); ++idx) {
|
||||
ObSSTable *cg_table = new_cg_sstables[idx];
|
||||
const ObMetaDiskAddr &cg_addr = cg_addrs.at(idx);
|
||||
if (OB_ISNULL(cg_table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null cg table", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(cg_table->set_addr(cg_addr))) {
|
||||
LOG_WARN("failed to set cg addr", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,6 +526,9 @@ int ObCOSSTableV2::get_cg_sstable(
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(cg_wrapper.sstable_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Unexpected null sstable", K(ret));
|
||||
} else if (cg_wrapper.sstable_->is_co_sstable()) {
|
||||
// do nothing
|
||||
} else if (cg_wrapper.sstable_->is_loaded()) {
|
||||
@ -598,7 +601,7 @@ int ObCOSSTableV2::scan(
|
||||
// TODO: check whether use row_store/rowkey sstable when primary keys accessed only
|
||||
ObStoreRowIterator *row_scanner = nullptr;
|
||||
ALLOCATE_TABLE_STORE_ROW_IETRATOR(context, ObCOSSTableRowScanner, row_scanner);
|
||||
if (OB_SUCC(ret) && OB_FAIL(row_scanner->init(param, context, this, &key_range))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(row_scanner) && OB_FAIL(row_scanner->init(param, context, this, &key_range))) {
|
||||
LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), K(key_range), K(*this));
|
||||
}
|
||||
|
||||
@ -639,7 +642,7 @@ int ObCOSSTableV2::multi_scan(
|
||||
// TODO: check whether use row_store/rowkey sstable when primary keys accessed only
|
||||
ObStoreRowIterator *row_scanner = nullptr;
|
||||
ALLOCATE_TABLE_STORE_ROW_IETRATOR(context, ObCOSSTableRowMultiScanner, row_scanner);
|
||||
if (OB_SUCC(ret) && OB_FAIL(row_scanner->init(param, context, this, &ranges))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(row_scanner) && OB_FAIL(row_scanner->init(param, context, this, &ranges))) {
|
||||
LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), K(ranges), K(*this));
|
||||
}
|
||||
|
||||
@ -746,7 +749,7 @@ int ObCOSSTableV2::get(
|
||||
} else {
|
||||
ObStoreRowIterator *row_getter = nullptr;
|
||||
ALLOCATE_TABLE_STORE_ROW_IETRATOR(context, ObCOSSTableRowGetter, row_getter);
|
||||
if (OB_SUCC(ret) && OB_FAIL(row_getter->init(param, context, this, &rowkey))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(row_getter) && OB_FAIL(row_getter->init(param, context, this, &rowkey))) {
|
||||
LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), K(rowkey), K(*this));
|
||||
}
|
||||
|
||||
@ -786,7 +789,7 @@ int ObCOSSTableV2::multi_get(
|
||||
} else {
|
||||
ObStoreRowIterator *row_getter = nullptr;
|
||||
ALLOCATE_TABLE_STORE_ROW_IETRATOR(context, ObCOSSTableRowMultiGetter, row_getter);
|
||||
if (OB_SUCC(ret) && OB_FAIL(row_getter->init(param, context, this, &rowkeys))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(row_getter) && OB_FAIL(row_getter->init(param, context, this, &rowkeys))) {
|
||||
LOG_WARN("Fail to open row scanner", K(ret), K(param), K(context), K(rowkeys), K(*this));
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user