From b3dcbb8af9f28652f5dc49e79d6fa32f66bdc6bd Mon Sep 17 00:00:00 2001 From: WeiXinChan Date: Tue, 17 Oct 2023 12:39:57 +0000 Subject: [PATCH] [CP] [Bugfix] fix obkv lob core & aggregation in sync query core & session pool error --- .../table/ob_htable_filter_operator.cpp | 4 ++-- src/observer/table/ob_table_filter.cpp | 16 ++++++++-------- src/observer/table/ob_table_scan_executor.cpp | 16 +++++++++++++--- src/observer/table/ob_table_scan_executor.h | 2 +- src/observer/table/ob_table_session_pool.cpp | 2 ++ src/observer/table/ttl/ob_table_ttl_task.cpp | 2 +- src/share/table/ob_table.cpp | 17 +++++++++++++++++ src/share/table/ob_table.h | 2 ++ 8 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/observer/table/ob_htable_filter_operator.cpp b/src/observer/table/ob_htable_filter_operator.cpp index 874fe915d..4c31beb48 100644 --- a/src/observer/table/ob_htable_filter_operator.cpp +++ b/src/observer/table/ob_htable_filter_operator.cpp @@ -643,7 +643,7 @@ ObHTableRowIterator::~ObHTableRowIterator() int ObHTableRowIterator::next_cell() { ObNewRow *ob_row = NULL; - int ret = child_op_->get_next_row(ob_row); + int ret = child_op_->get_next_row(ob_row, false); if (OB_SUCCESS == ret) { curr_cell_.set_ob_row(ob_row); LOG_DEBUG("[yzfdebug] fetch next cell", K_(curr_cell)); @@ -659,7 +659,7 @@ int ObHTableRowIterator::next_cell() int ObHTableRowIterator::reverse_next_cell(ObIArray &same_kq_cells, ObTableQueryResult *&out_result) { ObNewRow *ob_row = NULL; - int ret = child_op_->get_next_row(ob_row); + int ret = child_op_->get_next_row(ob_row, false); if ((ObQueryFlag::Reverse == scan_order_ && OB_ITER_END == ret) || (ObQueryFlag::Reverse == scan_order_ && OB_SUCCESS == ret && NULL != hfilter_ && hfilter_->filter_all_remaining())) { diff --git a/src/observer/table/ob_table_filter.cpp b/src/observer/table/ob_table_filter.cpp index 00c361601..b93af3a9c 100644 --- a/src/observer/table/ob_table_filter.cpp +++ b/src/observer/table/ob_table_filter.cpp @@ -249,7 +249,7 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe LOG_WARN("one_result_ should not be null", K(ret)); } else { ObNewRow *row = nullptr; - while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) { + while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) { if (OB_FAIL(agg_calculator_.aggregate(*row))) { LOG_WARN("fail to aggregate", K(ret), K(*row)); } @@ -259,8 +259,8 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe agg_calculator_.final_aggregate(); // agg sum/svg finally has_more_rows_ = false; one_result_->reset(); - if (OB_FAIL(one_result_->assign_property_names(get_agg_calculator().get_agg_columns()))) { - LOG_WARN("fail to assign property names to one result", K(ret)); + if (OB_FAIL(one_result_->deep_copy_property_names(get_agg_calculator().get_agg_columns()))) { + LOG_WARN("fail to deep copy property names to one result", K(ret)); } else if (OB_FAIL(one_result_->add_row(agg_calculator_.get_aggregate_results()))) { LOG_WARN("fail to add aggregation result", K(ret), K(agg_calculator_.get_aggregate_results())); } else { @@ -293,7 +293,7 @@ int ObNormalTableQueryResultIterator::get_normal_result(table::ObTableQueryResul if (OB_SUCC(ret)) { next_result = one_result_; ObNewRow *row = nullptr; - while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) { + while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) { LOG_DEBUG("[yzfdebug] scan result", "row", *row); if (OB_FAIL(one_result_->add_row(*row))) { if (OB_SIZE_OVERFLOW == ret) { @@ -392,7 +392,7 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next const ObIArray &select_columns = one_result_->get_select_columns(); const int64_t N = select_columns.count(); while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) && - OB_SUCC(scan_result_->get_next_row(row))) { + OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) { if (N != row->get_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row)); @@ -427,8 +427,8 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next agg_calculator_.final_aggregate(); // agg sum/svg finally has_more_rows_ = false; one_result_->reset(); - if (OB_FAIL(one_result_->assign_property_names(get_agg_calculator().get_agg_columns()))) { - LOG_WARN("fail to assign property names to one result", K(ret)); + if (OB_FAIL(one_result_->deep_copy_property_names(get_agg_calculator().get_agg_columns()))) { + LOG_WARN("fail to deep copy property names to one result", K(ret)); } else if (OB_FAIL(one_result_->add_row(agg_calculator_.get_aggregate_results()))) { LOG_WARN("fail to add aggregation result", K(ret), K(agg_calculator_.get_aggregate_results())); } else { @@ -473,7 +473,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re const int64_t N = select_columns.count(); while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) && - OB_SUCC(scan_result_->get_next_row(row))) { + OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) { if (N != row->get_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row)); diff --git a/src/observer/table/ob_table_scan_executor.cpp b/src/observer/table/ob_table_scan_executor.cpp index 4c2d1baf9..67f6a809d 100644 --- a/src/observer/table/ob_table_scan_executor.cpp +++ b/src/observer/table/ob_table_scan_executor.cpp @@ -295,7 +295,8 @@ int ObTableApiScanRowIterator::open(ObTableApiScanExecutor *executor) return ret; } -int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row) +// Memory of row is owned by iterator, and row cannot be used beyond iterator, unless you use deep copy. +int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy /* =true */) { int ret = OB_SUCCESS; ObNewRow *tmp_row = nullptr; @@ -322,6 +323,7 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row) } else { // 循环select_exprs,eval获取datum,并将datum转ObObj,最后组成ObNewRow tmp_row = new(row_buf)ObNewRow(cells, cells_cnt); + ObObj tmp_obj; ObDatum *datum = nullptr; ObEvalCtx &eval_ctx = scan_executor_->get_eval_ctx(); if (tb_ctx.is_scan()) { // 转为用户select的顺序 @@ -335,16 +337,24 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row) LOG_WARN("query column id not found", K(ret), K(select_col_ids), K(col_id), K(query_col_ids)); } else if (OB_FAIL(output_exprs.at(idx)->eval(eval_ctx, datum))) { LOG_WARN("fail to eval datum", K(ret)); - } else if (OB_FAIL(datum->to_obj(cells[i], output_exprs.at(idx)->obj_meta_))) { + } else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(idx)->obj_meta_))) { LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(idx)->obj_meta_), K(i), K(idx)); + } else if (!need_deep_copy) { + cells[i] = tmp_obj; + } else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy + LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj)); } } } else { for (int64_t i = 0; OB_SUCC(ret) && i < cells_cnt; i++) { if (OB_FAIL(output_exprs.at(i)->eval(eval_ctx, datum))) { LOG_WARN("fail to eval datum", K(ret)); - } else if (OB_FAIL(datum->to_obj(cells[i], output_exprs.at(i)->obj_meta_))) { + } else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(i)->obj_meta_))) { LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(i)->obj_meta_)); + } else if (!need_deep_copy) { + cells[i] = tmp_obj; + } else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy + LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj)); } } } diff --git a/src/observer/table/ob_table_scan_executor.h b/src/observer/table/ob_table_scan_executor.h index 472d0bbbb..3e223ebd2 100644 --- a/src/observer/table/ob_table_scan_executor.h +++ b/src/observer/table/ob_table_scan_executor.h @@ -94,7 +94,7 @@ public: virtual ~ObTableApiScanRowIterator() {}; public: virtual int open(ObTableApiScanExecutor *executor); - virtual int get_next_row(common::ObNewRow *&row); + virtual int get_next_row(common::ObNewRow *&row, bool need_deep_copy = true); virtual int close(); private: ObTableApiScanExecutor *scan_executor_; diff --git a/src/observer/table/ob_table_session_pool.cpp b/src/observer/table/ob_table_session_pool.cpp index 0f61134eb..3149f37d4 100644 --- a/src/observer/table/ob_table_session_pool.cpp +++ b/src/observer/table/ob_table_session_pool.cpp @@ -514,6 +514,8 @@ int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credentia } else if (OB_FAIL(key_node_map_.set_refactored(credential.hash_val_, node))) { if (OB_HASH_EXIST != ret) { LOG_WARN("fail to add sess node to hash map", K(ret), K(credential), K(*node)); + } else { + ret = OB_SUCCESS; // replace error code } // this node has been set by other thread, free it ObLockGuard guard(lock_); diff --git a/src/observer/table/ttl/ob_table_ttl_task.cpp b/src/observer/table/ttl/ob_table_ttl_task.cpp index eb00620e0..43290628c 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.cpp +++ b/src/observer/table/ttl/ob_table_ttl_task.cpp @@ -512,7 +512,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row) LOG_DEBUG("finish get next row", KR(ret), K(cur_del_rows_), K(limit_del_rows_)); } else { bool is_expired = false; - while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row))) { + while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) { last_row_ = row; // NOTE: For hbase table, the row expired if and only if // 1. The row's version exceed maxversion diff --git a/src/share/table/ob_table.cpp b/src/share/table/ob_table.cpp index 96212a62c..60bba3761 100644 --- a/src/share/table/ob_table.cpp +++ b/src/share/table/ob_table.cpp @@ -1406,6 +1406,23 @@ int ObTableQueryResult::assign_property_names(const ObIArray &other) return properties_names_.assign(other); } +int ObTableQueryResult::deep_copy_property_names(const ObIArray &other) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(properties_names_.prepare_allocate(other.count()))) { + LOG_WARN("failed to prepare allocate properties names", K(ret), K(other)); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < other.count(); i++) { + if (OB_FAIL(ob_write_string(allocator_, other.at(i), properties_names_.at(i)))) { + LOG_WARN("failed to write string", K(ret), K(other.at(i))); + } + } + + return ret; +} + int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size) { int ret = OB_SUCCESS; diff --git a/src/share/table/ob_table.h b/src/share/table/ob_table.h index e24fd4099..d56bf265c 100644 --- a/src/share/table/ob_table.h +++ b/src/share/table/ob_table.h @@ -847,6 +847,8 @@ public: virtual int get_next_entity(const ObITableEntity *&entity) override; int add_property_name(const ObString &name); int assign_property_names(const common::ObIArray &other); + // for aggregation + int deep_copy_property_names(const common::ObIArray &other); void reset_property_names() { properties_names_.reset(); } int add_row(const common::ObNewRow &row); int add_row(const common::ObIArray &row);