Adapt the new vec2.0 interface for single row calculation

This commit is contained in:
DengzhiLiu
2024-02-08 21:55:56 +00:00
committed by ob-robot
parent d3da1d9ffc
commit 3154082c61
15 changed files with 162 additions and 33 deletions

View File

@ -1651,7 +1651,7 @@ ObPhysicalFilterExecutor::~ObPhysicalFilterExecutor()
}
}
int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, bool &filtered)
int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, const sql::ObBitVector &skip_bit, bool &filtered)
{
int ret = OB_SUCCESS;
const common::ObIArray<ObExpr *> *column_exprs = get_cg_col_exprs();
@ -1666,7 +1666,7 @@ int ObPhysicalFilterExecutor::filter(blocksstable::ObStorageDatum *datums, int64
LOG_WARN("Failed to convert object from datum", K(ret), K(datums[i]));
}
}
if (OB_SUCC(ret) && OB_FAIL(filter(eval_ctx, filtered))) {
if (OB_SUCC(ret) && OB_FAIL(filter(eval_ctx, skip_bit, filtered))) {
LOG_WARN("Failed to calc filter", K(ret));
}
}
@ -1875,15 +1875,26 @@ bool ObWhiteFilterExecutor::is_cmp_op_with_null_ref_value() const
return is_cmp_op && has_single_null_ref_value;
}
int ObWhiteFilterExecutor::filter(ObEvalCtx &eval_ctx, bool &filtered)
int ObWhiteFilterExecutor::filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered)
{
int ret = OB_SUCCESS;
filtered = false;
ObDatum *cmp_res = nullptr;
if (OB_FAIL(filter_.expr_->eval(eval_ctx, cmp_res))) {
LOG_WARN("Failed to eval", K(ret));
if (!op_.enable_rich_format_) {
ObDatum *cmp_res = nullptr;
if (OB_FAIL(filter_.expr_->eval(eval_ctx, cmp_res))) {
LOG_WARN("Failed to eval", K(ret));
} else {
filtered = is_row_filtered(*cmp_res);
}
} else {
filtered = is_row_filtered(*cmp_res);
const int64_t batch_idx = eval_ctx.get_batch_idx();
EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false);
if (OB_FAIL(filter_.expr_->eval_vector(eval_ctx, skip_bit, eval_bound))) {
LOG_WARN("Failed to eval vector", K(ret));
} else {
ObIVector *res = filter_.expr_->get_vector(eval_ctx);
filtered = !res->is_true(batch_idx);
}
}
if (op_.is_vectorized()) {
@ -1902,16 +1913,30 @@ ObBlackFilterExecutor::~ObBlackFilterExecutor()
}
}
int ObBlackFilterExecutor::filter(ObEvalCtx &eval_ctx, bool &filtered)
int ObBlackFilterExecutor::filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered)
{
int ret = OB_SUCCESS;
filtered = false;
ObDatum *cmp_res = nullptr;
FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) {
if (OB_FAIL((*e)->eval(eval_ctx, cmp_res))) {
LOG_WARN("failed to filter child", K(ret));
} else {
filtered = is_row_filtered(*cmp_res);
const bool enable_rich_format = op_.enable_rich_format_;
if (!enable_rich_format) {
ObDatum *cmp_res = nullptr;
FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) {
if (OB_FAIL((*e)->eval(eval_ctx, cmp_res))) {
LOG_WARN("failed to filter child", K(ret));
} else {
filtered = is_row_filtered(*cmp_res);
}
}
} else {
const int64_t batch_idx = eval_ctx.get_batch_idx();
EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false);
FOREACH_CNT_X(e, filter_.filter_exprs_, OB_SUCC(ret) && !filtered) {
if (OB_FAIL((*e)->eval_vector(eval_ctx, skip_bit, eval_bound))) {
LOG_WARN("Failed to evaluate vector", K(ret));
} else {
ObIVector *res = (*e)->get_vector(eval_ctx);
filtered = !res->is_true(batch_idx);
}
}
}
@ -2564,6 +2589,10 @@ void PushdownFilterInfo::reset()
allocator_->free(ref_bitmap_);
ref_bitmap_ = nullptr;
}
if (nullptr != skip_bit_) {
allocator_->free(skip_bit_);
skip_bit_ = nullptr;
}
allocator_ = nullptr;
}
filter_ = nullptr;
@ -2614,7 +2643,8 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common
col_capacity_ = out_col_cnt;
}
if (OB_SUCC(ret) && (iter_param.vectorized_enabled_ || iter_param.enable_pd_aggregate())) {
if (OB_SUCC(ret) && (iter_param.vectorized_enabled_ || iter_param.enable_pd_aggregate() ||
(nullptr != iter_param.op_ && iter_param.op_->enable_rich_format_))) {
batch_size_ = iter_param.vectorized_enabled_ ? iter_param.op_->get_batch_size() : storage::AGGREGATE_STORE_BATCH_SIZE;
if (OB_FAIL(col_datum_buf_.init(batch_size_, alloc))) {
LOG_WARN("fail to init tmp col datum buf", K(ret));
@ -2622,6 +2652,9 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc cell data ptr", K(ret), K(batch_size_));
} else if (FALSE_IT(cell_data_ptrs_ = reinterpret_cast<const char **>(buf))) {
} else if (OB_ISNULL(skip_bit_ = to_bit_vector(alloc.alloc(ObBitVector::memory_size(batch_size_))))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Failed to alloc skip bit", K(ret), K_(batch_size));
} else if (OB_ISNULL(buf = alloc.alloc(sizeof(int64_t) * batch_size_))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc row_ids", K(ret), K(batch_size_));
@ -2629,6 +2662,7 @@ int PushdownFilterInfo::init(const storage::ObTableIterParam &iter_param, common
ret = common::OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc len_array_buf", K(ret), K_(batch_size));
} else {
skip_bit_->init(batch_size_);
row_ids_ = reinterpret_cast<int64_t *>(buf);
len_array_ = reinterpret_cast<uint32_t *>(len_array_buf);
}

View File

@ -711,9 +711,9 @@ public:
datum_eval_flags_(nullptr)
{}
~ObPhysicalFilterExecutor();
int filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, bool &ret_val);
int filter(blocksstable::ObStorageDatum *datums, int64_t col_cnt, const sql::ObBitVector &skip_bit, bool &ret_val);
virtual int init_evaluated_datums() override;
virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) = 0;
virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) = 0;
INHERIT_TO_STRING_KV("ObPhysicalFilterExecutor", ObPushdownFilterExecutor,
K_(n_eval_infos), KP_(eval_infos));
protected:
@ -750,7 +750,7 @@ public:
int get_datums_from_column(common::ObIArray<blocksstable::ObSqlDatumInfo> &datum_infos);
INHERIT_TO_STRING_KV("ObPushdownBlackFilterExecutor", ObPhysicalFilterExecutor,
K_(filter), KP_(skip_bit));
virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) override;
virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) override;
private:
int eval_exprs_batch(ObBitVector &skip, const int64_t bsize);
@ -792,7 +792,7 @@ public:
INHERIT_TO_STRING_KV("ObPushdownWhiteFilterExecutor", ObPushdownFilterExecutor,
K_(null_param_contained), K_(datum_params), K(param_set_.created()),
K_(filter));
virtual int filter(ObEvalCtx &eval_ctx, bool &filtered) override;
virtual int filter(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit, bool &filtered) override;
protected:
void check_null_params();
int init_obj_set();
@ -971,7 +971,7 @@ public:
OB_INLINE int64_t get_batch_size() const { return expr_spec_.max_batch_size_; }
// filter row for storage callback.
// clear expression evaluated flag if row filtered.
OB_INLINE int filter_row_outside(const ObExprPtrIArray &exprs, bool &filtered);
OB_INLINE int filter_row_outside(const ObExprPtrIArray &exprs, const sql::ObBitVector &skip_bit, bool &filtered);
// Notice:
// clear one/current datum eval flag at a time, do NOT call it
// unless fully understand this API.
@ -990,10 +990,14 @@ public:
};
// filter row for storage callback.
OB_INLINE int ObPushdownOperator::filter_row_outside(const ObExprPtrIArray &exprs, bool &filtered)
OB_INLINE int ObPushdownOperator::filter_row_outside(const ObExprPtrIArray &exprs, const sql::ObBitVector &skip_bit, bool &filtered)
{
int ret = common::OB_SUCCESS;
ret = ObOperator::filter_row(eval_ctx_, exprs, filtered);
if (!enable_rich_format_) {
ret = ObOperator::filter_row(eval_ctx_, exprs, filtered);
} else {
ret = ObOperator::filter_row_vector(eval_ctx_, exprs, skip_bit, filtered);
}
// always clear evaluated flag, because filter expr and table scan output expr may have
// common expr, when eval filter expr, memory of dependence column may from storage,
// if not filter and we don't clear eval flag, output expr will used the result datum
@ -1030,6 +1034,7 @@ struct PushdownFilterInfo
row_ids_(nullptr),
len_array_(nullptr),
ref_bitmap_(nullptr),
skip_bit_(nullptr),
col_datum_buf_(),
allocator_(nullptr),
param_(nullptr),
@ -1045,7 +1050,8 @@ struct PushdownFilterInfo
ret = ret && (nullptr != datum_buf_);
}
if (0 < batch_size_) {
ret = ret && (nullptr != cell_data_ptrs_) && (nullptr != row_ids_);
ret = ret && (nullptr != cell_data_ptrs_) && (nullptr != row_ids_)
&& (nullptr != skip_bit_);
}
return ret;
}
@ -1068,7 +1074,7 @@ struct PushdownFilterInfo
TO_STRING_KV(K_(is_pd_filter), K_(is_pd_to_cg), K_(start), K_(count), K_(col_capacity), K_(batch_size),
KP_(datum_buf), KP_(filter), KP_(cell_data_ptrs), KP_(row_ids), KP_(ref_bitmap),
K_(col_datum_buf), KP_(param), KP_(context));
K_(col_datum_buf), KP_(param), KP_(context), KP_(skip_bit));
bool is_inited_;
bool is_pd_filter_;
@ -1084,6 +1090,7 @@ struct PushdownFilterInfo
int64_t *row_ids_;
uint32_t *len_array_;
common::ObBitmap *ref_bitmap_;
sql::ObBitVector *skip_bit_;
mutable TmpColDatumBuf col_datum_buf_;
common::ObIAllocator *allocator_;
const storage::ObTableIterParam *param_;

