bugfix : obkv filter col not in select column
This commit is contained in:
@ -9903,7 +9903,17 @@ TEST_F(TestBatchExecute, table_query_with_filter)
|
||||
ASSERT_EQ(OB_SUCCESS, query.set_scan_index(ObString::make_string("primary")));
|
||||
ASSERT_EQ(OB_SUCCESS, query.set_filter(ObString::make_string("TableCompareFilter(<, 'C2:50')")));
|
||||
int ret = the_table->execute_query(query, iter);
|
||||
ASSERT_NE(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t result_cnt = 0;
|
||||
while (OB_SUCC(iter->get_next_entity(result_entity))) {
|
||||
result_cnt++;
|
||||
ObObj v1, v3;
|
||||
ASSERT_EQ(OB_SUCCESS, result_entity->get_property(C1, v1));
|
||||
ASSERT_EQ(OB_SUCCESS, result_entity->get_property(C3, v3));
|
||||
ASSERT_LE(v1.get_int(), 50);
|
||||
// fprintf(stderr, "(%ld,%ld,%s)\n", v1.get_int(), v2.get_int(), S(v3));
|
||||
}
|
||||
ASSERT_EQ(10, result_cnt);
|
||||
// fprintf(stderr, "query ret=%d\n", ret);
|
||||
} // end case 6
|
||||
{
|
||||
|
@ -545,7 +545,7 @@ int ObTableCtx::adjust_column_type(const ObExprResType &column_type,
|
||||
if (!is_autoincrement) {
|
||||
ret = OB_BAD_NULL_ERROR;
|
||||
}
|
||||
} else if (obj.is_null()) {
|
||||
} else if (obj.is_null() || is_inc()) {
|
||||
// continue
|
||||
} else if (column_type.get_type() != obj.get_type()
|
||||
&& !(ob_is_string_type(column_type.get_type()) && ob_is_string_type(obj.get_type()))) {
|
||||
@ -876,7 +876,9 @@ int ObTableCtx::init_scan(const ObTableQuery &query,
|
||||
int ret = OB_SUCCESS;
|
||||
const ObString &index_name = query.get_index_name();
|
||||
const ObIArray<ObString> &select_columns = query.get_select_columns();
|
||||
const bool select_all_columns = select_columns.empty() || query.is_aggregate_query() || is_ttl_table_;
|
||||
bool has_filter = (query.get_htable_filter().is_valid() || query.get_filter_string().length() > 0);
|
||||
const bool select_all_columns = select_columns.empty() || query.is_aggregate_query() || is_ttl_table_
|
||||
|| (has_filter && !is_htable());
|
||||
const ObColumnSchemaV2 *column_schema = nullptr;
|
||||
operation_type_ = ObTableOperationType::Type::SCAN;
|
||||
// init is_weak_read_,scan_order_
|
||||
|
@ -247,6 +247,10 @@ public:
|
||||
return ObTableOperationType::Type::APPEND == operation_type_
|
||||
|| ObTableOperationType::Type::INCREMENT == operation_type_;
|
||||
}
|
||||
OB_INLINE bool is_inc() const
|
||||
{
|
||||
return ObTableOperationType::Type::INCREMENT == operation_type_;
|
||||
}
|
||||
OB_INLINE bool is_dml() const
|
||||
{
|
||||
return ObTableOperationType::Type::GET != operation_type_ && !is_scan_;
|
||||
|
@ -114,6 +114,7 @@ int ObTableApiExecuteP::init_tb_ctx()
|
||||
ObTableOperationType::Type op_type = arg_.table_operation_.type();
|
||||
tb_ctx_.set_entity(&arg_.table_operation_.entity());
|
||||
tb_ctx_.set_operation_type(op_type);
|
||||
tb_ctx_.set_entity_type(arg_.entity_type_);
|
||||
|
||||
if (tb_ctx_.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K_(tb_ctx));
|
||||
|
@ -350,6 +350,49 @@ int ObTableFilterOperator::check_limit_param()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableFilterOperator::init_full_column_name(const ObIArray<ObString>& col_arr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_select_column_empty = query_->get_select_columns().empty(); // query select column is empty when do queryAndMutate
|
||||
if (is_aggregate_query()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(full_column_name_.assign(col_arr))) {
|
||||
LOG_WARN("fail to assign full column name", K(ret));
|
||||
} else if (!is_select_column_empty && OB_FAIL(one_result_->assign_property_names(query_->get_select_columns()))) { // normal query should reset select column
|
||||
LOG_WARN("fail to assign query column name", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableFilterOperator::add_row(table::ObTableQueryResult *next_result, ObNewRow *row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObNewRow new_row;
|
||||
const ObIArray<ObString> &select_columns = query_->get_select_columns();
|
||||
if (!select_columns.empty()) {
|
||||
size_t new_size = select_columns.count();
|
||||
size_t old_size = full_column_name_.count();
|
||||
ObObj cell_arr[new_size];
|
||||
new_row.assign(cell_arr, new_size);
|
||||
for (size_t i = 0; i < old_size; i ++) {
|
||||
int64_t idx = -1;
|
||||
if (!has_exist_in_array(select_columns, full_column_name_.at(i), &idx)) {
|
||||
// do nothing
|
||||
} else {
|
||||
cell_arr[idx] = row->get_cell(i);
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(next_result->add_row(new_row))) {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
}
|
||||
} else { // query select column is empty when do queryAndMutate
|
||||
if (OB_FAIL(next_result->add_row(*row))) {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableFilterOperator::get_next_result(ObTableQueryResult *&next_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -453,7 +496,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (NULL != last_row_) {
|
||||
if (OB_FAIL(one_result_->add_row(*last_row_))) {
|
||||
if (OB_FAIL(add_row(one_result_, last_row_))) {
|
||||
LOG_WARN("failed to add row", K(ret));
|
||||
} else {
|
||||
row_idx_++;
|
||||
@ -469,7 +512,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
|
||||
bool has_reach_limit = (row_idx_ >= offset + limit);
|
||||
next_result = one_result_;
|
||||
ObNewRow *row = nullptr;
|
||||
const ObIArray<ObString> &select_columns = one_result_->get_select_columns();
|
||||
const ObIArray<ObString> &select_columns = full_column_name_;
|
||||
const int64_t N = select_columns.count();
|
||||
|
||||
while (OB_SUCC(ret) && (!has_limit || !has_reach_limit) &&
|
||||
@ -490,7 +533,7 @@ int ObTableFilterOperator::get_normal_result(table::ObTableQueryResult *&next_re
|
||||
|
||||
if (has_limit && row_idx_ < offset) {
|
||||
row_idx_++;
|
||||
} else if (OB_FAIL(one_result_->add_row(*row))) {
|
||||
} else if (OB_FAIL(add_row(one_result_, row))) {
|
||||
if (OB_BUF_NOT_ENOUGH == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
last_row_ = row;
|
||||
|
@ -207,9 +207,12 @@ public:
|
||||
is_first_result_(true),
|
||||
has_more_rows_(true),
|
||||
row_idx_(0),
|
||||
agg_calculator_(query)
|
||||
agg_calculator_(query),
|
||||
full_column_name_()
|
||||
{}
|
||||
virtual ~ObTableFilterOperator() {}
|
||||
virtual ~ObTableFilterOperator() {
|
||||
full_column_name_.reset();
|
||||
}
|
||||
virtual int get_next_result(ObTableQueryResult *&next_result) override;
|
||||
virtual bool has_more_result() const override { return has_more_rows_; }
|
||||
virtual void set_one_result(ObTableQueryResult *result) override { one_result_ = result; }
|
||||
@ -220,6 +223,8 @@ public:
|
||||
int get_aggregate_result(table::ObTableQueryResult *&next_result);
|
||||
int get_normal_result(table::ObTableQueryResult *&next_result);
|
||||
bool is_aggregate_query() { return agg_calculator_.is_exist(); }
|
||||
int add_row(table::ObTableQueryResult *next_result, ObNewRow *row);
|
||||
int init_full_column_name(const ObIArray<ObString>& col_arr);
|
||||
private:
|
||||
int check_limit_param();
|
||||
private:
|
||||
@ -234,6 +239,7 @@ private:
|
||||
bool has_more_rows_;
|
||||
int64_t row_idx_; // not filtered row index
|
||||
ObTableAggCalculator agg_calculator_;
|
||||
ObSEArray<ObString, 64> full_column_name_;
|
||||
};
|
||||
|
||||
} // end namespace table
|
||||
|
@ -220,6 +220,7 @@ int ObTableQueryAndMutateP::init_scan_tb_ctx(ObTableApiCacheGuard &cache_guard)
|
||||
const ObTableQuery &query = arg_.query_and_mutate_.get_query();
|
||||
bool is_weak_read = false;
|
||||
tb_ctx_.set_scan(true);
|
||||
tb_ctx_.set_entity_type(arg_.entity_type_);
|
||||
|
||||
if (tb_ctx_.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K_(tb_ctx));
|
||||
|
@ -105,6 +105,8 @@ int ObTableQueryUtils::generate_query_result_iterator(ObIAllocator &allocator,
|
||||
one_result))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc table query result iterator", K(ret));
|
||||
} else if (OB_FAIL(table_result_iter->init_full_column_name(tb_ctx.get_query_col_names()))) {
|
||||
LOG_WARN("fail to int full column name", K(ret));
|
||||
} else if (OB_FAIL(table_result_iter->parse_filter_string(&allocator))) {
|
||||
LOG_WARN("fail to parse table filter string", K(ret));
|
||||
} else {
|
||||
|
@ -127,6 +127,7 @@ int ObTableQueryP::init_tb_ctx(ObTableApiCacheGuard &cache_guard)
|
||||
ObExprFrameInfo *expr_frame_info = nullptr;
|
||||
bool is_weak_read = arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL;
|
||||
tb_ctx_.set_scan(true);
|
||||
tb_ctx_.set_entity_type(arg_.entity_type_);
|
||||
|
||||
if (tb_ctx_.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K_(tb_ctx));
|
||||
|
@ -395,6 +395,7 @@ int ObTableQuerySyncP::init_tb_ctx(ObTableCtx &ctx)
|
||||
ObExprFrameInfo &expr_frame_info = query_ctx.expr_frame_info_;
|
||||
bool is_weak_read = arg_.consistency_level_ == ObTableConsistencyLevel::EVENTUAL;
|
||||
ctx.set_scan(true);
|
||||
ctx.set_entity_type(arg_.entity_type_);
|
||||
|
||||
if (ctx.is_init()) {
|
||||
LOG_INFO("tb ctx has been inited", K(ctx));
|
||||
|
Reference in New Issue
Block a user