adapt runtime filter eval_vector interface for single row calculation.
This commit is contained in:
@ -128,8 +128,7 @@ private:
|
|||||||
return Op::template hash<HashMethod, hash_v2>(meta, vec.get_payload(i),
|
return Op::template hash<HashMethod, hash_v2>(meta, vec.get_payload(i),
|
||||||
vec.get_length(i), seed_vec[i], hash_values[i]);
|
vec.get_length(i), seed_vec[i], hash_values[i]);
|
||||||
};
|
};
|
||||||
//TODO shengle flip_foreach bound
|
ret = sql::ObBitVector::flip_foreach(skip, bound, op);
|
||||||
ret = sql::ObBitVector::flip_foreach(skip, bound.batch_size(), op);
|
|
||||||
} else { /*has_null && !all_active*/
|
} else { /*has_null && !all_active*/
|
||||||
auto op = [&](const int64_t i) __attribute__((always_inline)) {
|
auto op = [&](const int64_t i) __attribute__((always_inline)) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -141,8 +140,7 @@ private:
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
};
|
};
|
||||||
//TODO shengle flip_foreach bound
|
ret = sql::ObBitVector::flip_foreach(skip, bound, op);
|
||||||
ret = sql::ObBitVector::flip_foreach(skip, bound.batch_size(), op);
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -48,14 +48,15 @@ namespace sql
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename ResVec>
|
template <typename ResVec>
|
||||||
static int proc_if_das(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size);
|
static int proc_if_das(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound);
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_if_das<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip, int64_t batch_size)
|
int proc_if_das<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
||||||
|
const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
res_vec->set_int(idx, 1);
|
res_vec->set_int(idx, 1);
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}))) {
|
}))) {
|
||||||
@ -66,26 +67,27 @@ int proc_if_das<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_if_das<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
int proc_if_das<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
||||||
int64_t batch_size)
|
const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||||
MEMSET(data, 1, (batch_size * res_vec->get_length(0)));
|
MEMSET(data + bound.start(), 1, (bound.range_size() * res_vec->get_length(0)));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename ResVec>
|
template <typename ResVec>
|
||||||
static int proc_by_pass(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
static int proc_by_pass(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound,
|
||||||
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx);
|
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx);
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_by_pass<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
int proc_by_pass<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
||||||
|
const EvalBound &bound,
|
||||||
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx)
|
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int valid_cnt = 0;
|
int valid_cnt = 0;
|
||||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
++valid_cnt;
|
++valid_cnt;
|
||||||
res_vec->set_int(idx, 1);
|
res_vec->set_int(idx, 1);
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
@ -98,14 +100,14 @@ int proc_by_pass<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_by_pass<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
int proc_by_pass<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
||||||
int64_t batch_size,
|
const EvalBound &bound,
|
||||||
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx)
|
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||||
MEMSET(data, 1, (batch_size * res_vec->get_length(0)));
|
MEMSET(data + bound.start(), 1, (bound.range_size() * res_vec->get_length(0)));
|
||||||
|
|
||||||
int64_t valid_cnt = batch_size - skip.accumulate_bit_cnt(batch_size);
|
int64_t valid_cnt = bound.range_size() - skip.accumulate_bit_cnt(bound);
|
||||||
join_filter_ctx->n_times_ += valid_cnt;
|
join_filter_ctx->n_times_ += valid_cnt;
|
||||||
join_filter_ctx->total_count_ += valid_cnt;
|
join_filter_ctx->total_count_ += valid_cnt;
|
||||||
ObExprJoinFilter::collect_sample_info_batch(*join_filter_ctx, 0, valid_cnt);
|
ObExprJoinFilter::collect_sample_info_batch(*join_filter_ctx, 0, valid_cnt);
|
||||||
@ -532,7 +534,6 @@ int ObExprJoinFilter::eval_filter_vector_internal(
|
|||||||
const ObExpr &expr, ObEvalCtx &ctx, const ObBitVector &skip, const EvalBound &bound)
|
const ObExpr &expr, ObEvalCtx &ctx, const ObBitVector &skip, const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t batch_size = bound.batch_size();
|
|
||||||
uint64_t op_id = expr.expr_ctx_id_;
|
uint64_t op_id = expr.expr_ctx_id_;
|
||||||
ObExecContext &exec_ctx = ctx.exec_ctx_;
|
ObExecContext &exec_ctx = ctx.exec_ctx_;
|
||||||
ObExprJoinFilterContext *join_filter_ctx = NULL;
|
ObExprJoinFilterContext *join_filter_ctx = NULL;
|
||||||
@ -543,10 +544,10 @@ int ObExprJoinFilter::eval_filter_vector_internal(
|
|||||||
// join filter ctx may be null in das.
|
// join filter ctx may be null in das.
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_if_das(res_vec, skip, batch_size);
|
ret = proc_if_das(res_vec, skip, bound);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_if_das(res_vec, skip, batch_size);
|
ret = proc_if_das(res_vec, skip, bound);
|
||||||
}
|
}
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
} else {
|
} else {
|
||||||
@ -563,10 +564,10 @@ int ObExprJoinFilter::eval_filter_vector_internal(
|
|||||||
// rf_msg_ dynamic_disable: disable filter when filter rate < 0.5
|
// rf_msg_ dynamic_disable: disable filter when filter rate < 0.5
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_by_pass(res_vec, skip, batch_size, join_filter_ctx);
|
ret = proc_by_pass(res_vec, skip, bound, join_filter_ctx);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_by_pass(res_vec, skip, batch_size, join_filter_ctx);
|
ret = proc_by_pass(res_vec, skip, bound, join_filter_ctx);
|
||||||
}
|
}
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
} else if (OB_FAIL(join_filter_ctx->rf_msg_->might_contain_vector(expr, ctx, skip, bound,
|
} else if (OB_FAIL(join_filter_ctx->rf_msg_->might_contain_vector(expr, ctx, skip, bound,
|
||||||
|
@ -151,17 +151,31 @@ public:
|
|||||||
*/
|
*/
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
static OB_INLINE int flip_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op);
|
static OB_INLINE int flip_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op);
|
||||||
|
template <typename OP>
|
||||||
|
static OB_INLINE int flip_foreach(const ObBitVectorImpl<WordType> &skip, const EvalBound &bound,
|
||||||
|
OP op);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* access all bit that it's 1
|
* access all bit that it's 1
|
||||||
*/
|
*/
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
static OB_INLINE int foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op);
|
static OB_INLINE int foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op);
|
||||||
|
template <typename OP>
|
||||||
|
static OB_INLINE int foreach(const ObBitVectorImpl<WordType> &skip, const EvalBound &bound,
|
||||||
|
OP op);
|
||||||
public:
|
public:
|
||||||
OB_INLINE static int64_t popcount64(uint64_t v);
|
OB_INLINE static int64_t popcount64(uint64_t v);
|
||||||
private:
|
private:
|
||||||
|
/**
|
||||||
|
* the pos in [start_idx, end_idx) will be traversed
|
||||||
|
*/
|
||||||
template <bool IS_FLIP, typename OP>
|
template <bool IS_FLIP, typename OP>
|
||||||
static OB_INLINE int inner_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op);
|
static OB_INLINE int inner_foreach(const ObBitVectorImpl<WordType> &skip, int64_t start_idx,
|
||||||
|
int64_t end_idx, OP op);
|
||||||
|
template <typename OP>
|
||||||
|
static OB_INLINE int inner_foreach_one_word(const WordType &s_word, const int64_t step_size,
|
||||||
|
int64_t &step, OP op);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
WordType data_[0];
|
WordType data_[0];
|
||||||
};
|
};
|
||||||
@ -629,54 +643,93 @@ inline void ObBitVectorImpl<WordType>::bit_or(const ObBitVectorImpl<WordType> &s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename WordType>
|
template <typename WordType>
|
||||||
|
template <typename OP>
|
||||||
|
OB_INLINE int ObBitVectorImpl<WordType>::inner_foreach_one_word(const WordType &s_word,
|
||||||
|
const int64_t step_size,
|
||||||
|
int64_t &step, OP op)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (s_word > 0) {
|
||||||
|
WordType tmp_s_word = s_word;
|
||||||
|
int64_t tmp_step = step;
|
||||||
|
do {
|
||||||
|
uint16_t step_val = tmp_s_word & 0xFFFF;
|
||||||
|
if (0xFFFF == step_val) {
|
||||||
|
for (int64_t j = 0; OB_SUCC(ret) && j < step_size; j++) {
|
||||||
|
int64_t k = j + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
}
|
||||||
|
} else if (step_val > 0) {
|
||||||
|
do {
|
||||||
|
int64_t start_bit_idx = __builtin_ctz(step_val);
|
||||||
|
int64_t k = start_bit_idx + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
step_val &= (step_val - 1);
|
||||||
|
} while (step_val > 0 && OB_SUCC(ret)); // end for, for one step size
|
||||||
|
}
|
||||||
|
tmp_step += step_size;
|
||||||
|
tmp_s_word >>= step_size;
|
||||||
|
} while (tmp_s_word > 0 && OB_SUCC(ret)); // one word-uint64_t
|
||||||
|
}
|
||||||
|
step += WORD_BITS;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename WordType>
|
||||||
template <bool IS_FLIP, typename OP>
|
template <bool IS_FLIP, typename OP>
|
||||||
OB_INLINE int ObBitVectorImpl<WordType>::inner_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
OB_INLINE int ObBitVectorImpl<WordType>::inner_foreach(const ObBitVectorImpl<WordType> &skip,
|
||||||
|
int64_t start_idx, int64_t end_idx, OP op)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t tmp_step = 0;
|
int64_t tmp_step = 0;
|
||||||
typedef uint16_t StepType;
|
typedef uint16_t StepType;
|
||||||
const int64_t step_size = sizeof(StepType) * CHAR_BIT;
|
const int64_t step_size = sizeof(StepType) * CHAR_BIT;
|
||||||
int64_t word_cnt = ObBitVectorImpl<WordType>::word_count(size);
|
|
||||||
int64_t step = 0;
|
int64_t start_cnt = 0;
|
||||||
const int64_t remain = size % ObBitVectorImpl<WordType>::WORD_BITS;
|
int64_t end_cnt = 0;
|
||||||
for (int64_t i = 0; i < word_cnt && OB_SUCC(ret); ++i) {
|
WordType start_mask = 0;
|
||||||
WordType s_word = (IS_FLIP ? ~skip.data_[i] : skip.data_[i]);
|
WordType end_mask = 0;
|
||||||
// bool all_bits = (IS_FLIP ? skip.data_[i] == 0 : (~skip.data_[i]) == 0);
|
get_start_end_mask(start_idx, end_idx, start_mask, end_mask, start_cnt, end_cnt);
|
||||||
if (i >= word_cnt - 1 && remain > 0) {
|
// eg. start_remain = 5, start_mask = 11111....11100000
|
||||||
// all_bits = ((IS_FLIP ? skip.data_[i] : ~skip.data_[i]) & ((1LU << remain) - 1)) == 0;
|
// | |
|
||||||
s_word = s_word & ((1LU << remain) - 1);
|
// \ /
|
||||||
|
// nums of '0' == start_remain
|
||||||
|
|
||||||
|
// eg. end_remain = 5, end_mask = 00000000....11111
|
||||||
|
// | |
|
||||||
|
// \ /
|
||||||
|
// nums of '1' == end_remain
|
||||||
|
int64_t step = WORD_BITS * start_cnt; // the bit pos offset of the first word
|
||||||
|
if (start_cnt == end_cnt) {
|
||||||
|
// if only one word, both start_mask and end_mask should be used
|
||||||
|
WordType one_word_mask = start_mask & end_mask;
|
||||||
|
WordType s_word = (IS_FLIP ? ~skip.data_[start_cnt] : skip.data_[start_cnt]);
|
||||||
|
s_word = s_word & one_word_mask;
|
||||||
|
ret = inner_foreach_one_word(s_word, step_size, step, op);
|
||||||
|
} else {
|
||||||
|
// process first word, which may not a complete word
|
||||||
|
WordType s_word = (IS_FLIP ? ~skip.data_[start_cnt] : skip.data_[start_cnt]);
|
||||||
|
if (start_mask > 0) {
|
||||||
|
s_word = s_word & start_mask;
|
||||||
}
|
}
|
||||||
if (s_word > 0) {
|
// process words in the middle, all of these are whole word
|
||||||
WordType tmp_s_word = s_word;
|
if (OB_FAIL(inner_foreach_one_word(s_word, step_size, step, op))) {
|
||||||
tmp_step = step;
|
} else {
|
||||||
do {
|
for (int64_t i = start_cnt + 1; i < end_cnt && OB_SUCC(ret); ++i) {
|
||||||
uint16_t step_val = tmp_s_word & 0xFFFF;
|
WordType s_word = (IS_FLIP ? ~skip.data_[i] : skip.data_[i]);
|
||||||
if (0xFFFF == step_val) {
|
ret = inner_foreach_one_word(s_word, step_size, step, op);
|
||||||
// no skip
|
}
|
||||||
// last batch ?
|
|
||||||
int64_t mini_cnt = step_size;
|
|
||||||
if (tmp_step + step_size > size) {
|
|
||||||
mini_cnt = size - tmp_step;
|
|
||||||
}
|
|
||||||
for (int64_t j = 0; OB_SUCC(ret) && j < mini_cnt; j++) {
|
|
||||||
int64_t k = j + tmp_step;
|
|
||||||
ret = op(k);
|
|
||||||
}
|
|
||||||
} else if (step_val > 0) {
|
|
||||||
do {
|
|
||||||
int64_t start_bit_idx = __builtin_ctz(step_val);
|
|
||||||
int64_t k = start_bit_idx + tmp_step;
|
|
||||||
ret = op(k);
|
|
||||||
step_val &= (step_val - 1);
|
|
||||||
} while (step_val > 0 && OB_SUCC(ret)); // end for, for one step size
|
|
||||||
}
|
|
||||||
tmp_step += step_size;
|
|
||||||
tmp_s_word >>= step_size;
|
|
||||||
} while (tmp_s_word > 0 && OB_SUCC(ret)); // one word-uint64_t
|
|
||||||
}
|
}
|
||||||
step += ObBitVectorImpl<WordType>::WORD_BITS;
|
if (OB_SUCC(ret)) {
|
||||||
} // end for
|
// if end_mask > 0, means there is a incomplete word in the last
|
||||||
|
if (end_mask > 0) {
|
||||||
|
WordType s_word = (IS_FLIP ? ~skip.data_[end_cnt] : skip.data_[end_cnt]);
|
||||||
|
s_word = s_word & end_mask;
|
||||||
|
ret = inner_foreach_one_word(s_word, step_size, step, op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -684,14 +737,30 @@ template<typename WordType>
|
|||||||
template <typename OP>
|
template <typename OP>
|
||||||
OB_INLINE int ObBitVectorImpl<WordType>::flip_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
OB_INLINE int ObBitVectorImpl<WordType>::flip_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
||||||
{
|
{
|
||||||
return ObBitVectorImpl<WordType>::inner_foreach<true, OP>(skip, size, op);
|
return ObBitVectorImpl<WordType>::inner_foreach<true, OP>(skip, 0 /*start_idx*/, size, op);
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename WordType>
|
template<typename WordType>
|
||||||
template <typename OP>
|
template <typename OP>
|
||||||
OB_INLINE int ObBitVectorImpl<WordType>::foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
OB_INLINE int ObBitVectorImpl<WordType>::foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
||||||
{
|
{
|
||||||
return ObBitVectorImpl<WordType>::inner_foreach<false, OP>(skip, size, op);
|
return ObBitVectorImpl<WordType>::inner_foreach<false, OP>(skip, 0 /*start_idx*/, size, op);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename WordType>
|
||||||
|
template <typename OP>
|
||||||
|
OB_INLINE int ObBitVectorImpl<WordType>::flip_foreach(const ObBitVectorImpl<WordType> &skip,
|
||||||
|
const EvalBound &bound, OP op)
|
||||||
|
{
|
||||||
|
return ObBitVectorImpl<WordType>::inner_foreach<true, OP>(skip, bound.start(), bound.end(), op);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename WordType>
|
||||||
|
template <typename OP>
|
||||||
|
OB_INLINE int ObBitVectorImpl<WordType>::foreach (const ObBitVectorImpl<WordType> &skip,
|
||||||
|
const EvalBound &bound, OP op)
|
||||||
|
{
|
||||||
|
return ObBitVectorImpl<WordType>::inner_foreach<false, OP>(skip, bound.start(), bound.end(), op);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // end namespace sql
|
} // end namespace sql
|
||||||
|
@ -157,15 +157,15 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
|
|||||||
template <>
|
template <>
|
||||||
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerFixedVec>(IntegerFixedVec *res_vec,
|
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerFixedVec>(IntegerFixedVec *res_vec,
|
||||||
const ObBitVector &skip,
|
const ObBitVector &skip,
|
||||||
int64_t batch_size,
|
const EvalBound &bound,
|
||||||
int64_t &total_count,
|
int64_t &total_count,
|
||||||
int64_t &filter_count)
|
int64_t &filter_count)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||||
MEMSET(data, 0, (batch_size * res_vec->get_length(0)));
|
MEMSET(data + bound.start(), 0, (bound.range_size() * res_vec->get_length(0)));
|
||||||
|
|
||||||
int64_t valid_cnt = batch_size - skip.accumulate_bit_cnt(batch_size);
|
int64_t valid_cnt = bound.range_size() - skip.accumulate_bit_cnt(bound);
|
||||||
total_count += valid_cnt;
|
total_count += valid_cnt;
|
||||||
filter_count += valid_cnt;
|
filter_count += valid_cnt;
|
||||||
return ret;
|
return ret;
|
||||||
@ -174,12 +174,13 @@ int ObP2PDatahubMsgBase::proc_filter_empty<IntegerFixedVec>(IntegerFixedVec *res
|
|||||||
template <>
|
template <>
|
||||||
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerUniVec>(IntegerUniVec *res_vec,
|
int ObP2PDatahubMsgBase::proc_filter_empty<IntegerUniVec>(IntegerUniVec *res_vec,
|
||||||
const ObBitVector &skip,
|
const ObBitVector &skip,
|
||||||
int64_t batch_size, int64_t &total_count,
|
const EvalBound &bound,
|
||||||
|
int64_t &total_count,
|
||||||
int64_t &filter_count)
|
int64_t &filter_count)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
res_vec->set_int(idx, 0);
|
res_vec->set_int(idx, 0);
|
||||||
++filter_count;
|
++filter_count;
|
||||||
++total_count;
|
++total_count;
|
||||||
@ -190,11 +191,11 @@ int ObP2PDatahubMsgBase::proc_filter_empty<IntegerUniVec>(IntegerUniVec *res_vec
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, int64_t batch_size)
|
int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||||
MEMSET(data, 0, (batch_size * res_vec->get_length(0)));
|
MEMSET(data + bound.start(), 0, (bound.range_size() * res_vec->get_length(0)));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,9 +176,9 @@ public:
|
|||||||
const ObRegisterDmInfo &get_register_dm_info() { return register_dm_info_; }
|
const ObRegisterDmInfo &get_register_dm_info() { return register_dm_info_; }
|
||||||
uint64_t &get_dm_cb_node_seq_id() { return dm_cb_node_seq_id_; }
|
uint64_t &get_dm_cb_node_seq_id() { return dm_cb_node_seq_id_; }
|
||||||
template <typename ResVec>
|
template <typename ResVec>
|
||||||
int proc_filter_empty(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
int proc_filter_empty(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound,
|
||||||
int64_t &total_count, int64_t &filter_count);
|
int64_t &total_count, int64_t &filter_count);
|
||||||
int preset_not_match(IntegerFixedVec *res_vec, int64_t batch_size);
|
int preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound);
|
||||||
TO_STRING_KV(K(p2p_datahub_id_), K_(px_sequence_id), K(tenant_id_), K(timeout_ts_), K(is_active_), K(msg_type_));
|
TO_STRING_KV(K(p2p_datahub_id_), K_(px_sequence_id), K(tenant_id_), K(timeout_ts_), K(is_active_), K(msg_type_));
|
||||||
protected:
|
protected:
|
||||||
common::ObCurTraceId::TraceId trace_id_;
|
common::ObCurTraceId::TraceId trace_id_;
|
||||||
|
@ -738,19 +738,19 @@ int ObRFBloomFilterMsg::insert_by_row(
|
|||||||
|
|
||||||
template <VectorFormat ResFormat, typename ResVec>
|
template <VectorFormat ResFormat, typename ResVec>
|
||||||
int ObRFBloomFilterMsg::fill_vec_result(ResVec *res_vec, const ObBitVector &skip,
|
int ObRFBloomFilterMsg::fill_vec_result(ResVec *res_vec, const ObBitVector &skip,
|
||||||
int64_t batch_size, uint64_t *hash_values,
|
const EvalBound &bound, uint64_t *hash_values,
|
||||||
int64_t &total_count, int64_t &filter_count)
|
int64_t &total_count, int64_t &filter_count)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_match = true;
|
bool is_match = true;
|
||||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
bloom_filter_.prefetch_bits_block(hash_values[idx]);
|
bloom_filter_.prefetch_bits_block(hash_values[idx]);
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}))) {
|
}))) {
|
||||||
} else if (OB_FAIL(ObBitVector::flip_foreach(
|
} else if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
ret = bloom_filter_.might_contain(hash_values[idx], is_match);
|
ret = bloom_filter_.might_contain(hash_values[idx], is_match);
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (ResFormat == VEC_FIXED) {
|
if (ResFormat == VEC_FIXED) {
|
||||||
@ -783,14 +783,13 @@ int ObRFBloomFilterMsg::do_might_contain_vector(
|
|||||||
int64_t total_count = 0;
|
int64_t total_count = 0;
|
||||||
int64_t filter_count = 0;
|
int64_t filter_count = 0;
|
||||||
bool is_match = true;
|
bool is_match = true;
|
||||||
int64_t batch_size = bound.batch_size();
|
|
||||||
uint64_t seed = ObExprJoinFilter::JOIN_FILTER_SEED;
|
uint64_t seed = ObExprJoinFilter::JOIN_FILTER_SEED;
|
||||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||||
uint64_t *hash_values = filter_ctx.right_hash_vals_;
|
uint64_t *hash_values = filter_ctx.right_hash_vals_;
|
||||||
VectorFormat res_format = expr.get_format(ctx);
|
VectorFormat res_format = expr.get_format(ctx);
|
||||||
if (VEC_FIXED == res_format) {
|
if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||||
LOG_WARN("failed to preset_not_match", K(ret));
|
LOG_WARN("failed to preset_not_match", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -811,11 +810,11 @@ int ObRFBloomFilterMsg::do_might_contain_vector(
|
|||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (VEC_UNIFORM == res_format) {
|
} else if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = fill_vec_result<VEC_UNIFORM, IntegerUniVec>(res_vec, skip, batch_size, hash_values,
|
ret = fill_vec_result<VEC_UNIFORM, IntegerUniVec>(res_vec, skip, bound, hash_values,
|
||||||
total_count, filter_count);
|
total_count, filter_count);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = fill_vec_result<VEC_FIXED, IntegerFixedVec>(res_vec, skip, batch_size, hash_values,
|
ret = fill_vec_result<VEC_FIXED, IntegerFixedVec>(res_vec, skip, bound, hash_values,
|
||||||
total_count, filter_count);
|
total_count, filter_count);
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
@ -840,15 +839,14 @@ int ObRFBloomFilterMsg::might_contain_vector(
|
|||||||
if (OB_UNLIKELY(is_empty_)) {
|
if (OB_UNLIKELY(is_empty_)) {
|
||||||
int64_t total_count = 0;
|
int64_t total_count = 0;
|
||||||
int64_t filter_count = 0;
|
int64_t filter_count = 0;
|
||||||
const int64_t batch_size = bound.batch_size();
|
|
||||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||||
VectorFormat res_format = expr.get_format(ctx);
|
VectorFormat res_format = expr.get_format(ctx);
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
|
@ -115,7 +115,7 @@ private:
|
|||||||
int insert_partition_bloom_filter(ArgVec *arg_vec, const ObBatchRows *child_brs,
|
int insert_partition_bloom_filter(ArgVec *arg_vec, const ObBatchRows *child_brs,
|
||||||
uint64_t *batch_hash_values);
|
uint64_t *batch_hash_values);
|
||||||
template <VectorFormat ResFormat, typename ResVec>
|
template <VectorFormat ResFormat, typename ResVec>
|
||||||
int fill_vec_result(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size,
|
int fill_vec_result(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound,
|
||||||
uint64_t *hash_values, int64_t &total_count, int64_t &filter_count);
|
uint64_t *hash_values, int64_t &total_count, int64_t &filter_count);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
@ -28,15 +28,15 @@ using namespace oceanbase::sql;
|
|||||||
using namespace oceanbase::share;
|
using namespace oceanbase::share;
|
||||||
|
|
||||||
template <typename ResVec>
|
template <typename ResVec>
|
||||||
static int proc_filter_not_active(ResVec *res_vec, const ObBitVector &skip, int64_t batch_size);
|
static int proc_filter_not_active(ResVec *res_vec, const ObBitVector &skip, const EvalBound &bound);
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_filter_not_active<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
int proc_filter_not_active<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVector &skip,
|
||||||
int64_t batch_size)
|
const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObBitVector::flip_foreach(
|
if (OB_FAIL(ObBitVector::flip_foreach(
|
||||||
skip, batch_size, [&](int64_t idx) __attribute__((always_inline)) {
|
skip, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
res_vec->set_int(idx, 1);
|
res_vec->set_int(idx, 1);
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}))) {
|
}))) {
|
||||||
@ -47,11 +47,11 @@ int proc_filter_not_active<IntegerUniVec>(IntegerUniVec *res_vec, const ObBitVec
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
int proc_filter_not_active<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
int proc_filter_not_active<IntegerFixedVec>(IntegerFixedVec *res_vec, const ObBitVector &skip,
|
||||||
int64_t batch_size)
|
const EvalBound &bound)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
uint64_t *data = reinterpret_cast<uint64_t *>(res_vec->get_data());
|
||||||
MEMSET(data, 1, (batch_size * res_vec->get_length(0)));
|
MEMSET(data + bound.start(), 1, (bound.range_size() * res_vec->get_length(0)));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -714,7 +714,7 @@ int ObRFRangeFilterVecMsg::do_might_contain_vector(
|
|||||||
|
|
||||||
if (VEC_FIXED == res_format) {
|
if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||||
LOG_WARN("failed to preset_not_match", K(ret));
|
LOG_WARN("failed to preset_not_match", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -734,7 +734,7 @@ int ObRFRangeFilterVecMsg::do_might_contain_vector(
|
|||||||
bool is_match = true;
|
bool is_match = true;
|
||||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||||
ObDatum datum;
|
ObDatum datum;
|
||||||
for (int64_t batch_i = 0; batch_i < batch_size; ++batch_i) {
|
for (int64_t batch_i = bound.start(); batch_i < bound.end() && OB_SUCC(ret); ++batch_i) {
|
||||||
if (skip.at(batch_i)) {
|
if (skip.at(batch_i)) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
@ -799,15 +799,14 @@ int ObRFRangeFilterVecMsg::might_contain_vector(
|
|||||||
if (OB_UNLIKELY(is_empty_)) {
|
if (OB_UNLIKELY(is_empty_)) {
|
||||||
int64_t total_count = 0;
|
int64_t total_count = 0;
|
||||||
int64_t filter_count = 0;
|
int64_t filter_count = 0;
|
||||||
const int64_t batch_size = bound.batch_size();
|
|
||||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||||
VectorFormat res_format = expr.get_format(ctx);
|
VectorFormat res_format = expr.get_format(ctx);
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
@ -1545,7 +1544,7 @@ int ObRFInFilterVecMsg::do_might_contain_vector(
|
|||||||
uint64_t *right_hash_vals = filter_ctx.right_hash_vals_;
|
uint64_t *right_hash_vals = filter_ctx.right_hash_vals_;
|
||||||
if (VEC_FIXED == res_format) {
|
if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
if (OB_FAIL(preset_not_match(res_vec, batch_size))) {
|
if (OB_FAIL(preset_not_match(res_vec, bound))) {
|
||||||
LOG_WARN("failed to preset_not_match", K(ret));
|
LOG_WARN("failed to preset_not_match", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1573,7 +1572,7 @@ int ObRFInFilterVecMsg::do_might_contain_vector(
|
|||||||
bool is_match = true;
|
bool is_match = true;
|
||||||
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
const int64_t is_match_payload = 1; // for VEC_FIXED set set_payload, always 1
|
||||||
ObDatum datum;
|
ObDatum datum;
|
||||||
for (int64_t batch_i = 0; batch_i < batch_size && OB_SUCC(ret); ++batch_i) {
|
for (int64_t batch_i = bound.start(); batch_i < bound.end() && OB_SUCC(ret); ++batch_i) {
|
||||||
if (skip.at(batch_i)) {
|
if (skip.at(batch_i)) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
@ -1635,15 +1634,14 @@ int ObRFInFilterVecMsg::might_contain_vector(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!is_active_) {
|
if (!is_active_) {
|
||||||
const int64_t batch_size = bound.batch_size();
|
|
||||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||||
VectorFormat res_format = expr.get_format(ctx);
|
VectorFormat res_format = expr.get_format(ctx);
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_not_active(res_vec, skip, batch_size);
|
ret = proc_filter_not_active(res_vec, skip, bound);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_not_active(res_vec, skip, batch_size);
|
ret = proc_filter_not_active(res_vec, skip, bound);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
@ -1651,15 +1649,14 @@ int ObRFInFilterVecMsg::might_contain_vector(
|
|||||||
} else if (OB_UNLIKELY(is_empty_)) {
|
} else if (OB_UNLIKELY(is_empty_)) {
|
||||||
int64_t total_count = 0;
|
int64_t total_count = 0;
|
||||||
int64_t filter_count = 0;
|
int64_t filter_count = 0;
|
||||||
const int64_t batch_size = bound.batch_size();
|
|
||||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||||
VectorFormat res_format = expr.get_format(ctx);
|
VectorFormat res_format = expr.get_format(ctx);
|
||||||
if (VEC_UNIFORM == res_format) {
|
if (VEC_UNIFORM == res_format) {
|
||||||
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
IntegerUniVec *res_vec = static_cast<IntegerUniVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
} else if (VEC_FIXED == res_format) {
|
} else if (VEC_FIXED == res_format) {
|
||||||
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
IntegerFixedVec *res_vec = static_cast<IntegerFixedVec *>(expr.get_vector(ctx));
|
||||||
ret = proc_filter_empty(res_vec, skip, batch_size, total_count, filter_count);
|
ret = proc_filter_empty(res_vec, skip, bound, total_count, filter_count);
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
eval_flags.set_all(true);
|
eval_flags.set_all(true);
|
||||||
|
@ -12,29 +12,35 @@
|
|||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
#include "lib/allocator/page_arena.h"
|
|
||||||
#include "common/object/ob_object.h"
|
|
||||||
#include "lib/container/ob_se_array.h"
|
|
||||||
#include "src/sql/engine/ob_bit_vector.h"
|
#include "src/sql/engine/ob_bit_vector.h"
|
||||||
|
#include "src/sql/ob_eval_bound.h"
|
||||||
|
|
||||||
|
#define private public
|
||||||
|
#define WordType uint64_t
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
namespace sql
|
namespace sql
|
||||||
{
|
{
|
||||||
class ObTestBitVector: public ::testing::Test
|
class ObTestBitVector : public ::testing::Test
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObTestBitVector() {}
|
ObTestBitVector()
|
||||||
~ObTestBitVector() {}
|
{}
|
||||||
virtual void SetUp() {}
|
~ObTestBitVector()
|
||||||
virtual void TearDown() {}
|
{}
|
||||||
private:
|
virtual void SetUp()
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTestBitVector);
|
{}
|
||||||
};
|
virtual void TearDown()
|
||||||
|
{}
|
||||||
|
|
||||||
void expect_range(ObBitVector *dest_bit_vector, int64_t start, int64_t middle, int64_t end) {
|
private:
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ObTestBitVector);
|
||||||
|
};
|
||||||
|
|
||||||
|
void expect_range(ObBitVector *dest_bit_vector, int64_t start, int64_t middle, int64_t end)
|
||||||
|
{
|
||||||
for (int64_t i = 0; i < start; i++) {
|
for (int64_t i = 0; i < start; i++) {
|
||||||
EXPECT_EQ(0, dest_bit_vector->at(i));
|
EXPECT_EQ(0, dest_bit_vector->at(i));
|
||||||
}
|
}
|
||||||
@ -115,6 +121,243 @@ TEST(ObTestBitVector, bit_or_range)
|
|||||||
test_range(dest_bit_vector, src_bit_vector, 64, 127);
|
test_range(dest_bit_vector, src_bit_vector, 64, 127);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy from the previos version ObBitVectorImpl, for check result
|
||||||
|
template <bool IS_FLIP, typename OP>
|
||||||
|
void copied_inner_foreach(const ObBitVectorImpl<WordType> &skip, int64_t size, OP op)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t tmp_step = 0;
|
||||||
|
typedef uint16_t StepType;
|
||||||
|
const int64_t step_size = sizeof(StepType) * CHAR_BIT;
|
||||||
|
int64_t word_cnt = ObBitVectorImpl<WordType>::word_count(size);
|
||||||
|
int64_t step = 0;
|
||||||
|
const int64_t remain = size % ObBitVectorImpl<WordType>::WORD_BITS;
|
||||||
|
for (int64_t i = 0; i < word_cnt && OB_SUCC(ret); ++i) {
|
||||||
|
WordType s_word = (IS_FLIP ? ~skip.data_[i] : skip.data_[i]);
|
||||||
|
// bool all_bits = (false ? skip.data_[i] == 0 : (~skip.data_[i]) == 0);
|
||||||
|
if (i >= word_cnt - 1 && remain > 0) {
|
||||||
|
// all_bits = ((false ? skip.data_[i] : ~skip.data_[i]) & ((1LU << remain) - 1)) == 0;
|
||||||
|
s_word = s_word & ((1LU << remain) - 1);
|
||||||
|
}
|
||||||
|
if (s_word > 0) {
|
||||||
|
WordType tmp_s_word = s_word;
|
||||||
|
tmp_step = step;
|
||||||
|
do {
|
||||||
|
uint16_t step_val = tmp_s_word & 0xFFFF;
|
||||||
|
if (0xFFFF == step_val) {
|
||||||
|
// no skip
|
||||||
|
// last batch ?
|
||||||
|
int64_t mini_cnt = step_size;
|
||||||
|
if (tmp_step + step_size > size) {
|
||||||
|
mini_cnt = size - tmp_step;
|
||||||
|
}
|
||||||
|
for (int64_t j = 0; OB_SUCC(ret) && j < mini_cnt; j++) {
|
||||||
|
int64_t k = j + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
}
|
||||||
|
} else if (step_val > 0) {
|
||||||
|
do {
|
||||||
|
int64_t start_bit_idx = __builtin_ctz(step_val);
|
||||||
|
int64_t k = start_bit_idx + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
step_val &= (step_val - 1);
|
||||||
|
} while (step_val > 0 && OB_SUCC(ret)); // end for, for one step size
|
||||||
|
}
|
||||||
|
tmp_step += step_size;
|
||||||
|
tmp_s_word >>= step_size;
|
||||||
|
} while (tmp_s_word > 0 && OB_SUCC(ret)); // one word-uint64_t
|
||||||
|
}
|
||||||
|
step += ObBitVectorImpl<WordType>::WORD_BITS;
|
||||||
|
} // end for
|
||||||
|
}
|
||||||
|
|
||||||
|
// 这部分代码不要删除,用于调试新接口,因为ob的单测编译要编译一大堆无效文件,而ob_bit_vector.h这个头文件又被很多地方引用,
|
||||||
|
// 导致编译速度巨慢,尽量不要直接在ob_bit_vector.h改代码调试,而是在这里先把接口改正确了,然后再放到ob_bit_vector.h里面
|
||||||
|
// 进行调试
|
||||||
|
template <bool IS_FLIP, typename OP>
|
||||||
|
void my_foreach_bound(const ObBitVectorImpl<WordType> &skip, int64_t start_idx, int64_t end_idx, OP op)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t tmp_step = 0;
|
||||||
|
typedef uint16_t StepType;
|
||||||
|
const int64_t step_size = sizeof(StepType) * CHAR_BIT;
|
||||||
|
|
||||||
|
int64_t start_cnt = start_idx / ObBitVectorImpl<WordType>::WORD_BITS; // start_idx is included
|
||||||
|
const int64_t begin_remain = start_idx % ObBitVectorImpl<WordType>::WORD_BITS;
|
||||||
|
const int64_t begin_mask = (-1LU << begin_remain);
|
||||||
|
|
||||||
|
int64_t end_cnt = ObBitVectorImpl<WordType>::word_count(end_idx); // end_idx is not included
|
||||||
|
const int64_t end_remain = end_idx % ObBitVectorImpl<WordType>::WORD_BITS;
|
||||||
|
const int64_t end_mask = (1LU << end_remain) - 1;
|
||||||
|
|
||||||
|
int64_t step = ObBitVectorImpl<WordType>::WORD_BITS * start_cnt;
|
||||||
|
for (int64_t i = start_cnt; i < end_cnt && OB_SUCC(ret); ++i) {
|
||||||
|
WordType s_word = (IS_FLIP ? ~skip.data_[i] : skip.data_[i]);
|
||||||
|
if (start_cnt == end_cnt - 1) {
|
||||||
|
// if only one word, both begin_mask and end_mask should be used
|
||||||
|
if (begin_remain > 0) {
|
||||||
|
s_word = s_word & begin_mask;
|
||||||
|
}
|
||||||
|
if (end_remain > 0) {
|
||||||
|
s_word = s_word & end_mask;
|
||||||
|
}
|
||||||
|
} else if (i == start_cnt && begin_remain > 0) {
|
||||||
|
// add begin_mask for first word, remove the bit less than start_idx
|
||||||
|
s_word = s_word & begin_mask;
|
||||||
|
} else if (i == end_cnt - 1 && end_remain > 0) {
|
||||||
|
// add end_mask for last word, remove the bit greater equal than end_idx
|
||||||
|
s_word = s_word & end_mask;
|
||||||
|
}
|
||||||
|
if (s_word > 0) {
|
||||||
|
WordType tmp_s_word = s_word;
|
||||||
|
tmp_step = step;
|
||||||
|
do {
|
||||||
|
uint16_t step_val = tmp_s_word & 0xFFFF;
|
||||||
|
if (0xFFFF == step_val) {
|
||||||
|
for (int64_t j = 0; OB_SUCC(ret) && j < step_size; j++) {
|
||||||
|
int64_t k = j + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
}
|
||||||
|
} else if (step_val > 0) {
|
||||||
|
do {
|
||||||
|
int64_t start_bit_idx = __builtin_ctz(step_val);
|
||||||
|
int64_t k = start_bit_idx + tmp_step;
|
||||||
|
ret = op(k);
|
||||||
|
step_val &= (step_val - 1);
|
||||||
|
} while (step_val > 0 && OB_SUCC(ret)); // end for, for one step size
|
||||||
|
}
|
||||||
|
tmp_step += step_size;
|
||||||
|
tmp_s_word >>= step_size;
|
||||||
|
} while (tmp_s_word > 0 && OB_SUCC(ret)); // one word-uint64_t
|
||||||
|
}
|
||||||
|
step += ObBitVectorImpl<WordType>::ObBitVectorImpl<WordType>::WORD_BITS;
|
||||||
|
} // end for
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_foreach_result_random(int64_t batch_size, int64_t start_idx, int64_t end_idx)
|
||||||
|
{
|
||||||
|
void *buf = malloc(batch_size);
|
||||||
|
ObBitVector *bit_vector = to_bit_vector(buf);
|
||||||
|
bit_vector->init(batch_size);
|
||||||
|
|
||||||
|
int64_t true_start_idx = common::ObRandom::rand(0, batch_size);
|
||||||
|
int64_t true_end_idx = common::ObRandom::rand(0, batch_size);
|
||||||
|
if (true_start_idx > true_end_idx) {
|
||||||
|
swap(true_start_idx, true_end_idx);
|
||||||
|
}
|
||||||
|
|
||||||
|
bit_vector->set_all(true_start_idx, true_end_idx);
|
||||||
|
EvalBound bound(batch_size, start_idx, end_idx, false);
|
||||||
|
|
||||||
|
// cout << "start_idx: " << start_idx << "\nend_idx: " << end_idx
|
||||||
|
// << "\ntrue_start_idx: " << true_start_idx << "\ntrue_end_idx: " << true_end_idx << endl;
|
||||||
|
|
||||||
|
// test foreach
|
||||||
|
std::vector<int> result_foreach_ori(batch_size, 0);
|
||||||
|
std::vector<int> result_foreach_batch(batch_size, 0);
|
||||||
|
std::vector<int> result_foreach_bound(batch_size, 0);
|
||||||
|
copied_inner_foreach<false>(*bit_vector, end_idx, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_foreach_ori[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
ObBitVector::foreach (*bit_vector, end_idx, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_foreach_batch[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
ObBitVector::foreach (*bit_vector, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_foreach_bound[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
// test flip_foreach
|
||||||
|
std::vector<int> result_flip_foreach_ori(batch_size, 0);
|
||||||
|
std::vector<int> result_flip_foreach_batch(batch_size, 0);
|
||||||
|
std::vector<int> result_flip_foreach_bound(batch_size, 0);
|
||||||
|
copied_inner_foreach<true>(*bit_vector, end_idx, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_flip_foreach_ori[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
ObBitVector::flip_foreach(*bit_vector, end_idx, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_flip_foreach_batch[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
ObBitVector::flip_foreach(*bit_vector, bound, [&](int64_t idx) __attribute__((always_inline)) {
|
||||||
|
result_flip_foreach_bound[idx] = 1;
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
// result结果,0表示未处理,1表示处理
|
||||||
|
for (int64_t i = 0; i < batch_size; ++i) {
|
||||||
|
// 固定check新的batch接口是否和老的batch接口结果是否相同
|
||||||
|
EXPECT_EQ(result_foreach_ori[i], result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(result_flip_foreach_ori[i], result_flip_foreach_batch[i]);
|
||||||
|
|
||||||
|
// 1. 对于 i < start_idx 部分, bound接口不会处理,只有batch接口和copied接口会处理
|
||||||
|
// 2. 对于 start_idx <= i < end_idx 部分, 所有接口都会处理
|
||||||
|
// 3. 对于 i >= end_idx 部分, 所有接口都不会处理
|
||||||
|
if (i < start_idx) {
|
||||||
|
if (i < true_start_idx) {
|
||||||
|
// 此部分 bit vector 为 0,因此 foreach 结果为 0, flip foreach 结果为 1
|
||||||
|
EXPECT_EQ(0, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_batch[i]);
|
||||||
|
} else if (i >= true_start_idx && i < true_end_idx) {
|
||||||
|
// 此部分 bit vector 为 1,因此 foreach 结果为 1, flip foreach 结果为 0
|
||||||
|
EXPECT_EQ(1, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_batch[i]);
|
||||||
|
} else if (i >= true_end_idx) {
|
||||||
|
// 此部分 bit vector 为 0,因此 foreach 结果为 0, flip foreach 结果为 1
|
||||||
|
EXPECT_EQ(0, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_batch[i]);
|
||||||
|
}
|
||||||
|
// bound接口不会处理这部分数据,因此全部结果为 0
|
||||||
|
EXPECT_EQ(0, result_foreach_bound[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_bound[i]);
|
||||||
|
} else if (i >= start_idx && i < end_idx) {
|
||||||
|
if (i < true_start_idx) {
|
||||||
|
// 此部分 bit vector 为 0,因此 foreach 结果为 0, flip foreach 结果为 1
|
||||||
|
EXPECT_EQ(0, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_foreach_bound[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_bound[i]);
|
||||||
|
} else if (i >= true_start_idx && i < true_end_idx) {
|
||||||
|
// 此部分 bit vector 为 1,因此 foreach 结果为 1, flip foreach 结果为 0
|
||||||
|
EXPECT_EQ(1, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(1, result_foreach_bound[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_bound[i]);
|
||||||
|
} else if (i >= true_end_idx) {
|
||||||
|
// 此部分 bit vector 为 0,因此 foreach 结果为 0, flip foreach 结果为 1
|
||||||
|
EXPECT_EQ(0, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_foreach_bound[i]);
|
||||||
|
EXPECT_EQ(1, result_flip_foreach_bound[i]);
|
||||||
|
}
|
||||||
|
} else if (i >= end_idx) {
|
||||||
|
// 所有接口不会处理这部分数据,因此全部结果为 0
|
||||||
|
EXPECT_EQ(0, result_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_batch[i]);
|
||||||
|
EXPECT_EQ(0, result_foreach_bound[i]);
|
||||||
|
EXPECT_EQ(0, result_flip_foreach_bound[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(ObTestBitVector, test_foreach)
|
||||||
|
{
|
||||||
|
int64_t batch_size = common::ObRandom::rand(0, 1024);
|
||||||
|
int64_t round = 100;
|
||||||
|
for (int64_t i = 0; i < round; ++i) {
|
||||||
|
int64_t start_idx = common::ObRandom::rand(0, batch_size);
|
||||||
|
int64_t end_idx = common::ObRandom::rand(0, batch_size);
|
||||||
|
if (start_idx > end_idx) {
|
||||||
|
swap(start_idx, end_idx);
|
||||||
|
}
|
||||||
|
test_foreach_result_random(batch_size, start_idx, end_idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user