[FEAT MERGE] impl vectorization 2.0
Co-authored-by: oceanoverflow <oceanoverflow@gmail.com> Co-authored-by: hezuojiao <hezuojiao@gmail.com> Co-authored-by: Monk-Liu <1152761042@qq.com>
This commit is contained in:
@ -13,7 +13,6 @@
|
||||
#define USING_LOG_PREFIX SQL_ENG
|
||||
#include "sql/engine/expr/ob_expr_case.h"
|
||||
#include "sql/engine/expr/ob_expr_operator.h"
|
||||
//#include "sql/engine/expr/ob_expr_promotion_util.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "sql/engine/ob_exec_context.h"
|
||||
|
||||
@ -139,6 +138,7 @@ int ObExprCase::cg_expr(ObExprCGCtx &op_cg_ctx,
|
||||
if (OB_SUCC(ret)) {
|
||||
rt_expr.eval_func_ = calc_case_expr;
|
||||
rt_expr.eval_batch_func_ = eval_case_batch;
|
||||
rt_expr.eval_vector_func_ = eval_case_vector;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -155,7 +155,6 @@ static int check_is_match(const ObDatum &when_datum, bool &match_when)
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObExprCase::calc_case_expr(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res_datum)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -247,6 +246,8 @@ int ObExprCase::eval_case_batch(const ObExpr &expr,
|
||||
if (OB_ISNULL(results)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("results frame is not init", K(ret));
|
||||
} else if (batch_size <= 0) {
|
||||
// do nothing
|
||||
} else {
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
ObBitVector *case_when_match = nullptr;
|
||||
@ -256,10 +257,10 @@ int ObExprCase::eval_case_batch(const ObExpr &expr,
|
||||
ObEvalCtx::TempAllocGuard alloc_guard(ctx);
|
||||
if (OB_ISNULL(data = alloc_guard.get_allocator().alloc(ObBitVector::memory_size(batch_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory for case_when_match", K(ret));
|
||||
LOG_WARN("failed to alloc memory for case_when_match", K(ret), K(batch_size));
|
||||
} else if (OB_ISNULL(data1 = alloc_guard.get_allocator().alloc(ObBitVector::memory_size(batch_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory for case_when_match", K(ret));
|
||||
LOG_WARN("failed to alloc memory for case_when_match", K(ret), K(batch_size));
|
||||
} else {
|
||||
case_when_match = to_bit_vector(data);
|
||||
case_not_match = to_bit_vector(data1);
|
||||
@ -351,6 +352,269 @@ int ObExprCase::eval_case_batch(const ObExpr &expr,
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename WhenVec>
|
||||
static int check_when_is_match(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
ObBitVector &case_when_match, const EvalBound &bound,
|
||||
const int64_t &when_expr_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WhenVec *when_vec = static_cast<WhenVec *>(expr.args_[when_expr_idx]->get_vector(ctx));
|
||||
for (int64_t j = bound.start(); OB_SUCC(ret) && j < bound.end(); ++j) {
|
||||
if (case_when_match.at(j)) {
|
||||
continue;
|
||||
}
|
||||
if (!when_vec->is_null(j) && when_vec->get_int(j) != 0) {
|
||||
case_when_match.set(j);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int dispatch_check_when_is_match(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
ObBitVector &case_when_match, const EvalBound &bound,
|
||||
const int64_t &when_expr_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
VectorFormat when_format = expr.args_[when_expr_idx]->get_format(ctx);
|
||||
switch (when_format) {
|
||||
case VEC_FIXED: {
|
||||
ret = check_when_is_match<IntegerFixedVec>(expr, ctx, case_when_match, bound, when_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM: {
|
||||
ret = check_when_is_match<IntegerUniVec>(expr, ctx, case_when_match, bound, when_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM_CONST: {
|
||||
ret = check_when_is_match<IntegerUniCVec>(expr, ctx, case_when_match, bound, when_expr_idx);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = check_when_is_match<ObVectorBase>(expr, ctx, case_when_match, bound, when_expr_idx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename ThenVec, typename ResVec>
|
||||
static int eval_match_then(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
ObBitVector &case_when_match, const EvalBound &bound,
|
||||
const int64_t &then_expr_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
ThenVec *then_vec = static_cast<ThenVec *>(expr.args_[then_expr_idx]->get_vector(ctx));
|
||||
ResVec *res_vec = static_cast<ResVec *>(expr.get_vector(ctx));
|
||||
for (int64_t j = bound.start(); OB_SUCC(ret) && j < bound.end(); ++j) {
|
||||
if (case_when_match.at(j)) {
|
||||
continue;
|
||||
}
|
||||
if (then_vec->is_null(j)) {
|
||||
res_vec->set_null(j);
|
||||
} else {
|
||||
res_vec->set_payload_shallow(j, then_vec->get_payload(j), then_vec->get_length(j));
|
||||
}
|
||||
eval_flags.set(j);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename ResVec>
|
||||
static int dispatch_eval_match_then(const ObExpr &expr, ObEvalCtx &ctx,
|
||||
ObBitVector &case_when_match, const EvalBound &bound,
|
||||
const int64_t &then_expr_idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
VectorFormat then_format = expr.args_[then_expr_idx]->get_format(ctx);
|
||||
switch (then_format) {
|
||||
case VEC_FIXED: {
|
||||
ret = eval_match_then<ObFixedLengthBase, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_DISCRETE: {
|
||||
ret = eval_match_then<ObDiscreteFormat, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_CONTINUOUS: {
|
||||
ret = eval_match_then<ObContinuousFormat, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM: {
|
||||
ret = eval_match_then<ObUniformFormat<false>, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM_CONST: {
|
||||
ret = eval_match_then<ObUniformFormat<true>, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = eval_match_then<ObVectorBase, ResVec>(expr, ctx, case_when_match, bound, then_expr_idx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename ResVec>
|
||||
static int inner_eval_case_vector(const ObExpr &expr,
|
||||
ObEvalCtx &ctx,
|
||||
const ObBitVector &skip,
|
||||
const EvalBound &bound)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (bound.end() > bound.start()) {
|
||||
const bool has_else = (expr.arg_cnt_ % 2 != 0);
|
||||
int64_t loop = (has_else) ? expr.arg_cnt_ - 1 : expr.arg_cnt_;
|
||||
LOG_DEBUG("eval_case_vector", K(expr.arg_cnt_));
|
||||
// Record the number of skips in a batch,
|
||||
// end when all parameters are skipped.
|
||||
const int64_t total_cnt = bound.end() - bound.start();
|
||||
int64_t skip_cnt = 0;
|
||||
ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
|
||||
// After verification, it is necessary to adopt the double ObBitVector scheme.
|
||||
// The single ObBitVector scheme can ensure correctness,
|
||||
// but the then_expression will result in redundant calculations.
|
||||
ObBitVector *case_when_match = &expr.get_pvt_skip(ctx);
|
||||
// The relevant implementation of "before_case_when_match" may seem a bit hacky,
|
||||
// but ObBitVector ensures that it only operates on data within bounds, so it is safe.
|
||||
ObBitVector *before_case_when_match = nullptr;
|
||||
// Remember to set all_row_active of bound to false when handling skip.
|
||||
// Otherwise, it may cause issues if all_row_active is used for specialized calculations elsewhere.
|
||||
EvalBound my_bound(bound.batch_size(), bound.start(), bound.end(), bound.get_all_rows_active());
|
||||
void *data_tmp = nullptr;
|
||||
ObEvalCtx::TempAllocGuard alloc_guard(ctx);
|
||||
// For single-line scenarios,
|
||||
// the additional allocated memory needs to be as small as possible in order to optimize performance.
|
||||
// The following approach consumes at most ObBitVector::BYTES_PER_WORD more byte of memory
|
||||
// compared to the ideal scenario,
|
||||
// but subsequent bitwise operations do not require shifting,
|
||||
// resulting in performance optimization and easier implementation.
|
||||
// The unit is byte.
|
||||
int64_t mem_size = (((my_bound.end() - 1) / ObBitVector::WORD_BITS) -
|
||||
(my_bound.start() / ObBitVector::WORD_BITS) + 1) * ObBitVector::BYTES_PER_WORD;
|
||||
if (OB_ISNULL(data_tmp = alloc_guard.get_allocator().alloc(mem_size))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory for before_case_when_match", K(ret), K(mem_size));
|
||||
} else {
|
||||
before_case_when_match = (ObBitVector *)((uint64_t *)data_tmp -
|
||||
(my_bound.start() / ObBitVector::WORD_BITS));
|
||||
// There is no need to call reset for initialization here,
|
||||
// because the subsequent bit_calculate will override the original values of ObBitVector.
|
||||
// case_when_match = eval_flags | skip
|
||||
case_when_match->bit_calculate(skip, eval_flags, my_bound,
|
||||
[](const uint64_t l, const uint64_t r) { return (l | r); });
|
||||
skip_cnt = case_when_match->accumulate_bit_cnt(my_bound);
|
||||
before_case_when_match->deep_copy(*case_when_match, my_bound.start(), my_bound.end());
|
||||
}
|
||||
|
||||
// E.G
|
||||
// SELECT CASE WHEN expr1 THEN expr2 WHEN expr3 THEN expr4 ... ELSE exprN END
|
||||
// the logic is
|
||||
// 1. calc when branch, save result in when_vec and use match_when flag
|
||||
// to mark which rows are matched in when branch and these rows should be
|
||||
// calculated in then branch
|
||||
// 2. calc then branch, put matching result(then_vec) into output vec
|
||||
// (result_vec)
|
||||
// REPEAT 1. and 2.
|
||||
// ...
|
||||
// LAST.
|
||||
// calc else branch and put matching result(else_vec) into output vec(result_vec)
|
||||
for (int64_t expr_idx = 0; OB_SUCC(ret) && skip_cnt < total_cnt && expr_idx < loop; expr_idx += 2) {
|
||||
if (my_bound.get_all_rows_active() && skip_cnt != 0) {
|
||||
my_bound.set_all_row_active(false);
|
||||
}
|
||||
if (OB_FAIL(expr.args_[expr_idx]->eval_vector(ctx, *case_when_match, my_bound))) {
|
||||
LOG_WARN("failed to eval vector", K(ret), K(expr_idx));
|
||||
// first eval when datums
|
||||
} else if (OB_FAIL(dispatch_check_when_is_match(expr, ctx, *case_when_match, my_bound, expr_idx))) {
|
||||
LOG_WARN("failed to dispatch_check_is_match", K(ret), K(expr_idx));
|
||||
} else { // now eval then datums
|
||||
// Reverse case_when_match for use with case_not_match.
|
||||
case_when_match->bit_calculate(*case_when_match, *before_case_when_match, my_bound,
|
||||
[](const uint64_t self, const uint64_t other) { return ((~self) | other); });
|
||||
skip_cnt = case_when_match->accumulate_bit_cnt(my_bound);
|
||||
if (skip_cnt < total_cnt) {
|
||||
if (my_bound.get_all_rows_active() && skip_cnt != 0) {
|
||||
my_bound.set_all_row_active(false);
|
||||
}
|
||||
if (OB_FAIL(expr.args_[expr_idx + 1]->eval_vector(ctx, *case_when_match, my_bound))) {
|
||||
LOG_WARN("failed to eval vector", K(ret), K(expr_idx + 1));
|
||||
} else if (OB_FAIL(dispatch_eval_match_then<ResVec>(expr, ctx, *case_when_match, my_bound, expr_idx + 1))) {
|
||||
LOG_WARN("failed to dispatch_eval_match_then", K(ret), K(expr_idx + 1));
|
||||
}
|
||||
}
|
||||
// Reverse case_when_match to be used again as case_when_match.
|
||||
case_when_match->bit_calculate(*case_when_match, *before_case_when_match, my_bound,
|
||||
[](const uint64_t self, const uint64_t other) { return ((~self) | other); });
|
||||
skip_cnt = case_when_match->accumulate_bit_cnt(my_bound);
|
||||
if (expr_idx + 2 < loop) {
|
||||
// rows matched in this round should not match in next round, therefor,
|
||||
// copy last round matched rows flag(case_when_match) into before_case_when_match
|
||||
before_case_when_match->deep_copy(*case_when_match, my_bound.start(), my_bound.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
// now set the result of the rest, skip rows already matched (case_when_match)
|
||||
if (OB_SUCC(ret) && skip_cnt < total_cnt) {
|
||||
if (has_else) {
|
||||
if (my_bound.get_all_rows_active() && skip_cnt != 0) {
|
||||
my_bound.set_all_row_active(false);
|
||||
}
|
||||
if (OB_FAIL(expr.args_[expr.arg_cnt_ - 1]->eval_vector(ctx, *case_when_match, my_bound))) {
|
||||
LOG_WARN("failed to eval batch", K(ret));
|
||||
// The calculation method of "else" is consistent with "then".
|
||||
} else if (OB_FAIL(dispatch_eval_match_then<ResVec>(expr, ctx, *case_when_match, my_bound, expr.arg_cnt_ - 1))) {
|
||||
LOG_WARN("failed to dispatch_eval_match_else", K(ret), K(expr.arg_cnt_ - 1));
|
||||
}
|
||||
} else {
|
||||
for (int64_t j = my_bound.start(); OB_SUCC(ret) && j < my_bound.end(); ++j) {
|
||||
if (case_when_match->at(j)) {
|
||||
continue;
|
||||
}
|
||||
ResVec *res_vec = static_cast<ResVec *>(expr.get_vector(ctx));
|
||||
res_vec->set_null(j);
|
||||
eval_flags.set(j);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExprCase::eval_case_vector(const ObExpr &expr,
|
||||
ObEvalCtx &ctx,
|
||||
const ObBitVector &skip,
|
||||
const EvalBound &bound)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
VectorFormat res_format = expr.get_format(ctx);
|
||||
switch (res_format) {
|
||||
case VEC_DISCRETE: {
|
||||
ret = inner_eval_case_vector<ObDiscreteFormat>(expr, ctx, skip, bound);
|
||||
break;
|
||||
}
|
||||
case VEC_CONTINUOUS: {
|
||||
ret = inner_eval_case_vector<ObContinuousFormat>(expr, ctx, skip, bound);
|
||||
break;
|
||||
}
|
||||
case VEC_FIXED: {
|
||||
ret = inner_eval_case_vector<ObFixedLengthBase>(expr, ctx, skip, bound);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM: {
|
||||
ret = inner_eval_case_vector<ObUniformFormat<false>>(expr, ctx, skip, bound);
|
||||
break;
|
||||
}
|
||||
case VEC_UNIFORM_CONST: {
|
||||
ret = inner_eval_case_vector<ObUniformFormat<true>>(expr, ctx, skip, bound);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = inner_eval_case_vector<ObVectorBase>(expr, ctx, skip, bound);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
DEF_SET_LOCAL_SESSION_VARS(ObExprCase, raw_expr) {
|
||||
int ret = OB_SUCCESS;
|
||||
SET_LOCAL_SYSVAR_CAPACITY(1);
|
||||
|
||||
Reference in New Issue
Block a user