adapt runtime filter eval_vector interface for single row calculation.
This commit is contained in:
@ -157,15 +157,15 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
|
||||
template <>
|
||||
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerFixedVec>(IntegerFixedVec *res_vec,
|
||||
const ObBitVector &skip,
|
||||
int64_t batch_size,
|
||||
const EvalBound &bound,
|
||||
int64_t &total_count,
|
||||
int64_t &filter_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||
MEMSET(data, 0, (batch_size * res_vec->get_length(0)));
|
||||
MEMSET(data + bound.start(), 0, (bound.range_size() * res_vec->get_length(0)));
|
||||
|
||||
int64_t valid_cnt = batch_size - skip.accumulate_bit_cnt(batch_size);
|
||||
int64_t valid_cnt = bound.range_size() - skip.accumulate_bit_cnt(bound);
|
||||
total_count += valid_cnt;
|
||||
filter_count += valid_cnt;
|
||||
return ret;
|
||||
@ -174,12 +174,13 @@ int ObP2PDatahubMsgBase::proc_filter_empty<IntegerFixedVec>(IntegerFixedVec *res
|
||||
template <>
|
||||
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerUniVec>(IntegerUniVec *res_vec,
|
||||
const ObBitVector &skip,
|
||||
int64_t batch_size, int64_t &total_count,
|
||||
const EvalBound &bound,
|
||||
int64_t &total_count,
|
||||
int64_t &filter_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
res_vec->set_int(idx, 0);
|
||||
++filter_count;
|
||||
++total_count;
|
||||
@ -190,11 +191,11 @@ int ObP2PDatahubMsgBase::proc_filter_empty<IntegerUniVec>(IntegerUniVec *res_vec
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, int64_t batch_size)
|
||||
int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||
MEMSET(data, 0, (batch_size * res_vec->get_length(0)));
|
||||
MEMSET(data + bound.start(), 0, (bound.range_size() * res_vec->get_length(0)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -176,9 +176,9 @@ public:
|
||||
const ObRegisterDmInfo &get_register_dm_info() { return register_dm_info_; }
|
||||
uint64_t &get_dm_cb_node_seq_id() { return dm_cb_node_seq_id_; }
|
||||
template <typename ResVec>
|
||||
int proc_filter_empty(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
||||
int proc_filter_empty(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound,
|
||||
int64_t &total_count, int64_t &filter_count);
|
||||
int preset_not_match(IntegerFixedVec *res_vec, int64_t batch_size);
|
||||
int preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound);
|
||||
TO_STRING_KV(K(p2p_datahub_id_), K_(px_sequence_id), K(tenant_id_), K(timeout_ts_), K(is_active_), K(msg_type_));
|
||||
protected:
|
||||
common::ObCurTraceId::TraceId trace_id_;
|
||||
|
||||
@ -738,19 +738,19 @@ int ObRFBloomFilterMsg::insert_by_row(
|
||||
|
||||
template <VectorFormat ResFormat, typename ResVec>
|
||||
int ObRFBloomFilterMsg::fill_vec_result(ResVec *res_vec, const ObBitVector &skip,
|
||||
int64_t batch_size, uint64_t *hash_values,
|
||||
const EvalBound &bound, uint64_t *hash_values,
|
||||
int64_t &total_count, int64_t &filter_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_match = true;
|
||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
bloom_filter_.prefetch_bits_block(hash_values[idx]);
|
||||
return OB_SUCCESS;
|
||||
}))) {
|
||||
} else if (OB_FAIL(ObBitVector::flip_foreach(
|
||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
ret = bloom_filter_.might_contain(hash_values[idx], is_match);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (ResFormat == VEC_FIXED) {
|
||||
@ -783,14 +783,13 @@ int ObRFBloomFilterMsg::do_might_contain_vector(
|
||||
int64_t total_count = 0;
|
||||
int64_t filter_count = 0;
|
||||
bool is_match = true;
|
||||
int64_t batch_size = bound.batch_size();
|
||||
uint64_t seed = ObExprJoinFilter::JOIN_FILTER_SEED;
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
uint64_t *hash_values = filter_ctx.right_hash_vals_;
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
||||
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||
LOG_WARN("failed to preset_not_match", K(ret));
|
||||
}
|
||||
}
|
||||
@ -811,11 +810,11 @@ int ObRFBloomFilterMsg::do_might_contain_vector(
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (VEC_UNIFORM == res_format) {
|
||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||
ret = fill_vec_result<VEC_UNIFORM, IntegerUniVec>(res_vec, skip, batch_size, hash_values,
|
||||
ret = fill_vec_result<VEC_UNIFORM, IntegerUniVec>(res_vec, skip, bound, hash_values,
|
||||
total_count, filter_count);
|
||||
} else if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
ret = fill_vec_result<VEC_FIXED, IntegerFixedVec>(res_vec, skip, batch_size, hash_values,
|
||||
ret = fill_vec_result<VEC_FIXED, IntegerFixedVec>(res_vec, skip, bound, hash_values,
|
||||
total_count, filter_count);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -840,15 +839,14 @@ int ObRFBloomFilterMsg::might_contain_vector(
|
||||
if (OB_UNLIKELY(is_empty_)) {
|
||||
int64_t total_count = 0;
|
||||
int64_t filter_count = 0;
|
||||
const int64_t batch_size = bound.batch_size();
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
if (VEC_UNIFORM == res_format) {
|
||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
} else if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
eval_flags.set_all(true);
|
||||
|
||||
@ -115,7 +115,7 @@ private:
|
||||
int insert_partition_bloom_filter(ArgVec *arg_vec, const ObBatchRows *child_brs,
|
||||
uint64_t *batch_hash_values);
|
||||
template <VectorFormat ResFormat, typename ResVec>
|
||||
int fill_vec_result(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
||||
int fill_vec_result(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound,
|
||||
uint64_t *hash_values, int64_t &total_count, int64_t &filter_count);
|
||||
|
||||
public:
|
||||
|
||||
@ -28,15 +28,15 @@ using namespace oceanbase::sql;
|
||||
using namespace oceanbase::share;
|
||||
|
||||
template <typename ResVec>
|
||||
static int proc_filter_not_active(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size);
|
||||
static int proc_filter_not_active(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound);
|
||||
|
||||
template <>
|
||||
int proc_filter_not_active<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
||||
int64_t batch_size)
|
||||
const EvalBound &bound)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||
res_vec->set_int(idx, 1);
|
||||
return OB_SUCCESS;
|
||||
}))) {
|
||||
@ -47,11 +47,11 @@ int proc_filter_not_active<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVec
|
||||
|
||||
template <>
|
||||
int proc_filter_not_active<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
||||
int64_t batch_size)
|
||||
const EvalBound &bound)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||
MEMSET(data, 1, (batch_size * res_vec->get_length(0)));
|
||||
MEMSET(data + bound.start(), 1, (bound.range_size() * res_vec->get_length(0)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -714,7 +714,7 @@ int ObRFRangeFilterVecMsg::do_might_contain_vector(
|
||||
|
||||
if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
||||
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||
LOG_WARN("failed to preset_not_match", K(ret));
|
||||
}
|
||||
}
|
||||
@ -734,7 +734,7 @@ int ObRFRangeFilterVecMsg::do_might_contain_vector(
|
||||
bool is_match = true;
|
||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||
ObDatum datum;
|
||||
for (int64_t batch_i = 0; batch_i < batch_size; ++batch_i) {
|
||||
for (int64_t batch_i = bound.start(); batch_i < bound.end() && OB_SUCC(ret); ++batch_i) {
|
||||
if (skip.at(batch_i)) {
|
||||
continue;
|
||||
} else {
|
||||
@ -799,15 +799,14 @@ int ObRFRangeFilterVecMsg::might_contain_vector(
|
||||
if (OB_UNLIKELY(is_empty_)) {
|
||||
int64_t total_count = 0;
|
||||
int64_t filter_count = 0;
|
||||
const int64_t batch_size = bound.batch_size();
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
if (VEC_UNIFORM == res_format) {
|
||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
} else if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
eval_flags.set_all(true);
|
||||
@ -1545,7 +1544,7 @@ int ObRFInFilterVecMsg::do_might_contain_vector(
|
||||
uint64_t *right_hash_vals = filter_ctx.right_hash_vals_;
|
||||
if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
||||
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||
LOG_WARN("failed to preset_not_match", K(ret));
|
||||
}
|
||||
}
|
||||
@ -1573,7 +1572,7 @@ int ObRFInFilterVecMsg::do_might_contain_vector(
|
||||
bool is_match = true;
|
||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||
ObDatum datum;
|
||||
for (int64_t batch_i = 0; batch_i < batch_size && OB_SUCC(ret); ++batch_i) {
|
||||
for (int64_t batch_i = bound.start(); batch_i < bound.end() && OB_SUCC(ret); ++batch_i) {
|
||||
if (skip.at(batch_i)) {
|
||||
continue;
|
||||
} else {
|
||||
@ -1635,15 +1634,14 @@ int ObRFInFilterVecMsg::might_contain_vector(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_active_) {
|
||||
const int64_t batch_size = bound.batch_size();
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
if (VEC_UNIFORM == res_format) {
|
||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_not_active(res_vec, skip, batch_size);
|
||||
ret = proc_filter_not_active(res_vec, skip, bound);
|
||||
} else if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_not_active(res_vec, skip, batch_size);
|
||||
ret = proc_filter_not_active(res_vec, skip, bound);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
eval_flags.set_all(true);
|
||||
@ -1651,15 +1649,14 @@ int ObRFInFilterVecMsg::might_contain_vector(
|
||||
} else if (OB_UNLIKELY(is_empty_)) {
|
||||
int64_t total_count = 0;
|
||||
int64_t filter_count = 0;
|
||||
const int64_t batch_size = bound.batch_size();
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
if (VEC_UNIFORM == res_format) {
|
||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
} else if (VEC_FIXED == res_format) {
|
||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
||||
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
eval_flags.set_all(true);
|
||||
|
||||
Reference in New Issue
Block a user