[CP] [Bugfix] fix obkv lob core & aggregation in sync query core & session pool error

This commit is contained in:
WeiXinChan 2023-10-17 12:39:57 +00:00 committed by ob-robot
parent 3c120e0a75
commit b3dcbb8af9
8 changed files with 46 additions and 15 deletions

View File

@ -643,7 +643,7 @@ ObHTableRowIterator::~ObHTableRowIterator()
int ObHTableRowIterator::next_cell()
{
ObNewRow *ob_row = NULL;
int ret = child_op_->get_next_row(ob_row);
int ret = child_op_->get_next_row(ob_row, false);
if (OB_SUCCESS == ret) {
curr_cell_.set_ob_row(ob_row);
LOG_DEBUG("[yzfdebug] fetch next cell", K_(curr_cell));
@ -659,7 +659,7 @@ int ObHTableRowIterator::next_cell()
int ObHTableRowIterator::reverse_next_cell(ObIArray<common::ObNewRow> &same_kq_cells, ObTableQueryResult *&out_result)
{
ObNewRow *ob_row = NULL;
int ret = child_op_->get_next_row(ob_row);
int ret = child_op_->get_next_row(ob_row, false);
if ((ObQueryFlag::Reverse == scan_order_ && OB_ITER_END == ret) ||
(ObQueryFlag::Reverse == scan_order_ && OB_SUCCESS == ret &&
NULL != hfilter_ && hfilter_->filter_all_remaining())) {

View File

@ -249,7 +249,7 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe
LOG_WARN("one_result_ should not be null", K(ret));
} else {
ObNewRow *row = nullptr;
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
if (OB_FAIL(agg_calculator_.aggregate(*row))) {
LOG_WARN("fail to aggregate", K(ret), K(*row));
}
@ -259,8 +259,8 @@ int ObNormalTableQueryResultIterator::get_aggregate_result(table::ObTableQueryRe
agg_calculator_.final_aggregate(); // agg sum/svg finally
has_more_rows_ = false;
one_result_->reset();
if (OB_FAIL(one_result_->assign_property_names(get_agg_calculator().get_agg_columns()))) {
LOG_WARN("fail to assign property names to one result", K(ret));
if (OB_FAIL(one_result_->deep_copy_property_names(get_agg_calculator().get_agg_columns()))) {
LOG_WARN("fail to deep copy property names to one result", K(ret));
} else if (OB_FAIL(one_result_->add_row(agg_calculator_.get_aggregate_results()))) {
LOG_WARN("fail to add aggregation result", K(ret), K(agg_calculator_.get_aggregate_results()));
} else {
@ -293,7 +293,7 @@ int ObNormalTableQueryResultIterator::get_normal_result(table::ObTableQueryResul
if (OB_SUCC(ret)) {
next_result = one_result_;
ObNewRow *row = nullptr;
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row))) {
while (OB_SUCC(ret) && OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
LOG_DEBUG("[yzfdebug] scan result", "row", *row);
if (OB_FAIL(one_result_->add_row(*row))) {
if (OB_SIZE_OVERFLOW == ret) {
@ -392,7 +392,7 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next
const ObIArray<ObString> &select_columns = one_result_->get_select_columns();
const int64_t N = select_columns.count();
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
OB_SUCC(scan_result_->get_next_row(row))) {
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
if (N != row->get_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));
@ -427,8 +427,8 @@ int ObTableFilterOperator::get_aggregate_result(table::ObTableQueryResult *&next
agg_calculator_.final_aggregate(); // agg sum/svg finally
has_more_rows_ = false;
one_result_->reset();
if (OB_FAIL(one_result_->assign_property_names(get_agg_calculator().get_agg_columns()))) {
LOG_WARN("fail to assign property names to one result", K(ret));
if (OB_FAIL(one_result_->deep_copy_property_names(get_agg_calculator().get_agg_columns()))) {
LOG_WARN("fail to deep copy property names to one result", K(ret));
} else if (OB_FAIL(one_result_->add_row(agg_calculator_.get_aggregate_results()))) {
LOG_WARN("fail to add aggregation result", K(ret), K(agg_calculator_.get_aggregate_results()));
} else {
@ -473,7 +473,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
const int64_t N = select_columns.count();
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
OB_SUCC(scan_result_->get_next_row(row))) {
OB_SUCC(scan_result_->get_next_row(row, false/*need_deep_copy*/))) {
if (N != row->get_count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select column count is not equal to row cells count", K(ret), K(select_columns), K(*row));

View File

@ -295,7 +295,8 @@ int ObTableApiScanRowIterator::open(ObTableApiScanExecutor *executor)
return ret;
}
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row)
// Memory of row is owned by iterator, and row cannot be used beyond iterator, unless you use deep copy.
int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row, bool need_deep_copy /* =true */)
{
int ret = OB_SUCCESS;
ObNewRow *tmp_row = nullptr;
@ -322,6 +323,7 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row)
} else {
// 循环select_exprs,eval获取datum,并将datum转ObObj,最后组成ObNewRow
tmp_row = new(row_buf)ObNewRow(cells, cells_cnt);
ObObj tmp_obj;
ObDatum *datum = nullptr;
ObEvalCtx &eval_ctx = scan_executor_->get_eval_ctx();
if (tb_ctx.is_scan()) { // 转为用户select的顺序
@ -335,16 +337,24 @@ int ObTableApiScanRowIterator::get_next_row(ObNewRow *&row)
LOG_WARN("query column id not found", K(ret), K(select_col_ids), K(col_id), K(query_col_ids));
} else if (OB_FAIL(output_exprs.at(idx)->eval(eval_ctx, datum))) {
LOG_WARN("fail to eval datum", K(ret));
} else if (OB_FAIL(datum->to_obj(cells[i], output_exprs.at(idx)->obj_meta_))) {
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(idx)->obj_meta_))) {
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(idx)->obj_meta_), K(i), K(idx));
} else if (!need_deep_copy) {
cells[i] = tmp_obj;
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < cells_cnt; i++) {
if (OB_FAIL(output_exprs.at(i)->eval(eval_ctx, datum))) {
LOG_WARN("fail to eval datum", K(ret));
} else if (OB_FAIL(datum->to_obj(cells[i], output_exprs.at(i)->obj_meta_))) {
} else if (OB_FAIL(datum->to_obj(tmp_obj, output_exprs.at(i)->obj_meta_))) {
LOG_WARN("fail to datum to obj", K(ret), K(output_exprs.at(i)->obj_meta_));
} else if (!need_deep_copy) {
cells[i] = tmp_obj;
} else if (ob_write_obj(allocator, tmp_obj, cells[i])) { // need_deep_copy
LOG_WARN("fail to deep copy ObObj", K(ret), K(tmp_obj));
}
}
}

View File

@ -94,7 +94,7 @@ public:
virtual ~ObTableApiScanRowIterator() {};
public:
virtual int open(ObTableApiScanExecutor *executor);
virtual int get_next_row(common::ObNewRow *&row);
virtual int get_next_row(common::ObNewRow *&row, bool need_deep_copy = true);
virtual int close();
private:
ObTableApiScanExecutor *scan_executor_;

View File

@ -514,6 +514,8 @@ int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credentia
} else if (OB_FAIL(key_node_map_.set_refactored(credential.hash_val_, node))) {
if (OB_HASH_EXIST != ret) {
LOG_WARN("fail to add sess node to hash map", K(ret), K(credential), K(*node));
} else {
ret = OB_SUCCESS; // replace error code
}
// this node has been set by other thread, free it
ObLockGuard<ObSpinLock> guard(lock_);

View File

@ -512,7 +512,7 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
LOG_DEBUG("finish get next row", KR(ret), K(cur_del_rows_), K(limit_del_rows_));
} else {
bool is_expired = false;
while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row))) {
while(OB_SUCC(ret) && !is_expired && OB_SUCC(ObTableApiScanRowIterator::get_next_row(row, false/*need_deep_copy*/))) {
last_row_ = row;
// NOTE: For hbase table, the row expired if and only if
// 1. The row's version exceed maxversion

View File

@ -1406,6 +1406,23 @@ int ObTableQueryResult::assign_property_names(const ObIArray<ObString> &other)
return properties_names_.assign(other);
}
int ObTableQueryResult::deep_copy_property_names(const ObIArray<ObString> &other)
{
int ret = OB_SUCCESS;
if (OB_FAIL(properties_names_.prepare_allocate(other.count()))) {
LOG_WARN("failed to prepare allocate properties names", K(ret), K(other));
}
for (int64_t i = 0; OB_SUCC(ret) && i < other.count(); i++) {
if (OB_FAIL(ob_write_string(allocator_, other.at(i), properties_names_.at(i)))) {
LOG_WARN("failed to write string", K(ret), K(other.at(i)));
}
}
return ret;
}
int ObTableQueryResult::alloc_buf_if_need(const int64_t need_size)
{
int ret = OB_SUCCESS;

View File

@ -847,6 +847,8 @@ public:
virtual int get_next_entity(const ObITableEntity *&entity) override;
int add_property_name(const ObString &name);
int assign_property_names(const common::ObIArray<common::ObString> &other);
// for aggregation
int deep_copy_property_names(const common::ObIArray<common::ObString> &other);
void reset_property_names() { properties_names_.reset(); }
int add_row(const common::ObNewRow &row);
int add_row(const common::ObIArray<ObObj> &row);