Fix disorder issue in reverse scan results with multiple column families
This commit is contained in:
@ -95,14 +95,14 @@ int ObTableQueryAsyncSession::deep_copy_select_columns(const common::ObIArray<co
|
||||
* ----------------------------------- ObTableHbaseRowKeyDefaultCompare -------------------------------------
|
||||
*/
|
||||
|
||||
int ObTableHbaseRowKeyDefaultCompare::compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret)
|
||||
int ObTableHbaseRowKeyDefaultCompare::compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!lhs.is_valid() || !rhs.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(lhs), K(rhs));
|
||||
LOG_WARN("invalid args", K(lhs), K(rhs), K(ret));
|
||||
} else {
|
||||
int cmp_ret = 0;
|
||||
cmp_ret = 0;
|
||||
for (int i = 0; i < ObHTableConstants::COL_IDX_T && cmp_ret == 0; ++i) {
|
||||
cmp_ret = lhs.get_cell(i).get_string().compare(rhs.get_cell(i).get_string());
|
||||
}
|
||||
@ -110,19 +110,34 @@ int ObTableHbaseRowKeyDefaultCompare::compare(const common::ObNewRow &lhs, const
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTableHbaseRowKeyDefaultCompare::operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs) {
|
||||
bool ObTableHbaseRowKeyDefaultCompare::operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs)
|
||||
{
|
||||
int cmp_ret = 0;
|
||||
result_code_ = compare(lhs, rhs, cmp_ret);
|
||||
return cmp_ret < 0;
|
||||
}
|
||||
|
||||
|
||||
int ObTableHbaseRowKeyReverseCompare::compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!lhs.is_valid() || !rhs.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(lhs), K(rhs));
|
||||
LOG_WARN("invalid args", K(lhs), K(rhs), K(ret));
|
||||
} else {
|
||||
for (int i = 0; i < ObHTableConstants::COL_IDX_T && cmp_ret == 0; ++i) {
|
||||
cmp_ret = lhs.get_cell(i).get_string().compare(rhs.get_cell(i).get_string());
|
||||
cmp_ret = lhs.get_cell(ObHTableConstants::COL_IDX_K).get_string().compare(rhs.get_cell(ObHTableConstants::COL_IDX_K).get_string());
|
||||
if (cmp_ret == 0) {
|
||||
cmp_ret = rhs.get_cell(ObHTableConstants::COL_IDX_Q).get_string().compare(lhs.get_cell(ObHTableConstants::COL_IDX_Q).get_string());
|
||||
}
|
||||
}
|
||||
result_code_ = ret;
|
||||
return cmp_ret < 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObTableHbaseRowKeyReverseCompare::operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs)
|
||||
{
|
||||
int cmp_ret = 0;
|
||||
result_code_ = compare(lhs, rhs, cmp_ret);
|
||||
return cmp_ret > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -648,7 +663,8 @@ int ObTableQueryAsyncP::generate_merge_result_iterator()
|
||||
int ret = OB_SUCCESS;
|
||||
// Merge Iterator: Holds multiple underlying iterators, stored in its own heap
|
||||
ResultMergeIterator *merge_result_iter = nullptr;
|
||||
ObTableHbaseRowKeyDefaultCompare *compare = nullptr;
|
||||
ObTableMergeFilterCompare *compare = nullptr;
|
||||
ObQueryFlag::ScanOrder scan_order = query_session_->get_query().get_scan_order();
|
||||
if (OB_ISNULL(merge_result_iter = OB_NEWx(ResultMergeIterator,
|
||||
&allocator_,
|
||||
allocator_,
|
||||
@ -658,7 +674,10 @@ int ObTableQueryAsyncP::generate_merge_result_iterator()
|
||||
LOG_WARN("fail to create merge_result_iter", K(ret));
|
||||
} else if (OB_FAIL(generate_multi_result_iterator(merge_result_iter->get_inner_result_iterators()))) {
|
||||
LOG_WARN("fail to generate multi result inner iterator", K(ret));
|
||||
} else if (OB_ISNULL(compare = OB_NEWx(ObTableHbaseRowKeyDefaultCompare, &allocator_))) {
|
||||
} else if (scan_order == ObQueryFlag::Reverse && OB_ISNULL(compare = OB_NEWx(ObTableHbaseRowKeyReverseCompare, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to create compare, alloc memory fail", K(ret));
|
||||
} else if (scan_order == ObQueryFlag::Forward && OB_ISNULL(compare = OB_NEWx(ObTableHbaseRowKeyDefaultCompare, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to create compare, alloc memory fail", K(ret));
|
||||
} else if (OB_FAIL(merge_result_iter->init(compare))) {
|
||||
@ -1068,7 +1087,7 @@ int ObTableQueryAsyncP::query_scan_without_init(ObTableCtx &tb_ctx)
|
||||
int ObTableQueryAsyncP::init_multi_cf_query_ctx(const ObString &arg_tablegroup_name) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObTableQueryAsyncCtx *query_ctx = nullptr;
|
||||
ObSEArray<const schema::ObSimpleTableSchemaV2*, 8> table_schemas;
|
||||
ObSEArray<const schema::ObSimpleTableSchemaV2*, 8> sort_table_schemas;
|
||||
uint64_t tablegroup_id = OB_INVALID_ID;
|
||||
if (schema_cache_guard_.is_inited()) {
|
||||
// skip
|
||||
@ -1084,12 +1103,25 @@ int ObTableQueryAsyncP::init_multi_cf_query_ctx(const ObString &arg_tablegroup_n
|
||||
} else if (OB_FAIL(schema_guard_.get_tablegroup_id(credential_.tenant_id_, arg_tablegroup_name, tablegroup_id))) {
|
||||
LOG_WARN("fail to get table schema from table group name", K(ret), K(credential_.tenant_id_),
|
||||
K(credential_.database_id_), K(arg_tablegroup_name));
|
||||
} else if (OB_FAIL(schema_guard_.get_table_schemas_in_tablegroup(credential_.tenant_id_, tablegroup_id, table_schemas))) {
|
||||
} else if (OB_FAIL(schema_guard_.get_table_schemas_in_tablegroup(credential_.tenant_id_, tablegroup_id, sort_table_schemas))) {
|
||||
LOG_WARN("fail to get table schema from table group", K(ret), K(credential_.tenant_id_),
|
||||
K(credential_.database_id_), K(arg_tablegroup_name), K(tablegroup_id));
|
||||
} else {
|
||||
for (int i = 0; OB_SUCC(ret) && i < table_schemas.count(); ++i) {
|
||||
const schema::ObSimpleTableSchemaV2* table_schema = table_schemas.at(i);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (query_session_->get_query().get_scan_order() == ObQueryFlag::Reverse) {
|
||||
lib::ob_sort(sort_table_schemas.begin(), sort_table_schemas.end(), [](const schema::ObSimpleTableSchemaV2* lhs,
|
||||
const schema::ObSimpleTableSchemaV2* rhs) {
|
||||
return lhs->get_table_name() > rhs->get_table_name();
|
||||
});
|
||||
} else {
|
||||
lib::ob_sort(sort_table_schemas.begin(), sort_table_schemas.end(), [](const schema::ObSimpleTableSchemaV2* lhs,
|
||||
const schema::ObSimpleTableSchemaV2* rhs) {
|
||||
return lhs->get_table_name() < rhs->get_table_name();
|
||||
});
|
||||
}
|
||||
}
|
||||
for (int i = 0; OB_SUCC(ret) && i < sort_table_schemas.count(); ++i) {
|
||||
const schema::ObSimpleTableSchemaV2* table_schema = sort_table_schemas.at(i);
|
||||
ObTableSingleQueryInfo* query_info = nullptr;
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -65,14 +65,36 @@ struct ObTableSingleQueryInfo : public ObTableInfoBase {
|
||||
table::ObTableQuery query_;
|
||||
};
|
||||
|
||||
class ObTableHbaseRowKeyDefaultCompare
|
||||
{
|
||||
class ObTableMergeFilterCompare {
|
||||
public:
|
||||
ObTableHbaseRowKeyDefaultCompare(): result_code_(common::OB_SUCCESS) {}
|
||||
int compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret);
|
||||
bool operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs);
|
||||
OB_INLINE int get_error_code() const { return result_code_; }
|
||||
int result_code_;
|
||||
ObTableMergeFilterCompare() = default;
|
||||
virtual ~ObTableMergeFilterCompare() = default;
|
||||
|
||||
virtual int compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret) const = 0;
|
||||
virtual bool operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs) = 0;
|
||||
|
||||
int get_error_code() const noexcept { return result_code_; }
|
||||
|
||||
protected:
|
||||
int result_code_ = OB_SUCCESS;
|
||||
};
|
||||
|
||||
class ObTableHbaseRowKeyDefaultCompare final : public ObTableMergeFilterCompare {
|
||||
public:
|
||||
ObTableHbaseRowKeyDefaultCompare() = default;
|
||||
~ObTableHbaseRowKeyDefaultCompare() override = default;
|
||||
|
||||
int compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret) const override;
|
||||
bool operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs) override;
|
||||
};
|
||||
|
||||
class ObTableHbaseRowKeyReverseCompare final : public ObTableMergeFilterCompare {
|
||||
public:
|
||||
ObTableHbaseRowKeyReverseCompare() = default;
|
||||
~ObTableHbaseRowKeyReverseCompare() override = default;
|
||||
|
||||
int compare(const common::ObNewRow &lhs, const common::ObNewRow &rhs, int &cmp_ret) const override;
|
||||
bool operator()(const common::ObNewRow &lhs, const common::ObNewRow &rhs) override;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -275,7 +297,7 @@ class ObTableQueryAsyncP :
|
||||
{
|
||||
typedef ObTableRpcProcessor<obrpc::ObTableRpcProxy::ObRpc<obrpc::OB_TABLE_API_EXECUTE_QUERY_ASYNC>>
|
||||
ParentType;
|
||||
using ResultMergeIterator = table::ObMergeTableQueryResultIterator<common::ObNewRow, ObTableHbaseRowKeyDefaultCompare>;
|
||||
using ResultMergeIterator = table::ObMergeTableQueryResultIterator<common::ObNewRow, ObTableMergeFilterCompare>;
|
||||
public:
|
||||
explicit ObTableQueryAsyncP(const ObGlobalContext &gctx);
|
||||
virtual ~ObTableQueryAsyncP() {}
|
||||
|
Reference in New Issue
Block a user