View File

@ -228,6 +228,7 @@ struct ObDoArithFixedVectorEval
for (int64_t idx = bound.start(); idx < bound.end(); ++idx) {
ArithOp::raw_op(res_arr[idx], left_arr[idx], right_arr[idx], args...);
}
res_vec->get_nulls()->unset_all(bound.start(), bound.end());
if (expr.may_not_need_raw_check_ && ob_is_int_less_than_64(expr.args_[0]->datum_meta_.type_)
&& ob_is_int_less_than_64(expr.args_[1]->datum_meta_.type_)) {
// do nothing
@ -273,6 +274,7 @@ struct ObDoArithFixedConstVectorEval
for (int64_t idx = bound.start(); idx < bound.end(); ++idx) {
ArithOp::raw_op(res_arr[idx], left_arr[idx], *right_val, args...);
}
res_vec->get_nulls()->unset_all(bound.start(), bound.end());
if (expr.may_not_need_raw_check_ && ob_is_int_less_than_64(expr.args_[0]->datum_meta_.type_)
&& INT_MIN < *right_val < INT_MAX) {
// do nothing
@ -318,6 +320,7 @@ struct ObDoArithConstFixedVectorEval
for (int64_t idx = bound.start(); idx < bound.end(); ++idx) {
ArithOp::raw_op(res_arr[idx], *left_val, right_arr[idx], args...);
}
res_vec->get_nulls()->unset_all(bound.start(), bound.end());
if (expr.may_not_need_raw_check_ && INT_MIN < *left_val < INT_MAX
&& ob_is_int_less_than_64(expr.args_[1]->datum_meta_.type_)) {
} else {
@ -394,11 +397,18 @@ inline int ObDoNumberVectorEval(VECTOR_EVAL_FUNC_ARG_DECL, const bool right_eval
for (int idx = bound.start(); OB_SUCC(ret) && idx < bound.end(); idx++) {
ret = nmb_eval_op(idx, left_vec, right_vec, res_vec, nmb_fast_op, local_alloc);
}
if (std::is_same<ResVector, ObDiscreteFormat>::value) {
reinterpret_cast<ObDiscreteFormat *>(res_vec)->get_nulls()->unset_all(bound.start(),
bound.end());
}
} else {
for (int idx = bound.start(); OB_SUCC(ret) && idx < bound.end(); idx++) {
if (left_vec->is_null(idx) || right_vec->is_null(idx)) {
res_vec->set_null(idx);
} else {
if (std::is_same<ResVector, ObDiscreteFormat>::value) {
res_vec->unset_null(idx);
}
ret = nmb_eval_op(idx, left_vec, right_vec, res_vec, nmb_fast_op, local_alloc);
}
}
@ -806,6 +816,7 @@ struct ObVectorArithOpWrap : public Base
if (left_vec.is_null(idx) || right_vec.is_null(idx)) {
res_vec.set_null(idx);
} else {
res_vec.unset_null(idx);
ret = ObVectorArithOpWrap()(res_vec, left_vec, right_vec, idx, args...);
}
return ret;

View File

@ -130,6 +130,8 @@ public:
inline void set_all(const int64_t start_idx, const int64_t end_idx);
inline void unset_all(const int64_t start_idx, const int64_t end_idx);
inline bool is_all_true(const int64_t start_idx, const int64_t end_idx) const;
inline void deep_copy(const ObBitVectorImpl<WordType> &src, const int64_t start_idx, const int64_t end_idx);
@ -555,6 +557,27 @@ inline void ObBitVectorImpl<WordType>::set_all(const int64_t start_idx, const in
}
}
template<typename WordType>
inline void ObBitVectorImpl<WordType>::unset_all(const int64_t start_idx, const int64_t end_idx)
{
int64_t start_cnt = 0;
int64_t end_cnt = 0;
WordType start_mask = 0;
WordType end_mask = 0;
get_start_end_mask(start_idx, end_idx, start_mask, end_mask, start_cnt, end_cnt);
if (start_cnt == end_cnt) {
data_[start_cnt] &= (~(start_mask & end_mask));
} else {
data_[start_cnt] &= (~start_mask);
MEMSET(data_ + start_cnt + 1, 0x0, (end_cnt - start_cnt - 1) * BYTES_PER_WORD);
if (end_mask > 0) {
data_[end_cnt] &= (~end_mask);
}
}
}
template<typename WordType>
inline void ObBitVectorImpl<WordType>::deep_copy(const ObBitVectorImpl<WordType> &src, const int64_t start_idx,
const int64_t end_idx)

View File

@ -1433,6 +1433,26 @@ int ObOperator::filter_row(ObEvalCtx &eval_ctx, const ObIArray<ObExpr *> &exprs,
return ret;
}
int ObOperator::filter_row_vector(ObEvalCtx &eval_ctx,
const common::ObIArray<ObExpr *> &exprs,
const sql::ObBitVector &skip_bit,
bool &filtered)
{
int ret = OB_SUCCESS;
filtered = false;
const int64_t batch_idx = eval_ctx.get_batch_idx();
EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false);
FOREACH_CNT_X(e, exprs, OB_SUCC(ret) && !filtered) {
if (OB_FAIL((*e)->eval_vector(eval_ctx, skip_bit, eval_bound))) {
LOG_WARN("Failed to evaluate vector", K(ret));
} else {
ObIVector *res = (*e)->get_vector(eval_ctx);
filtered = !res->is_true(batch_idx);
}
}
return ret;
}
int ObOperator::filter(const common::ObIArray<ObExpr *> &exprs, bool &filtered)
{
return filter_row(eval_ctx_, exprs, filtered);

View File

@ -476,6 +476,10 @@ public:
static int filter_row(ObEvalCtx &eval_ctx,
const common::ObIArray<ObExpr *> &exprs,
bool &filtered);
static int filter_row_vector(ObEvalCtx &eval_ctx,
const common::ObIArray<ObExpr *> &exprs,
const sql::ObBitVector &skip_bit,
bool &filtered);
ObBatchRows &get_brs() { return brs_; }
// Drain exchange in data for PX, or producer DFO will be blocked.
int drain_exch();