[FEAT MERGE] impl vectorization 2.0

Co-authored-by: Naynahs <cfzy002@126.com>
Co-authored-by: hwx65 <1780011298@qq.com>
Co-authored-by: oceanoverflow <oceanoverflow@gmail.com>
This commit is contained in:
obdev
2023-12-22 03:43:19 +00:00
committed by ob-robot
parent 1178245448
commit b6773084c6
592 changed files with 358124 additions and 303288 deletions

View File

@ -26,17 +26,6 @@ using namespace common;
namespace sql
{
DEF_TO_STRING(ObBatchRows)
{
int64_t pos = 0;
J_OBJ_START();
J_KV(K_(size), K_(end), KP(skip_),
"skip_bit_vec", ObLogPrintHex(reinterpret_cast<char *>(skip_),
NULL == skip_ ? 0 : ObBitVector::memory_size(size_)));
J_OBJ_END();
return pos;
}
OB_SERIALIZE_MEMBER(ObDynamicParamSetter, param_idx_, src_, dst_);
OB_SERIALIZE_MEMBER(ObOpSchemaObj, obj_type_, is_not_null_, order_type_);
@ -137,7 +126,8 @@ ObOpSpec::ObOpSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
px_est_size_factor_(),
plan_depth_(0),
max_batch_size_(0),
need_check_output_datum_(false)
need_check_output_datum_(false),
use_rich_format_(false)
{
}
@ -157,7 +147,8 @@ OB_SERIALIZE_MEMBER(ObOpSpec,
px_est_size_factor_,
plan_depth_,
max_batch_size_,
need_check_output_datum_);
need_check_output_datum_,
use_rich_format_);
DEF_TO_STRING(ObOpSpec)
{
@ -172,7 +163,8 @@ DEF_TO_STRING(ObOpSpec)
"calc_exprs_cnt", calc_exprs_.count(),
K_(rows),
K_(max_batch_size),
K_(filters));
K_(filters),
K_(use_rich_format));
J_OBJ_END();
return pos;
}
@ -623,25 +615,27 @@ int ObOperator::output_expr_sanity_check()
int ObOperator::output_expr_sanity_check_batch()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < spec_.output_.count(); ++i) {
const ObExpr *expr = spec_.output_[i];
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, expr is nullptr", K(ret));
} else if (OB_FAIL(expr->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
LOG_WARN("evaluate expression failed", K(ret));
} else if (!expr->is_batch_result()){
const ObDatum &datum = expr->locate_expr_datum(eval_ctx_);
SANITY_CHECK_RANGE(datum.ptr_, datum.len_);
} else {
const ObDatum *datums = expr->locate_batch_datums(eval_ctx_);
for (int64_t j = 0; j < brs_.size_; j++) {
if (!brs_.skip_->at(j)) {
SANITY_CHECK_RANGE(datums[j].ptr_, datums[j].len_);
}
}
}
}
// TODO: add sanity check for different vector formats
// for (int64_t i = 0; OB_SUCC(ret) && i < spec_.output_.count(); ++i) {
// const ObExpr *expr = spec_.output_[i];
// if (OB_ISNULL(expr)) {
// ret = OB_ERR_UNEXPECTED;
// LOG_WARN("error unexpected, expr is nullptr", K(ret));
// } else if (OB_FAIL(expr->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
// LOG_WARN("evaluate expression failed", K(ret));
// } else if (!expr->is_batch_result()){
// const ObDatum &datum = expr->locate_expr_datum(eval_ctx_);
// SANITY_CHECK_RANGE(datum.ptr_, datum.len_);
// } else {
// const ObDatum *datums = expr->locate_batch_datums(eval_ctx_);
// for (int64_t j = 0; j < brs_.size_; j++) {
// if (!brs_.skip_->at(j)) {
// SANITY_CHECK_RANGE(datums[j].ptr_, datums[j].len_);
// }
// }
// }
// }
return ret;
}
@ -811,6 +805,15 @@ int ObOperator::open()
return ret;
}
void ObOperator::reset_output_format()
{
if (spec_.plan_->get_use_rich_format() && !spec_.use_rich_format_) {
FOREACH_CNT(e, spec_.output_) {
(*e)->get_vector_header(eval_ctx_).format_ = VEC_INVALID;
}
}
}
int ObOperator::init_evaluated_flags()
{
int ret = OB_SUCCESS;
@ -892,6 +895,11 @@ int ObOperator::inner_rescan()
batch_reach_end_ = false;
row_reach_end_ = false;
clear_batch_end_flag();
// For the vectorization 2.0, when rescan, upper operator may have changed expr format,
// but it has not been changed back, and the old operator will not initialize the format.
// When calling cast to uniform again, The current format may unexpected, and caused cast error;
// so when rescan, for operator which not support rich format, we reset the output format
reset_output_format();
op_monitor_info_.rescan_times_++;
output_batches_b4_rescan_ = op_monitor_info_.output_batches_;
if (spec_.need_check_output_datum_ && brs_checker_) {
@ -1228,7 +1236,9 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
if (OB_FAIL(check_stack_once())) {
LOG_WARN("too deep recursive", K(ret));
} else {
if (OB_UNLIKELY(spec_.need_check_output_datum_ && brs_checker_)) {
if (OB_UNLIKELY(!spec_.plan_->get_use_rich_format()
&& spec_.need_check_output_datum_
&& brs_checker_)) {
if (OB_FAIL(brs_checker_->check_datum_modified())) {
LOG_WARN("check output datum failed", K(ret), "id", spec_.get_id(), "op_name", op_name());
}
@ -1261,7 +1271,7 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
}
}
while (OB_SUCC(ret) && !brs_.end_) {
if (OB_FAIL(inner_get_next_batch(max_row_cnt))) {
if (OB_FAIL(inner_get_next_batch(min(max_row_cnt, get_spec().max_batch_size_)))) {
LOG_WARN("get next batch failed", K(ret), K_(eval_ctx), "id", spec_.get_id(), "op_name", op_name());
} else {
LOG_DEBUG("inner get next batch", "id", spec_.get_id(), "op_name", op_name(), K(brs_));
@ -1282,10 +1292,11 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
} else if (OB_FAIL(try_check_status_by_rows(brs_.size_))) {
LOG_WARN("check status failed", K(ret));
} else if (!spec_.filters_.empty()) {
if (OB_FAIL(filter_batch_rows(spec_.filters_,
*brs_.skip_,
brs_.size_,
all_filtered))) {
if (OB_FAIL(filter_rows(spec_.filters_,
*brs_.skip_,
brs_.size_,
all_filtered,
brs_.all_rows_active_))) {
LOG_WARN("filter batch rows failed", K(ret), K_(eval_ctx));
} else if (all_filtered) {
brs_.skip_->reset(brs_.size_);
@ -1314,11 +1325,9 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
// do project
if (OB_SUCC(ret) && brs_.size_ > 0) {
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
} else {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
ret = spec_.use_rich_format_ ? (*e)->eval_vector(eval_ctx_, brs_)
: (*e)->eval_batch(eval_ctx_, *brs_.skip_, brs_.size_);
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
}
@ -1361,11 +1370,48 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba
}
}
// for operator which not use_rich_format, not maintain all_rows_active flag;
OZ (convert_vector_format());
if (!spec_.use_rich_format_ && PHY_TABLE_SCAN != spec_.type_) {
brs_.set_all_rows_active(false);
}
end_ash_line_id_reg();
end_cpu_time_counting();
return ret;
}
// for old -> new(parent) operator, need init_vector for output exprs
// for new -> old(parent) operator, need cast format of output exprs to uniform
int ObOperator::convert_vector_format()
{
int ret = OB_SUCCESS;
if (NULL != spec_.get_parent() &&
spec_.get_parent()->use_rich_format_ && !spec_.use_rich_format_) {
// old operator -> new operator
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
VectorFormat format = (*e)->is_batch_result() ? VEC_UNIFORM : VEC_UNIFORM_CONST;
LOG_TRACE("init vector", K(format), K(*e));
if (OB_FAIL((*e)->init_vector(eval_ctx_, format, brs_.size_))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
}
}
} else if ((spec_.use_rich_format_ && (&spec_ == spec_.plan_->get_root_op_spec()
&& !IS_TRANSMIT(spec_.type_)))
|| (spec_.use_rich_format_ &&
NULL != spec_.get_parent() && !spec_.get_parent()->use_rich_format_)) {
// new operator -> old operator
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
LOG_TRACE("cast to uniform", K(*e));
if (OB_FAIL((*e)->cast_to_uniform(brs_.size_, eval_ctx_))) {
LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
}
}
}
return ret;
}
int ObOperator::filter_row(ObEvalCtx &eval_ctx, const ObIArray<ObExpr *> &exprs, bool &filtered)
{
ObDatum *datum = NULL;
@ -1392,13 +1438,93 @@ int ObOperator::filter(const common::ObIArray<ObExpr *> &exprs, bool &filtered)
return filter_row(eval_ctx_, exprs, filtered);
}
int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
ObBitVector &skip,
const int64_t bsize,
bool &all_filtered)
int ObOperator::filter_rows(const ObExprPtrIArray &exprs,
ObBitVector &skip,
const int64_t bsize,
bool &all_filtered,
bool &all_active)
{
return spec_.use_rich_format_
? filter_vector_rows(exprs, skip, bsize, all_filtered, all_active)
: filter_batch_rows(exprs, skip, bsize, all_filtered, all_active);
}
int ObOperator::filter_vector_rows(const ObExprPtrIArray &exprs,
ObBitVector &skip,
const int64_t bsize,
bool &all_filtered,
bool &all_active)
{
int ret = OB_SUCCESS;
all_filtered = false;
bool tmp_all_active = true;
FOREACH_CNT_X(e, exprs, OB_SUCC(ret) && !all_filtered) {
int64_t output_rows = 0;
OB_ASSERT(ob_is_int_tc((*e)->datum_meta_.type_));
ObIVector *vec = NULL;
if (OB_FAIL((*e)->eval_vector(eval_ctx_, skip, bsize, all_active))) {
LOG_WARN("evaluate batch failed", K(ret), K_(eval_ctx));
} else if (FALSE_IT(vec = (*e)->get_vector(eval_ctx_))) {
// do nothing
} else if (!(*e)->is_batch_result()) {
ObUniformBase *const_vec = static_cast<ObUniformBase*>(vec);
ObDatum &d = const_vec->get_datums()[0];
if (d.null_ || 0 == *d.int_) {
skip.set_all(bsize);
tmp_all_active = false;
} else {
output_rows++;
}
LOG_DEBUG("const vector filter", K(bsize));
} else if (OB_LIKELY(VEC_FIXED == vec->get_format())) {
ObFixedLengthBase *fixed_vec = static_cast<ObFixedLengthBase *>(vec);
ObBitVector *nulls = fixed_vec->get_nulls();
int64_t *int_arr = reinterpret_cast<int64_t *>(fixed_vec->get_data());
ObBitVector::flip_foreach(skip, bsize,
[&](int64_t idx) __attribute__((always_inline)) {
if (0 == int_arr[idx] || nulls->at(idx)) {
skip.set(idx);
tmp_all_active = false;
} else {
output_rows++;
}
return OB_SUCCESS;
}
);
LOG_DEBUG("fixed vector filter", K(bsize));
} else if (VEC_UNIFORM == vec->get_format()) {
ObUniformBase *uniform_vec = static_cast<ObUniformBase *>(vec);
const ObDatum *datums = uniform_vec->get_datums();
for (int64_t i = 0; i < bsize; i++) {
if (!skip.at(i)) {
if (datums[i].null_ || 0 == *datums[i].int_) {
skip.set(i);
tmp_all_active = false;
} else {
output_rows++;
}
}
}
// FIXME bin.lb: add output_rows to ObBatchRows?
LOG_DEBUG("uniform vector filter", K(bsize));
}
all_filtered = (0 == output_rows);
all_active &= tmp_all_active;
}
return ret;
}
int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
ObBitVector &skip,
const int64_t bsize,
bool &all_filtered,
bool &all_active)
{
int ret = OB_SUCCESS;
all_filtered = false;
bool tmp_all_active = true;
FOREACH_CNT_X(e, exprs, OB_SUCC(ret) && !all_filtered) {
OB_ASSERT(ob_is_int_tc((*e)->datum_meta_.type_));
if (OB_FAIL((*e)->eval_batch(eval_ctx_, skip, bsize))) {
@ -1408,6 +1534,7 @@ int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
if (d.null_ || 0 == *d.int_) {
all_filtered = true;
skip.set_all(bsize);
tmp_all_active = false;
}
} else {
int64_t output_rows = 0;
@ -1416,6 +1543,7 @@ int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
if (!skip.at(i)) {
if (datums[i].null_ || 0 == *datums[i].int_) {
skip.set(i);
tmp_all_active = false;
} else {
output_rows += 1;
}
@ -1424,6 +1552,8 @@ int ObOperator::filter_batch_rows(const ObExprPtrIArray &exprs,
// FIXME bin.lb: add output_rows to ObBatchRows?
all_filtered = (0 == output_rows);
}
all_active &= tmp_all_active;
}
return ret;
}
@ -1475,31 +1605,6 @@ int ObOperator::do_drain_exch()
return ret;
}
int ObOperator::get_real_child(ObOperator *&child, const int32_t child_idx)
{
int ret = OB_SUCCESS;
const int32_t first_child_idx = 0;
ObOperator *first_child = nullptr;
if (first_child_idx >= child_cnt_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid child idx", K(ret), K(child_cnt_));
} else if (OB_ISNULL(first_child = get_child(first_child_idx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null child", K(ret));
} else if (IS_DUMMY_PHY_OPERATOR(first_child->get_spec().get_type())) {
if (OB_FAIL(first_child->get_real_child(child, child_idx))) {
LOG_WARN("Failed to get real child", K(ret), K(first_child->get_spec().get_type()));
}
} else {
child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Get a null child", K(ret));
}
}
return ret;
}
int ObOperator::check_status()
{
return ctx_.check_status();