fix bug: runtime filter fill empty query range with multi columns
This commit is contained in:
@ -17,6 +17,7 @@
|
||||
#include "sql/engine/px/p2p_datahub/ob_p2p_dh_rpc_process.h"
|
||||
#include "sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h"
|
||||
#include "share/detect/ob_detect_manager_utils.h"
|
||||
#include "sql/engine/px/p2p_datahub/ob_runtime_filter_query_range.h"
|
||||
using namespace oceanbase;
|
||||
using namespace common;
|
||||
using namespace sql;
|
||||
@ -199,6 +200,39 @@ int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, const EvalBo
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObP2PDatahubMsgBase::fill_empty_query_range(const ObPxQueryRangeInfo &query_range_info,
|
||||
common::ObIAllocator &allocator, ObNewRange &query_range)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
query_range.table_id_ = query_range_info.table_id_;
|
||||
|
||||
ObObj *start = NULL;
|
||||
ObObj *end = NULL;
|
||||
int64_t range_column_cnt = query_range_info.range_column_cnt_;
|
||||
if (OB_ISNULL(start = static_cast<ObObj *>(
|
||||
allocator.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc memory for start_obj failed", K(ret));
|
||||
} else if (OB_ISNULL(end = static_cast<ObObj *>(
|
||||
allocator.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc memory for end_obj failed", K(ret));
|
||||
} else {
|
||||
// fill all coloumns with (max, min)
|
||||
for (int64_t i = 0; i < range_column_cnt; ++i) {
|
||||
new (start + i) ObObj();
|
||||
new (end + i) ObObj();
|
||||
(start + i)->set_max_value();
|
||||
(end + i)->set_min_value();
|
||||
}
|
||||
ObRowkey start_key(start, range_column_cnt);
|
||||
ObRowkey end_key(end, range_column_cnt);
|
||||
query_range.start_key_ = start_key;
|
||||
query_range.end_key_ = end_key;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObP2PDatahubMsgGuard::ObP2PDatahubMsgGuard(ObP2PDatahubMsgBase *msg) : msg_(msg)
|
||||
{
|
||||
// one for dh map hold msg and one for we use msg to reg dm
|
||||
|
||||
@ -26,6 +26,7 @@ namespace sql
|
||||
{
|
||||
|
||||
class ObBatchRows;
|
||||
class ObPxQueryRangeInfo;
|
||||
class ObP2PDatahubMsgBase
|
||||
{
|
||||
OB_UNIS_VERSION_V(1);
|
||||
@ -180,6 +181,10 @@ public:
|
||||
int64_t &total_count, int64_t &filter_count);
|
||||
int preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound);
|
||||
TO_STRING_KV(K(p2p_datahub_id_), K_(px_sequence_id), K(tenant_id_), K(timeout_ts_), K(is_active_), K(msg_type_));
|
||||
protected:
|
||||
int fill_empty_query_range(const ObPxQueryRangeInfo &query_range_info,
|
||||
common::ObIAllocator &allocator, ObNewRange &query_range);
|
||||
|
||||
protected:
|
||||
common::ObCurTraceId::TraceId trace_id_;
|
||||
int64_t p2p_datahub_id_;
|
||||
|
||||
@ -1289,9 +1289,11 @@ int ObRFRangeFilterMsg::prepare_query_range()
|
||||
is_query_range_ready_ = false;
|
||||
} else if (is_empty_) {
|
||||
// make empty range
|
||||
query_range_.table_id_ = query_range_info_.table_id_;
|
||||
query_range_.set_false_range();
|
||||
is_query_range_ready_ = true;
|
||||
if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range_))) {
|
||||
LOG_WARN("faild to fill_empty_query_range");
|
||||
} else {
|
||||
is_query_range_ready_ = true;
|
||||
}
|
||||
} else {
|
||||
// only extract the first column
|
||||
int64_t prefix_col_idx = query_range_info_.prefix_col_idxs_.at(0);
|
||||
@ -1309,11 +1311,11 @@ int ObRFRangeFilterMsg::prepare_query_range()
|
||||
if (OB_ISNULL(start = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for start_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for start_obj failed", K(ret));
|
||||
} else if (OB_ISNULL(end = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for end_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for end_obj failed", K(ret));
|
||||
} else {
|
||||
new(start) ObObj();
|
||||
new(end) ObObj();
|
||||
@ -1336,7 +1338,7 @@ int ObRFRangeFilterMsg::prepare_query_range()
|
||||
is_query_range_ready_ = true;
|
||||
}
|
||||
}
|
||||
LOG_TRACE("range filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_));
|
||||
LOG_TRACE("range filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_), K(query_range_info_), K(is_empty_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -2129,9 +2131,9 @@ int ObRFInFilterMsg::prepare_query_ranges()
|
||||
} else if (is_empty_) {
|
||||
// make empty range
|
||||
ObNewRange query_range;
|
||||
query_range.table_id_ = query_range_info_.table_id_;
|
||||
query_range.set_false_range();
|
||||
if (OB_FAIL(query_range_.push_back(query_range))) {
|
||||
if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range))) {
|
||||
LOG_WARN("faild to fill_empty_query_range");
|
||||
} else if (OB_FAIL(query_range_.push_back(query_range))) {
|
||||
LOG_WARN("failed to push back query_range");
|
||||
} else {
|
||||
is_query_range_ready_ = true;
|
||||
@ -2149,7 +2151,7 @@ int ObRFInFilterMsg::prepare_query_ranges()
|
||||
// we need to deduplicate to avoid duplicate range
|
||||
ret = process_query_ranges_with_deduplicate();
|
||||
}
|
||||
LOG_TRACE("in filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_));
|
||||
LOG_TRACE("in filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_), K(query_range_info_), K(is_empty_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -2305,11 +2307,11 @@ int ObRFInFilterMsg::generate_one_range(int row_idx)
|
||||
if (OB_ISNULL(start = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for start_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for start_obj failed", K(ret));
|
||||
} else if (OB_ISNULL(end = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for end_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for end_obj failed", K(ret));
|
||||
}
|
||||
for (int64_t j = 0; j < prefix_col_idxs.count() && OB_SUCC(ret); ++j) {
|
||||
int64_t col_idx = prefix_col_idxs.at(j);
|
||||
|
||||
@ -190,7 +190,6 @@ OB_DEF_SERIALIZE(ObRFInFilterVecMsg)
|
||||
OB_DEF_DESERIALIZE(ObRFInFilterVecMsg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t row_cnt = row_store_.get_row_cnt();
|
||||
BASE_DESER((ObRFInFilterVecMsg, ObP2PDatahubMsgBase));
|
||||
OB_UNIS_DECODE(max_in_num_);
|
||||
OB_UNIS_DECODE(need_null_cmp_flags_);
|
||||
@ -199,6 +198,7 @@ OB_DEF_DESERIALIZE(ObRFInFilterVecMsg)
|
||||
OB_UNIS_DECODE(build_row_meta_);
|
||||
if (OB_SUCC(ret) && is_active_) {
|
||||
OB_UNIS_DECODE(row_store_);
|
||||
int64_t row_cnt = row_store_.get_row_cnt();
|
||||
int64_t buckets_cnt = max(row_cnt, 1);
|
||||
if (OB_FAIL(rows_set_.create(buckets_cnt * 2,
|
||||
"RFDEInFilter",
|
||||
@ -829,9 +829,11 @@ int ObRFRangeFilterVecMsg::prepare_query_range()
|
||||
is_query_range_ready_ = false;
|
||||
} else if (is_empty_) {
|
||||
// make empty range
|
||||
query_range_.table_id_ = query_range_info_.table_id_;
|
||||
query_range_.set_false_range();
|
||||
is_query_range_ready_ = true;
|
||||
if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range_))) {
|
||||
LOG_WARN("faild to fill_empty_query_range");
|
||||
} else {
|
||||
is_query_range_ready_ = true;
|
||||
}
|
||||
} else {
|
||||
// only extract the first column
|
||||
int64_t prefix_col_idx = query_range_info_.prefix_col_idxs_.at(0);
|
||||
@ -849,11 +851,11 @@ int ObRFRangeFilterVecMsg::prepare_query_range()
|
||||
if (OB_ISNULL(start = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for start_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for start_obj failed", K(ret));
|
||||
} else if (OB_ISNULL(end = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for end_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for end_obj failed", K(ret));
|
||||
} else {
|
||||
new(start) ObObj();
|
||||
new(end) ObObj();
|
||||
@ -1726,9 +1728,9 @@ int ObRFInFilterVecMsg::prepare_query_ranges()
|
||||
} else if (is_empty_) {
|
||||
// make empty range
|
||||
ObNewRange query_range;
|
||||
query_range.table_id_ = query_range_info_.table_id_;
|
||||
query_range.set_false_range();
|
||||
if (OB_FAIL(query_range_.push_back(query_range))) {
|
||||
if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range))) {
|
||||
LOG_WARN("faild to fill_empty_query_range");
|
||||
} else if (OB_FAIL(query_range_.push_back(query_range))) {
|
||||
LOG_WARN("failed to push back query_range");
|
||||
} else {
|
||||
is_query_range_ready_ = true;
|
||||
@ -1948,11 +1950,11 @@ int ObRFInFilterVecMsg::generate_one_range(int row_idx)
|
||||
if (OB_ISNULL(start = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for start_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for start_obj failed", K(ret));
|
||||
} else if (OB_ISNULL(end = static_cast<ObObj *>(
|
||||
query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc memory for end_obj failed", K(ret));
|
||||
LOG_WARN("alloc memory for end_obj failed", K(ret));
|
||||
}
|
||||
for (int64_t j = 0; j < prefix_col_idxs.count() && OB_SUCC(ret); ++j) {
|
||||
int64_t col_idx = prefix_col_idxs.at(j);
|
||||
|
||||
Reference in New Issue
Block a user