[FEAT MERGE] support NLJ/SPF vectorization 2.0

This commit is contained in:
pe-99y 2024-11-21 13:14:25 +00:00 committed by ob-robot
parent e9c9570701
commit 5b6af76499
34 changed files with 2665 additions and 29 deletions

View File

@ -1545,11 +1545,13 @@ DEF_BOOL(_enable_spf_batch_rescan, OB_TENANT_PARAMETER, "False",
DEF_BOOL(_enable_das_keep_order, OB_TENANT_PARAMETER, "True",
"enable das keep order optimization",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// Hidden tenant-level boolean parameter, default "True", dynamically effective.
// Its description string reads "enable nlj and spf use rich format".
// NOTE(review): presumably this backs the session-level switch consulted by the code
// generator when choosing the vectorized NLJ / subplan-filter operators -- confirm.
DEF_BOOL(_enable_nlj_spf_use_rich_format, OB_TENANT_PARAMETER, "True",
"enable nlj and spf use rich format",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_parallel_max_active_sessions, OB_TENANT_PARAMETER, "0", "[0,]",
"max active parallel sessions allowed for tenant. Range: [0,+∞)",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_tcp_keepalive, OB_CLUSTER_PARAMETER, "true",
"enable TCP keepalive for the TCP connection of sql protocol. Take effect for "
"new established connections.",

View File

@ -159,6 +159,7 @@ ob_set_subtarget(ob_sql engine
engine/window_function/row_store.cpp
engine/window_function/win_expr.cpp
engine/window_function/row_store.cpp
engine/subquery/ob_subplan_filter_vec_op.cpp
)
ob_set_subtarget(ob_sql engine_aggregate
@ -228,6 +229,7 @@ ob_set_subtarget(ob_sql engine_basic
engine/basic/chunk_store/ob_compact_block_reader.cpp
engine/basic/chunk_store/ob_compact_block_writer.cpp
engine/basic/chunk_store/ob_chunk_block_compressor.cpp
engine/basic/ob_group_join_buffer_v2.cpp
#engine/basic/chunk_store/ob_encoded_block_writer.cpp
#engine/basic/chunk_store/ob_encoded_block_reader.cpp
engine/basic/ob_values_table_access_op.cpp
@ -856,6 +858,7 @@ ob_set_subtarget(ob_sql engine_join
engine/join/ob_join_filter_partition_splitter.cpp
engine/join/ob_join_filter_material_control_info.cpp
engine/join/ob_merge_join_vec_op.cpp
engine/join/ob_nested_loop_join_vec_op.cpp
)
ob_set_subtarget(ob_sql engine_pdml

View File

@ -164,6 +164,8 @@
#include "sql/engine/join/ob_join_filter_material_control_info.h"
#include "sql/engine/join/ob_merge_join_vec_op.h"
#include "sql/resolver/mv/ob_mv_provider.h"
#include "sql/engine/join/ob_nested_loop_join_vec_op.h"
#include "sql/engine/subquery/ob_subplan_filter_vec_op.h"
#ifdef OB_BUILD_TDE_SECURITY
#include "share/ob_master_key_getter.h"
#endif
@ -973,7 +975,7 @@ int ObStaticEngineCG::generate_spec_final(ObLogicalOperator &op, ObOpSpec &spec)
int ret = OB_SUCCESS;
UNUSED(op);
if (PHY_SUBPLAN_FILTER == spec.type_) {
if (PHY_SUBPLAN_FILTER == spec.type_ || PHY_VEC_SUBPLAN_FILTER == spec.type_) {
FOREACH_CNT_X(e, spec.calc_exprs_, OB_SUCC(ret)) {
if (T_REF_QUERY == (*e)->type_) {
ObExprSubQueryRef::Extra::get_info(**e).op_id_ = spec.id_;
@ -6094,6 +6096,129 @@ int ObStaticEngineCG::generate_spec(ObLogJoin &op,
UNUSED(in_root_job);
return generate_join_spec(op, spec);
}
// Generate the static spec of the vectorized nested loop join operator.
//
// Work done here:
//   1. propagate the late-materialization and partition-wise flags to the physical plan;
//   2. translate every "other" join condition into a runtime expression;
//   3. for the NESTED_LOOP_JOIN algorithm: optional GI partition pruning, rescan params,
//      the per-condition list of left output indexes, and the batch exec param caches.
//
// @param op           logical join operator; its join algorithm must be NESTED_LOOP_JOIN
// @param spec         vectorized NLJ spec to be filled
// @param in_root_job  unused
// @return OB_SUCCESS on success, otherwise the first error that occurred
int ObStaticEngineCG::generate_spec(ObLogJoin &op,
                                    ObNestedLoopJoinVecSpec &spec,
                                    const bool in_root_job)
{
  int ret = OB_SUCCESS;
  UNUSED(in_root_job);
  bool is_late_mat = (phy_plan_->get_is_late_materialized() || op.is_late_mat());
  phy_plan_->set_is_late_materialized(is_late_mat);
  if (op.is_partition_wise()) {
    phy_plan_->set_is_wise_join(op.is_partition_wise()); // set is_wise_join
  }
  // 1. add other join conditions
  const ObIArray<ObRawExpr*> &other_join_conds = op.get_other_join_conditions();
  if (OB_FAIL(spec.other_join_conds_.init(other_join_conds.count()))) {
    LOG_WARN("failed to init other join conditions", K(ret));
  } else {
    ARRAY_FOREACH(other_join_conds, i) {
      ObRawExpr *raw_expr = other_join_conds.at(i);
      ObExpr *expr = NULL;
      if (OB_ISNULL(raw_expr)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_ERROR("null pointer", K(ret));
      } else if (OB_FAIL(generate_rt_expr(*raw_expr, expr))) {
        LOG_WARN("fail to generate rt expr", K(ret), K(*raw_expr));
      } else if (OB_FAIL(spec.other_join_conds_.push_back(expr))) {
        LOG_WARN("failed to add sql expr", K(ret), K(*expr));
      } else {
        LOG_DEBUG("equijoin condition", K(*raw_expr), K(*expr));
      }
    } // end for
  }
  spec.join_type_ = op.get_join_type();
  if (OB_FAIL(ret)) {
    // do nothing
  } else if (NESTED_LOOP_JOIN == op.get_join_algo()) {
    // 2. a nested loop join must not carry equal join conditions
    if (0 != op.get_equal_join_conditions().count()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("equal join conditions' count should equal 0", K(ret));
    } else {
      spec.enable_gi_partition_pruning_ = op.is_enable_gi_partition_pruning();
      if (spec.enable_gi_partition_pruning_ && OB_FAIL(do_gi_partition_pruning(op, spec))) {
        LOG_WARN("fail do gi partition pruning", K(ret));
      } else if (OB_FAIL(generate_param_spec(op.get_nl_params(), spec.rescan_params_))) {
        LOG_WARN("fail to generate param spec", K(ret));
      } else {
        spec.enable_px_batch_rescan_ = false;
        if (OB_SUCC(ret) && PHY_VEC_NESTED_LOOP_JOIN == spec.type_) {
          bool use_batch_nlj = op.can_use_batch_nlj();
          if (use_batch_nlj) {
            spec.group_rescan_ = use_batch_nlj;
          }
          if (spec.is_vectorized()) {
            // populate other cond join info: for every condition, remember which
            // output expressions of the left child it contains
            const ObIArray<ObExpr *> &conds = spec.other_join_conds_;
            if (OB_FAIL(spec.left_expr_ids_in_other_cond_.prepare_allocate(conds.count()))) {
              // keep the error code reported by prepare_allocate instead of masking it
              LOG_WARN("Failed to prepare_allocate left_expr_ids_in_other_cond_", K(ret));
            } else {
              ARRAY_FOREACH(conds, i) {
                ObExpr *cond = conds.at(i);
                ObSEArray<int, 1> left_expr_ids;
                for (int64_t l_output_idx = 0;
                     OB_SUCC(ret) && l_output_idx < spec.get_left()->output_.count();
                     l_output_idx++) {
                  // check if left child expr appears in other_condition
                  bool appears_in_cond = false;
                  if (OB_FAIL(cond->contain_expr(
                          spec.get_left()->output_.at(l_output_idx), appears_in_cond))) {
                    LOG_WARN("other expr contain calculate failed", K(ret), KPC(cond),
                             K(l_output_idx),
                             KPC(spec.get_left()->output_.at(l_output_idx)));
                  } else {
                    if (appears_in_cond) {
                      if (OB_FAIL(left_expr_ids.push_back(l_output_idx))) {
                        LOG_WARN("other expr contain", K(ret));
                      }
                    }
                  }
                }
                // Note: no need to call init explicitly as init() is invoked inside assign()
                OZ(spec.left_expr_ids_in_other_cond_.at(i).assign(left_expr_ids));
              }
            }
          }
          if (OB_SUCC(ret)) {
            // 3. prepare the left/right rescan param arrays and register the params
            //    pushed down from above into the batch exec param caches
            if (OB_FAIL(spec.left_rescan_params_.init(op.get_above_pushdown_left_params().count()))) {
              LOG_WARN("fail to init fixed array", K(ret));
            } else if (OB_FAIL(spec.right_rescan_params_.init(op.get_above_pushdown_right_params().count()))) {
              LOG_WARN("fail to init fixed array", K(ret));
            } else if (OB_FAIL(set_batch_exec_param(op.get_nl_params(), spec.rescan_params_))) {
              LOG_WARN("fail to set batch exec param", K(ret));
            }
            ARRAY_FOREACH(op.get_above_pushdown_left_params(), i) {
              ObExecParamRawExpr* param_expr = op.get_above_pushdown_left_params().at(i);
              if (OB_FAIL(batch_exec_param_caches_.push_back(BatchExecParamCache(param_expr,
                                                                                 &spec,
                                                                                 true)))) {
                LOG_WARN("fail to push back param expr", K(ret));
              }
            }
            ARRAY_FOREACH(op.get_above_pushdown_right_params(), i) {
              ObExecParamRawExpr* param_expr = op.get_above_pushdown_right_params().at(i);
              if (OB_FAIL(batch_exec_param_caches_.push_back(BatchExecParamCache(param_expr,
                                                                                 &spec,
                                                                                 false)))) {
                LOG_WARN("fail to push back param expr", K(ret));
              }
            }
          }
        }
      }
    }
  } else {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid join algorithm to generate NLJ spec", K(ret), K(op.get_join_algo()));
  }
  return ret;
}
int ObStaticEngineCG::generate_spec(ObLogJoin &op,
ObMergeJoinSpec &spec,
const bool in_root_job)
@ -6459,6 +6584,15 @@ int ObStaticEngineCG::do_gi_partition_pruning(
OZ(generate_rt_expr(*op.get_partition_id_expr(), spec.gi_partition_id_expr_));
return ret;
}
// Generate the runtime expression of the partition id expr of `op` and store it in
// spec.gi_partition_id_expr_.
//
// @param op    logical join operator providing the partition id expr
// @param spec  vectorized NLJ spec receiving the runtime expr
// @return OB_ERR_UNEXPECTED if the operator has no partition id expr, otherwise the
//         result of generate_rt_expr()
int ObStaticEngineCG::do_gi_partition_pruning(
    ObLogJoin &op,
    ObNestedLoopJoinVecSpec &spec)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(op.get_partition_id_expr())) {
    // guard the dereference below
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("partition id expr is null", K(ret));
  } else {
    OZ(generate_rt_expr(*op.get_partition_id_expr(), spec.gi_partition_id_expr_));
  }
  return ret;
}
int ObStaticEngineCG::calc_equal_cond_opposite(const ObLogJoin &op,
const ObRawExpr &raw_expr,
bool &is_opposite)
@ -6834,6 +6968,18 @@ int ObStaticEngineCG::generate_spec(
return ret;
}
// Generate the spec of the vectorized subplan filter: flag the spec as using rich
// format, then reuse the generator of the ObSubPlanFilterSpec overload.
int ObStaticEngineCG::generate_spec(
    ObLogSubPlanFilter &op, ObSubPlanFilterVecSpec &spec, const bool in_root_job)
{
  int ret = OB_SUCCESS;
  spec.use_rich_format_ = true;
  // bind to the base spec type so that the base overload is selected
  ObSubPlanFilterSpec &spf_base = spec;
  if (OB_FAIL(generate_spec(op, spf_base, in_root_job))) {
    LOG_WARN("failed to generate spec for subplan filter operator", K(ret));
  } else {
    // nothing more to do for the vectorized variant
  }
  return ret;
}
int ObStaticEngineCG::generate_spec(
ObLogSubPlanScan &op, ObSubPlanScanSpec &spec, const bool)
{
@ -9140,6 +9286,11 @@ int ObStaticEngineCG::get_phy_op_type(ObLogicalOperator &log_op,
: (op.get_nl_params().count() > 0
? PHY_NESTED_LOOP_CONNECT_BY_WITH_INDEX
: PHY_CONNECT_BY);
if (type == PHY_NESTED_LOOP_JOIN && use_rich_format &&
op.get_plan()->get_optimizer_context().get_session_info()->is_nlj_spf_use_rich_format_enabled() &&
!op.enable_px_batch_rescan() && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_5_0) {
type = PHY_VEC_NESTED_LOOP_JOIN;
}
break;
}
case MERGE_JOIN: {
@ -9370,7 +9521,15 @@ int ObStaticEngineCG::get_phy_op_type(ObLogicalOperator &log_op,
break;
}
case log_op_def::LOG_SUBPLAN_FILTER: {
auto &op = static_cast<ObLogSubPlanFilter &>(log_op);
type = PHY_SUBPLAN_FILTER;
if (!op.is_update_set() && use_rich_format &&
op.get_plan()->get_optimizer_context().get_session_info()->is_nlj_spf_use_rich_format_enabled() &&
!op.is_px_batch_rescan_enabled() && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_5_0) {
type = PHY_VEC_SUBPLAN_FILTER;
} else {
type = PHY_SUBPLAN_FILTER;
}
break;
}
case log_op_def::LOG_SUBPLAN_SCAN: {
@ -9854,6 +10013,28 @@ int ObStaticEngineCG::set_batch_exec_param(const ObIArray<ObExecParamRawExpr *>
} else if (OB_FAIL(batch_exec_param_caches_.remove(j))) {
LOG_WARN("fail to remove batch nl param caches", K(ret));
}
} else if (cache.spec_->get_type() == PHY_VEC_SUBPLAN_FILTER) {
ObSubPlanFilterVecSpec *spf = static_cast<ObSubPlanFilterVecSpec*>(cache.spec_);
if (cache.is_left_param_ &&
OB_FAIL(spf->left_rescan_params_.push_back(setter))) {
LOG_WARN("fail to push back left rescan params", K(ret));
} else if (!cache.is_left_param_ &&
OB_FAIL(spf->right_rescan_params_.push_back(setter))) {
LOG_WARN("fail to push back right rescan params", K(ret));
} else if (OB_FAIL(batch_exec_param_caches_.remove(j))) {
LOG_WARN("fail to remove batch nl param caches", K(ret));
}
} else if (cache.spec_->get_type() == PHY_VEC_NESTED_LOOP_JOIN) {
ObNestedLoopJoinVecSpec *nlj = static_cast<ObNestedLoopJoinVecSpec*>(cache.spec_);
if (cache.is_left_param_ &&
OB_FAIL(nlj->left_rescan_params_.push_back(setter))) {
LOG_WARN("fail to push back left rescan params", K(ret));
} else if (!cache.is_left_param_ &&
OB_FAIL(nlj->right_rescan_params_.push_back(setter))) {
LOG_WARN("fail to push back right rescan params", K(ret));
} else if (OB_FAIL(batch_exec_param_caches_.remove(j))) {
LOG_WARN("fail to remove batch nl param caches", K(ret));
}
}
}
}

View File

@ -93,6 +93,7 @@ class ObPxMSCoordSpec;
class ObPxMSCoordVecSpec;
class ObLogSubPlanFilter;
class ObSubPlanFilterSpec;
class ObSubPlanFilterVecSpec;
class ObLogSubPlanScan;
class ObSubPlanScanSpec;
class ObGroupBySpec;
@ -140,6 +141,7 @@ class ObLogValuesTableAccess;
class ObValuesTableAccessSpec;
class ObMergeGroupByVecSpec;
class ObNestedLoopJoinVecSpec;
typedef common::ObList<uint64_t, common::ObIAllocator> DASTableIdList;
typedef common::ObSEArray<common::ObSEArray<int64_t, 8, common::ModulePageAllocator, true>,
1, common::ModulePageAllocator, true> RowParamMap;
@ -315,6 +317,7 @@ private:
int generate_spec(ObLogValues &op, ObValuesSpec &spec, const bool in_root_job);
int generate_spec(ObLogSubPlanFilter &op, ObSubPlanFilterSpec &spec, const bool in_root_job);
int generate_spec(ObLogSubPlanFilter &op, ObSubPlanFilterVecSpec &spec, const bool in_root_job);
int generate_spec(ObLogSubPlanScan &op, ObSubPlanScanSpec &spec, const bool in_root_job);
@ -365,6 +368,8 @@ private:
// generate nested loop join
int generate_spec(ObLogJoin &op, ObNestedLoopJoinSpec &spec, const bool in_root_job);
// generate NLJ for vec_2_0
int generate_spec(ObLogJoin &op, ObNestedLoopJoinVecSpec &spec, const bool in_root_job);
// generate merge join
int generate_spec(ObLogJoin &op, ObMergeJoinSpec &spec, const bool in_root_job);
int generate_spec(ObLogJoin &op, ObMergeJoinVecSpec &spec, const bool in_root_job);
@ -526,6 +531,10 @@ private:
ObLogJoin &op,
ObBasicNestedLoopJoinSpec &spec);
int do_gi_partition_pruning(
ObLogJoin &op,
ObNestedLoopJoinVecSpec &spec);
int generate_hash_func_exprs(
const common::ObIArray<ObExchangeInfo::HashExpr> &hash_dist_exprs,
ExprFixedArray &dist_exprs,

View File

@ -52,7 +52,6 @@ public:
int64_t param_idx_;
ObSqlArrayObj *gr_param_; //group rescan param
};
typedef common::ObArrayWrap<GroupRescanParam> GroupParamArray;
class ObDASCtx
{

View File

@ -0,0 +1,661 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "ob_group_join_buffer_v2.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
using namespace common;
namespace sql
{
// Construct an empty buffer: all pointers null, counters zero, cur_group_idx_ at -1
// and flags_ cleared. init() must be called before use.
ObDriverRowBuffer::ObDriverRowBuffer()
  : op_(nullptr),
    left_(nullptr),
    left_brs_(),
    spec_(nullptr),
    ctx_(nullptr),
    eval_ctx_(nullptr),
    rescan_params_(nullptr),
    left_store_(),
    left_store_iter_(),
    mem_context_(),
    cur_group_idx_(-1),
    group_rescan_cnt_(0),
    group_params_(),
    last_batch_(),
    max_group_size_(0),
    group_scan_size_(0),
    left_store_read_(0),
    flags_(0)
{
}
// Initialize the buffer for operator `op`.
//
// Binds the operator, its first child (the left/driver side), exec and eval contexts,
// creates a private memory context, initializes the row store and the holder used to
// save/restore the left child's output, and allocates the skip bitmap of left_brs_.
//
// @param op               owning operator, must have at least two children
// @param max_group_size   capacity (in rows) of each group param array
// @param group_scan_size  must not exceed max_group_size
// @param rescan_params    dynamic params evaluated per left row
// @return OB_INIT_TWICE if already inited, OB_INVALID_ARGUMENT on bad arguments,
//         OB_ERR_UNEXPECTED on inconsistent state, or an error from the callees
int ObDriverRowBuffer::init(ObOperator *op,
                            const int64_t max_group_size,
                            const int64_t group_scan_size,
                            const common::ObIArray<ObDynamicParamSetter> *rescan_params)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(is_inited())) {
    ret = OB_INIT_TWICE;
    LOG_WARN("batch info was already inited", KR(ret));
  } else if (OB_UNLIKELY(NULL != mem_context_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("mem context should be null", KR(ret));
  } else if (OB_ISNULL(op)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("op is null", KR(ret), KP(op));
  } else if (OB_UNLIKELY(op->get_child_cnt() < 2)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("op should have at least 2 children", KR(ret), K(op->get_child_cnt()));
  } else if (max_group_size < group_scan_size) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("max group size is less than group scan size", KR(ret), K(max_group_size), K(group_scan_size));
  } else {
    op_ = op;
    spec_ = &op_->get_spec();
    ctx_ = &op_->get_exec_ctx();
    eval_ctx_ = &op_->get_eval_ctx();
    left_ = op_->get_child();
    max_group_size_ = max_group_size;
    group_scan_size_ = group_scan_size;
    rescan_params_ = rescan_params;
    ObSQLSessionInfo *session = ctx_->get_my_session();
    if (OB_ISNULL(session)) {
      // the tenant id for the memory attribute is read from the session
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("session is null", KR(ret));
    } else {
      ObMemAttr mem_attr(session->get_effective_tenant_id(), ObModIds::OB_SQL_NLJ_CACHE, ObCtxIds::WORK_AREA);
      lib::ContextParam param;
      ObCompressorType compressor_type = ObCompressorType::NONE_COMPRESSOR;
      param.set_mem_attr(mem_attr)
           .set_properties(lib::USE_TL_PAGE_OPTIONAL);
      if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
        LOG_WARN("create entity failed", KR(ret));
      } else if (OB_ISNULL(mem_context_)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("null memory entity", KR(ret));
      } else if (OB_FAIL(left_store_.init(left_->get_spec().output_, eval_ctx_->max_batch_size_, mem_attr, UINT64_MAX, true, 0, compressor_type, false, false))) {
        LOG_WARN("init row store failed", KR(ret));
      } else if (FALSE_IT(left_store_.set_allocator(mem_context_->get_malloc_allocator()))) {
      }
      if (OB_SUCC(ret)) {
        if (OB_FAIL(last_batch_.init(left_->get_spec().output_,
                                     *eval_ctx_))) {
          LOG_WARN("init batch failed", KR(ret));
        } else if (OB_FAIL(init_left_batch_rows())) {
          LOG_WARN("failed to init left batch rows", K(ret));
        }
      }
    }
  }
  if (OB_SUCC(ret)) {
    is_inited_ = true;
  }
  return ret;
}
// Allocate and initialize the skip bitmap of left_brs_, sized for the eval context's
// max batch size. Fails with OB_ERR_UNEXPECTED if the owning operator is not
// vectorized or the exec context is missing.
int ObDriverRowBuffer::init_left_batch_rows()
{
  int ret = OB_SUCCESS;
  if (!op_->is_vectorized()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("the parent op is not vectorized", K(ret));
  } else if (OB_ISNULL(ctx_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("exec ctx is nullptr", K(ret));
  } else {
    int batch_size = eval_ctx_->max_batch_size_;
    void *mem = ctx_->get_allocator().alloc(ObBitVector::memory_size(batch_size));
    if (OB_ISNULL(mem)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("allocate memory failed", K(ret), K(batch_size));
    } else {
      left_brs_.skip_ = to_bit_vector(mem);
      left_brs_.skip_->init(batch_size);
    }
  }
  return ret;
}
// Hand the current group index, the group rescan counter and the collected rescan
// param info over to `guard`.
void ObDriverRowBuffer::bind_group_params_to_das_ctx(GroupParamBackupGuard &guard)
{
  guard.bind_batch_rescan_params(cur_group_idx_,
                                 group_rescan_cnt_,
                                 &rescan_params_info_);
}
// Rescan the left child operator and report its result.
int ObDriverRowBuffer::rescan_left()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(left_->rescan())) {
    LOG_WARN("rescan left failed",
             KR(ret),
             K(left_->get_spec().get_id()),
             K(left_->op_name()));
  } else {
    // left child rescanned successfully
  }
  return ret;
}
void ObDriverRowBuffer::destroy()
{
left_store_iter_.reset();
left_store_.reset();
last_batch_.reset();
if (NULL != mem_context_) {
DESTROY_CONTEXT(mem_context_);
mem_context_ = NULL;
}
}
// Prepare for a new fill round: rewind the group cursor and read counter, drop the
// stored rows and the saved batch size, and reset the arena of the memory context.
void ObDriverRowBuffer::reset_buffer_state()
{
  // plain counters first
  cur_group_idx_ = -1;
  left_store_read_ = 0;
  // then the containers, iterator before the store it iterates
  left_store_iter_.reset();
  left_store_.reset();
  last_batch_.clear_saved_size();
  mem_context_->get_arena_allocator().reset();
}
// Reset the buffer: clear the stored rows and saved batch, rewind the cursors and
// clear the "left end" / "saved last batch" flags.
void ObDriverRowBuffer::reset()
{
  // containers, iterator before the store it iterates
  left_store_iter_.reset();
  left_store_.reset();
  last_batch_.reset();
  // flags
  save_last_batch_ = false;
  is_left_end_ = false;
  // cursors
  cur_group_idx_ = -1;
  left_store_read_ = 0;
}
int ObDriverRowBuffer::add_row_to_store()
{
int ret = OB_SUCCESS;
ObCompactRow *compact_row = nullptr;
if (OB_FAIL(left_store_.add_row(left_->get_spec().output_, *eval_ctx_, compact_row))) {
LOG_WARN("add row failed", KR(ret));
}
return ret;
}
// Prepare the per-param group arrays for a new fill round.
//
// First call: allocate one array per rescan param, each with room for max_group_size_
// ObObjParam entries, and set its element meta from the param's dst expr.
// Later calls: only reset the element counts.
// Once, when group params exist: build rescan_params_info_, pairing each param index
// with its group array.
int ObDriverRowBuffer::init_group_params()
{
  int ret = OB_SUCCESS;
  if (group_params_.empty()) {
    if (OB_FAIL(group_params_.allocate_array(ctx_->get_allocator(),
                                             rescan_params_->count()))) {
      LOG_WARN("allocate group params array failed", KR(ret), K(rescan_params_->count()));
    } else {
      int64_t obj_buf_size = sizeof(ObObjParam) * max_group_size_;
      for (int64_t i = 0; OB_SUCC(ret) && i < group_params_.count(); ++i) {
        void *buf = ctx_->get_allocator().alloc(obj_buf_size);
        if (OB_ISNULL(buf)) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          LOG_WARN("alloc memory failed", KR(ret), K(obj_buf_size));
        } else {
          ObExpr *dst_expr = rescan_params_->at(i).dst_;
          group_params_.at(i).data_ = reinterpret_cast<ObObjParam*>(buf);
          group_params_.at(i).count_ = 0;
          group_params_.at(i).element_.set_meta_type(dst_expr->obj_meta_);
        }
      }
    }
  } else {
    // arrays already exist: just forget the previous round's entries
    for (int64_t i = 0; i < group_params_.count(); ++i) {
      group_params_.at(i).count_ = 0;
    }
  }
  // collect batch nlj params needed by rescan right op; only performed once
  if (OB_SUCC(ret) && !group_params_.empty() && rescan_params_info_.empty()) {
    int64_t rescan_params_info_cnt = group_params_.count();
    if (OB_FAIL(rescan_params_info_.allocate_array(ctx_->get_allocator(), rescan_params_info_cnt))) {
      LOG_WARN("failed to allocate group param info", K(ret), K(rescan_params_info_cnt));
    } else {
      // collect rescan params of current nlj op
      for (int64_t i = 0;
           OB_SUCC(ret) && i < group_params_.count() && i < rescan_params_info_.count();
           ++i) {
        rescan_params_info_.at(i).param_idx_ = rescan_params_->at(i).param_idx_;
        rescan_params_info_.at(i).gr_param_ = &group_params_.at(i);
      }
    }
  }
  return ret;
}
// Optimize: without copy the value to paramstore, and copy from paramstore to objarray
int ObDriverRowBuffer::deep_copy_dynamic_obj()
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(*ctx_);
ParamStore &param_store = plan_ctx->get_param_store_for_update();
if (OB_ISNULL(mem_context_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("mem entity is not inited", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < rescan_params_->count(); ++i) {
const ObDynamicParamSetter &rescan_param = rescan_params_->at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr* src_expr = rescan_param.src_;
ObObj tmp_obj;
ObDatum* res_datum = nullptr;
// NOTE: use eval_vector here
if (OB_FAIL(src_expr->eval(*eval_ctx_, res_datum))) {
LOG_WARN("failed to eval src expr", K(ret));
} else if (OB_ISNULL(res_datum)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the result ObDatum of res expr is nullptr", K(ret));
} else if (OB_FAIL(res_datum->to_obj(tmp_obj, src_expr->obj_meta_, src_expr->obj_datum_map_))) {
LOG_WARN("failed to convert ObDatum to ObObj", K(ret));
} else if (OB_FAIL(ob_write_obj(mem_context_->get_arena_allocator(),
tmp_obj,
group_params_.at(i).data_[group_params_.at(i).count_]))) {
LOG_WARN("deep copy dynamic param failed", KR(ret), K(i), K(param_idx));
} else {
group_params_.at(i).count_++;
}
}
return ret;
}
int ObDriverRowBuffer::fill_cur_row_group_param()
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = ctx_->get_physical_plan_ctx();
if (group_params_.empty() || cur_group_idx_ >= group_params_.at(0).count_) {
ret = OB_ERR_UNEXPECTED;
if (group_params_.empty()) {
LOG_WARN("empty group params", KR(ret), K(cur_group_idx_), K(group_params_.empty()));
} else {
LOG_WARN("row idx is unexpected", KR(ret),
K(cur_group_idx_), K(group_params_.count()));
}
} else {
cur_group_idx_++;
for (int64_t i = 0; OB_SUCC(ret) && i < group_params_.count(); i++) {
const ObDynamicParamSetter &rescan_param = rescan_params_->at(i);
int64_t param_idx = rescan_param.param_idx_;
ObExpr *dst = rescan_param.dst_;
OZ(dst->init_vector(*eval_ctx_, VEC_UNIFORM_CONST, 1));
ObDatum &param_datum = dst->locate_datum_for_write(*eval_ctx_);
ObSqlArrayObj &arr = group_params_.at(i);
dst->get_eval_info(*eval_ctx_).clear_evaluated_flag();
ObDynamicParamSetter::clear_parent_evaluated_flag(*eval_ctx_, *dst);
if (OB_FAIL(param_datum.from_obj(arr.data_[cur_group_idx_], dst->obj_datum_map_))) {
LOG_WARN("fail to cast datum", K(ret));
} else {
plan_ctx->get_param_store_for_update().at(param_idx) = arr.data_[cur_group_idx_];
dst->set_evaluated_projected(*eval_ctx_);
}
}
}
return ret;
}
// Refill the row store and the group param arrays from the left child.
//
// Nothing happens unless the left side has not reported its end yet and
// need_fill_group_buffer() asks for a refill. Otherwise batches are pulled from the
// left child (at most max_row_cnt rows per call) until is_full() or the child reports
// its end; every non-skipped row is stored and its rescan params are deep copied.
//
// @param max_row_cnt  row limit passed to the left child's get_next_batch()
// @return OB_ITER_END if a refill ran but stored no rows, a callee's error, or
//         OB_SUCCESS
int ObDriverRowBuffer::batch_fill_group_buffer(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
if (!is_left_end_ && need_fill_group_buffer()) {
// each refill counts as one more group rescan
group_rescan_cnt_++;
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
// bring back what last_batch_ saved at the end of the previous refill
if (save_last_batch_) {
last_batch_.restore();
save_last_batch_ = false;
}
const ObBatchRows *batch_rows = &left_->get_brs();
reset_buffer_state();
if (OB_FAIL(init_group_params())) {
LOG_WARN("init group params failed", KR(ret));
}
while (OB_SUCC(ret) && !is_full() && !batch_rows->end_) {
op_->clear_evaluated_flag();
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
if (OB_FAIL(left_->get_next_batch(max_row_cnt, batch_rows))) {
LOG_WARN("get next batch from left failed", KR(ret));
} else if (batch_rows->end_) {
// remember the end so that later calls do not pull from the left child again
is_left_end_ = true;
} else {
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < batch_rows->size_; l_idx++) {
if (batch_rows->skip_->exist(l_idx)) {
// do nothing
} else {
// point the eval ctx at row l_idx before storing it and evaluating its params
batch_info_guard.set_batch_idx(l_idx);
batch_info_guard.set_batch_size(batch_rows->size_);
if (OB_FAIL(add_row_to_store())) {
LOG_WARN("store left row failed", KR(ret));
} else if (OB_FAIL(deep_copy_dynamic_obj())) {
LOG_WARN("deep copy dynamic obj failed", KR(ret));
}
}
}
} // end add a batch rows to store
} // end fill group join buffer
if (OB_SUCC(ret)) {
if (!rescan_params_->empty()) {
op_->set_pushdown_param_null(*rescan_params_);
}
if (batch_rows->size_ == 0 && batch_rows->end_) {
// do nothing
} else {
// save via last_batch_ so that the next refill can restore it
last_batch_.save(spec_->max_batch_size_);
save_last_batch_ = true;
}
if (left_store_.get_row_cnt() <= 0) {
// this could happen if we have skipped all rows
ret = OB_ITER_END;
} else if (OB_FAIL(left_store_.begin(left_store_iter_))) {
LOG_WARN("begin iterator for chunk row store failed", KR(ret));
}
}
}
return ret;
}
// Return the next batch of buffered left rows through left_brs_.
// The buffer is refilled first when needed; OB_ITER_END from either step is turned
// into an empty batch with end_ set and OB_SUCCESS is returned.
int ObDriverRowBuffer::get_next_left_batch(int64_t max_rows, const ObBatchRows *&batch_rows)
{
  int ret = OB_SUCCESS;
  int64_t rows_read = 0;
  batch_rows = &left_brs_;
  // if joinbuffer is empty, fill joinbuffer first, and then get next batch from joinbuffer
  if (OB_FAIL(batch_fill_group_buffer(max_rows))) {
    if (OB_ITER_END != ret) {
      LOG_WARN("failed to batch fill group join buffer", KR(ret));
    }
  } else if (OB_FAIL(get_next_batch_from_store(max_rows, rows_read))) {
    if (OB_ITER_END != ret) {
      LOG_WARN("get next batch from store failed", KR(ret));
    }
  } else {
    left_brs_.skip_->reset(rows_read);
    left_brs_.size_ = rows_read;
    left_brs_.end_ = false;
  }
  if (OB_ITER_END == ret) {
    // iteration end is reported through the batch, not through the return code
    left_brs_.size_ = 0;
    left_brs_.end_ = true;
    ret = OB_SUCCESS;
  }
  return ret;
}
// Read up to max_rows rows from the store iterator into the left child's output
// expressions; read_rows receives the number of rows read.
// Returns OB_ITER_END when the iterator is invalid or has no more rows.
int ObDriverRowBuffer::get_next_batch_from_store(int64_t max_rows, int64_t &read_rows)
{
  int ret = OB_SUCCESS;
  if (!left_store_iter_.is_valid() || !left_store_iter_.has_next()) {
    ret = OB_ITER_END;
  } else if (OB_FAIL(left_store_iter_.get_next_batch(left_->get_spec().output_,
                                                     *eval_ctx_,
                                                     max_rows,
                                                     read_rows))
             && OB_ITER_END != ret) {
    LOG_WARN("get next row from iter failed", KR(ret));
  }
  return ret;
}
// Construct an unbound iterator: all pointers null, indexes and sizes zero, group
// rescan and left backup disabled. init() must be called before use.
ObDriverRowIterator::ObDriverRowIterator()
  : left_brs_(nullptr),
    l_idx_(0),
    op_(nullptr),
    left_(nullptr),
    join_buffer_(),
    left_batch_(),
    rescan_params_(nullptr),
    is_group_rescan_(false),
    eval_ctx_(nullptr),
    op_max_batch_size_(0),
    need_backup_left_(false),
    left_expr_extend_size_(0),
    left_matched_(nullptr),
    batch_mem_ctx_(NULL),
    ctx_(nullptr)
{
}
// Bind the iterator to operator `op` and its first child (the left/driver side).
//
// For a vectorized operator this also creates a private memory context (once),
// allocates the zeroed left_matched_ bitmap sized for the operator's max batch size,
// and initializes the holder used to back up the left child's output. With group
// rescan enabled, the join buffer is initialized too.
//
// @param op                  owning operator, must have at least two children
// @param op_group_scan_size  group scan size used unless the
//                            EN_DAS_SIMULATE_GROUP_SIZE event yields a positive size
// @param rescan_params       dynamic params of the operator, must not be null
// @param is_group_rescan     whether left rows are buffered for group rescan
// @param need_backup_left    whether left batches are saved/restored around fetches
// @return OB_INVALID_ARGUMENT / OB_ERR_UNEXPECTED on bad input, or a callee's error
int ObDriverRowIterator::init(ObOperator *op, const int64_t op_group_scan_size,
    const common::ObIArray<ObDynamicParamSetter> *rescan_params, bool is_group_rescan,
    bool need_backup_left)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(op)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("op is null", KR(ret), KP(op));
  } else if (OB_UNLIKELY(op->get_child_cnt() < 2)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("op should have at least 2 children", KR(ret), K(op->get_child_cnt()));
  } else if (OB_ISNULL(rescan_params)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("rescan params is nullptr", K(ret));
  } else {
    l_idx_ = 0;
    op_ = op;
    left_ = op->get_child();
    eval_ctx_ = &op->get_eval_ctx();
    is_group_rescan_ = is_group_rescan;
    op_max_batch_size_ = left_->get_spec().max_batch_size_;
    need_backup_left_ = need_backup_left;
    left_expr_extend_size_ = 0;
    rescan_params_ = rescan_params;
    ctx_ = &op->get_exec_ctx();
    if (OB_SUCC(ret) && op_->is_vectorized()) {
      if (OB_ISNULL(batch_mem_ctx_)) {
        ObSQLSessionInfo *session = op_->get_exec_ctx().get_my_session();
        if (OB_ISNULL(session)) {
          // the tenant id for the memory attribute is read from the session
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("session is null", K(ret));
        } else {
          uint64_t tenant_id = session->get_effective_tenant_id();
          lib::ContextParam param;
          param.set_mem_attr(tenant_id,
                             ObModIds::OB_SQL_NLJ_CACHE,
                             ObCtxIds::WORK_AREA)
               .set_properties(lib::USE_TL_PAGE_OPTIONAL);
          if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(batch_mem_ctx_, param))) {
            LOG_WARN("create entity failed", K(ret));
          } else if (OB_ISNULL(batch_mem_ctx_)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("null memory entity returned", K(ret));
          }
        }
      }
      if (OB_SUCC(ret)) {
        char *buf = (char *)batch_mem_ctx_->get_arena_allocator()
                            .alloc(ObBitVector::memory_size(op_->get_spec().max_batch_size_));
        if (OB_ISNULL(buf)) {
          ret = OB_ALLOCATE_MEMORY_FAILED;
          LOG_WARN("fail to alloc", K(ret));
        } else {
          MEMSET(buf, 0, ObBitVector::memory_size(op_->get_spec().max_batch_size_));
          left_matched_ = to_bit_vector(buf);
        }
      }
      if (OB_SUCC(ret) && OB_FAIL(left_batch_.init(left_->get_spec().output_, *eval_ctx_))) {
        LOG_WARN("failed to init left batch result holder", K(ret));
      }
    }
    if (OB_SUCC(ret) && is_group_rescan_) {
      // a positive simulated size overrides the configured group scan size
      int64_t simulate_group_size = - EVENT_CALL(EventTable::EN_DAS_SIMULATE_GROUP_SIZE);
      int64_t group_scan_size = 0;
      int64_t max_group_size = 0;
      if (simulate_group_size > 0) {
        max_group_size = simulate_group_size + op_->get_spec().plan_->get_batch_size();
        group_scan_size = simulate_group_size;
      } else {
        max_group_size = OB_MAX_BULK_JOIN_ROWS + op_->get_spec().plan_->get_batch_size();
        group_scan_size = op_group_scan_size;
      }
      if (OB_FAIL(join_buffer_.init(op, max_group_size, group_scan_size, rescan_params))) {
        LOG_WARN("failed to init group join buffer for group rescan", K(ret));
      }
    }
  }
  return ret;
}
void ObDriverRowIterator::destroy()
{
if (op_->is_vectorized()) {
left_batch_.reset();
if (nullptr != batch_mem_ctx_) {
DESTROY_CONTEXT(batch_mem_ctx_);
batch_mem_ctx_ = nullptr;
}
}
if (is_group_rescan_) {
join_buffer_.destroy();
}
}
// Fetch the next left batch: from the join buffer under group rescan, otherwise
// directly from the left child after nulling the pushdown params.
// left_brs_ is updated to whatever batch_rows ends up pointing at.
int ObDriverRowIterator::get_next_left_batch(int64_t max_rows, const ObBatchRows *&batch_rows)
{
  int ret = OB_SUCCESS;
  DASGroupScanMarkGuard mark_guard(ctx_->get_das_ctx(), is_group_rescan_);
  if (is_group_rescan_) {
    if (OB_FAIL(join_buffer_.get_next_left_batch(max_rows, batch_rows))) {
      LOG_WARN("failed to get next batch from join buffer", K(ret));
    }
  } else {
    op_->set_pushdown_param_null(*rescan_params_);
    if (OB_FAIL(left_->get_next_batch(max_rows, batch_rows))) {
      LOG_WARN("failed to get batch from left child", K(ret));
    }
  }
  left_brs_ = batch_rows;
  return ret;
}
int ObDriverRowIterator::fill_cur_row_group_param()
{
int ret = OB_SUCCESS;
// ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
// batch_info_guard.set_batch_size(left_brs_->size_);
// batch_info_guard.set_batch_idx(l_idx_);
if (is_group_rescan_) {
if (OB_FAIL(join_buffer_.fill_cur_row_group_param())) {
LOG_WARN("failed to fill group param from join buffer", K(ret));
}
} else {
int64_t param_cnt = rescan_params_->count();
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
const ObDynamicParamSetter &rescan_param = rescan_params_->at(i);
if (OB_FAIL(rescan_param.set_dynamic_param_vec2(*eval_ctx_, *(left_brs_->skip_)))) {
LOG_WARN("fail to set dynamic param", K(ret), K(l_idx_));
}
}
}
return ret;
}
// Move l_idx_ to the next non-skipped row of the left side, fetching further left
// batches when the current one is used up.
//
// When need_backup_left_ is set, the left batch is restored before and saved after
// each fetch.
//
// @return OB_ITER_END when the left side reports its end, a callee's error, or
//         OB_SUCCESS with l_idx_ pointing at the row found
int ObDriverRowIterator::get_next_left_row()
{
  int ret = OB_SUCCESS;
  bool get_row = false;
  op_->set_pushdown_param_null(*rescan_params_);
  left_expr_extend_size_ = 0;
  l_idx_++;
  while (!get_row && OB_SUCC(ret)) {
    // current batch has left row, no need to get left batch
    if (OB_NOT_NULL(left_brs_) && l_idx_ < left_brs_->size_) {
      // step over skipped rows; l_idx_ ends at the first live row or at size_
      while (l_idx_ < left_brs_->size_ && left_brs_->skip_->exist(l_idx_)) {
        l_idx_++;
      }
      get_row = (l_idx_ < left_brs_->size_);
      LOG_TRACE("get a row from left batch", K(l_idx_), K(get_row), K(left_brs_->size_));
    } else {
      // get a new batch
      LOG_TRACE("start get next batch from left op");
      if (need_backup_left_ && OB_FAIL(left_batch_.restore())) {
        LOG_WARN("failed to restore left batch rows", K(ret));
      } else if (OB_FAIL(get_next_left_batch(op_max_batch_size_, left_brs_))) {
        LOG_WARN("failed to get next left batch", K(ret));
      } else if (left_brs_->end_) {
        ret = OB_ITER_END;
      } else if (need_backup_left_ && OB_FAIL(left_batch_.save(left_->is_vectorized() ? op_max_batch_size_ : 1))) { // backup left_batch for NLJ
        LOG_WARN("failed to get backup left batch rows", K(ret));
      }
      l_idx_ = 0;
      // left_brs_ may still be null here if the very first restore/fetch failed
      if (OB_NOT_NULL(left_brs_)) {
        LOG_TRACE("end get next batch from left", K(left_brs_->size_), K(left_brs_->end_));
      }
    }
  }
  return ret;
}
// Extend the current left row (l_idx_) over `size` rows via left_batch_, unless both
// the recorded extend size and the smallest vector size of the left output already
// reach `size`.
int ObDriverRowIterator::drive_row_extend(int size)
{
  int ret = OB_SUCCESS;
  int min_vec_size = 0;
  if (OB_FAIL(get_min_vec_size_from_drive_row(min_vec_size))) {
    LOG_WARN("failed to get min vector size of drive row", K(ret));
  } else {
    const bool already_covered = (left_expr_extend_size_ >= size) && (min_vec_size >= size);
    if (!already_covered) {
      left_batch_.drive_row_extended(l_idx_, 0, size);
      left_expr_extend_size_ = size;
    }
  }
  LOG_TRACE("drive_row extended", K(l_idx_), K(size), K(left_expr_extend_size_), K(min_vec_size), K(op_->get_spec().id_));
  return ret;
}
// Forward to left_batch_.restore_single_row(from_idx, to_idx); always returns
// OB_SUCCESS.
int ObDriverRowIterator::restore_drive_row(int from_idx, int to_idx)
{
  left_batch_.restore_single_row(from_idx, to_idx);
  return OB_SUCCESS;
}
// Rescan the left side: through the join buffer under group rescan, otherwise by
// rescanning the left child directly.
int ObDriverRowIterator::rescan_left()
{
  int ret = OB_SUCCESS;
  if (!is_group_rescan_) {
    if (OB_FAIL(left_->rescan())) {
      LOG_WARN("failed to rescan left op", K(ret));
    }
  } else if (OB_FAIL(join_buffer_.rescan_left())) {
    LOG_WARN("failed to rescan left in group rescan", K(ret));
  }
  // the iterator state is intentionally not reset here (a reset() call is disabled)
  //reset();
  return ret;
}
// Under group rescan, let the join buffer bind its group params to `guard`;
// otherwise do nothing.
void ObDriverRowIterator::bind_group_params_to_das_ctx(GroupParamBackupGuard &guard)
{
  if (!is_group_rescan_) {
    // no group params without group rescan
  } else {
    join_buffer_.bind_group_params_to_das_ctx(guard);
  }
}
// Reset the iteration state: row index, extend size, saved size of the left batch
// holder, the join buffer (under group rescan) and the current batch pointer.
void ObDriverRowIterator::reset()
{
  // scalar state
  l_idx_ = 0;
  left_expr_extend_size_ = 0;
  // holders and buffers
  left_batch_.clear_saved_size();
  if (is_group_rescan_) {
    join_buffer_.reset();
  }
  // forget the current left batch
  left_brs_ = nullptr;
}
// Computes the minimum of left_expr_extend_size_ and the max row count of every
// output vector of the left (drive) operator.
//
// @param[out] min_vec_size  the computed minimum.
// @return OB_ERR_UNEXPECTED when the left operator, one of its output exprs or
//         an expr's vector is null; OB_SUCCESS otherwise.
int ObDriverRowIterator::get_min_vec_size_from_drive_row(int &min_vec_size)
{
  int ret = OB_SUCCESS;
  min_vec_size = left_expr_extend_size_;
  if (OB_ISNULL(left_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("drive operator of NLJ is null", K(ret));
  }
  // OB_SUCC(ret) is evaluated first, so left_ is never dereferenced when null.
  for (int i = 0; OB_SUCC(ret) && i < left_->get_spec().output_.count(); i++) {
    const ObExpr *expr = left_->get_spec().output_.at(i);
    if (OB_ISNULL(expr) || OB_ISNULL(expr->get_vector(*eval_ctx_))) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("expr of drive operator for NLJ is null", K(ret), K(i));
    } else {
      int tmp_vec_size = static_cast<ObVectorBase *>(expr->get_vector(*eval_ctx_))->get_max_row_cnt();
      if (tmp_vec_size < left_expr_extend_size_) {
        LOG_TRACE("the vector size is less than left expr extended size", K(expr), K(i));
      }
      min_vec_size = (tmp_vec_size < min_vec_size) ? tmp_vec_size : min_vec_size;
    }
  }
  return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -0,0 +1,153 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_BASIC_OB_GROUP_JOIN_BUFFER_V2_H_
#define OCEANBASE_BASIC_OB_GROUP_JOIN_BUFFER_V2_H_
#include "sql/engine/basic/ob_vector_result_holder.h"
#include "sql/engine/basic/ob_temp_row_store.h"
#include "sql/engine/ob_operator.h"
#include "sql/das/ob_das_context.h"
namespace oceanbase
{
namespace sql
{
// Buffer for rows of the left (drive) side of a join. ObDriverRowIterator owns
// one instance and routes its calls through it when group rescan is enabled.
// Only the inline members are defined in this header.
class ObDriverRowBuffer
{
public:
ObDriverRowBuffer();
~ObDriverRowBuffer() {}
public:
int get_next_left_row();
int rescan_left();
int fill_cur_row_group_param();
void bind_group_params_to_das_ctx(GroupParamBackupGuard &guard);
int get_next_left_batch(int64_t max_rows, const ObBatchRows *&batch_rows);
// Advances the current group index by one.
void incre_group_id() { cur_group_idx_++; }
// NOTE(review): the two getters below return int while the backing fields are
// int64_t, so the value is narrowed on return.
int get_cur_group_id() const { return cur_group_idx_; }
int get_group_rescan_cnt() const { return group_rescan_cnt_; }
int init(ObOperator *op, const int64_t max_group_size, const int64_t group_scan_size, const common::ObIArray<ObDynamicParamSetter> *rescan_params);
void destroy();
void reset();
bool is_multi_level() const { return is_multi_level_; }
const GroupParamArray *get_rescan_params_info() const { return &rescan_params_info_; }
private:
// True when the store iterator is invalid or has no further row to return.
bool need_fill_group_buffer() { return !(left_store_iter_.is_valid() && left_store_iter_.has_next()); }
int batch_fill_group_buffer(const int64_t max_row_cnt);
int get_next_batch_from_store(int64_t max_rows, int64_t &read_rows);
void reset_buffer_state();
// True once the number of stored left rows reaches group_scan_size_.
bool is_full() { return left_store_.get_row_cnt() >= group_scan_size_; }
int add_row_to_store();
int deep_copy_dynamic_obj();
int init_group_params();
bool is_inited() { return is_inited_; }
int init_left_batch_rows();
private:
// the elements of join buffer
ObOperator *op_;
ObOperator *left_;
ObBatchRows left_brs_;
const ObOpSpec *spec_;
ObExecContext *ctx_;
ObEvalCtx *eval_ctx_;
const common::ObIArray<ObDynamicParamSetter> *rescan_params_;
ObTempRowStore left_store_;
ObTempRowStore::Iterator left_store_iter_;
lib::MemoryContext mem_context_; // for dynamic param copying, will reset after each group rescan
int64_t cur_group_idx_;
int64_t group_rescan_cnt_;
common::ObArrayWrap<ObSqlArrayObj> group_params_;
GroupParamArray rescan_params_info_;
ObVectorsResultHolder last_batch_;
int64_t max_group_size_;
int64_t group_scan_size_;
int64_t left_store_read_;
// State bits packed into one 64-bit word (7 one-bit flags + 57 reserved bits);
// flags_ aliases the whole word.
union {
uint64_t flags_;
struct {
uint64_t is_inited_ : 1;
uint64_t need_check_above_ : 1;
uint64_t is_multi_level_ : 1;
uint64_t is_left_end_ : 1;
uint64_t save_last_row_ : 1;
uint64_t save_last_batch_ : 1;
uint64_t skip_rescan_right_ : 1;
uint64_t reserved_ : 57;
};
};
};
// Row-at-a-time cursor over the batches produced by the left (drive) side.
// When is_group_rescan_ is set, rescans and group-param handling are routed
// through join_buffer_; otherwise the left operator is used directly.
// left_batch_ holds a backup of the current left batch so that a single row
// can be restored or replicated ("extended") across the output vectors.
class ObDriverRowIterator
{
public:
ObDriverRowIterator();
~ObDriverRowIterator() { }
int get_next_left_row();
int rescan_left();
void bind_group_params_to_das_ctx(GroupParamBackupGuard &guard);
int fill_cur_row_group_param();
int get_next_left_batch(int64_t max_rows, const ObBatchRows *&batch_rows);
// Replicates the current drive row into rows [0, size) of the left outputs.
int drive_row_extend(int size);
// Restores backup row from_idx into row to_idx of the left outputs.
int restore_drive_row(int from_idx, int to_idx);
// Thin forwarder to ObVectorsResultHolder::extend_save().
int extend_save_drive_rows(int64_t size) { return left_batch_.extend_save(size); }
// Index of the current row inside the current left batch.
int get_left_batch_idx() { return l_idx_; }
int get_cur_group_id() const { return join_buffer_.get_cur_group_id(); }
int get_group_rescan_cnt() const { return join_buffer_.get_group_rescan_cnt(); }
// Size of the current left batch, or 0 when no batch has been fetched yet.
int get_left_batch_size() { return left_brs_ == nullptr ? 0 : left_brs_->size_; }
int init(ObOperator *op, const int64_t op_group_scan_size,
const common::ObIArray<ObDynamicParamSetter> *rescan_params,
bool is_group_rescan, bool need_backup_left);
void destroy();
void reset();
bool is_multi_level_group_rescan() const { return is_group_rescan_ && join_buffer_.is_multi_level(); }
const GroupParamArray *get_rescan_params_info() const { return join_buffer_.get_rescan_params_info(); }
private:
// Minimum of left_expr_extend_size_ and the capacity of every left output vector.
int get_min_vec_size_from_drive_row(int &min_vec_size);
private:
// NOTE: alloc a new batchrows here
const ObBatchRows *left_brs_;
int l_idx_;
ObOperator *op_;
ObOperator *left_;
ObDriverRowBuffer join_buffer_;
ObVectorsResultHolder left_batch_;
const common::ObIArray<ObDynamicParamSetter> *rescan_params_;
bool is_group_rescan_;
ObEvalCtx *eval_ctx_;
int64_t op_max_batch_size_;
// for nlj, the left batches will be extended by a single row, so we need backup it
bool need_backup_left_;
// Number of rows the current drive row has been extended to; 0 after reset().
int left_expr_extend_size_;
ObBitVector *left_matched_;
// used to alloc memory for left_matched_ and left_batch_
lib::MemoryContext batch_mem_ctx_;
ObExecContext *ctx_;
};
} // end namespace sql
} // end namespace oceanbase
#endif // OCEANBASE_BASIC_OB_GROUP_JOIN_BUFFER_V2_H_

View File

@ -223,6 +223,8 @@ void ObVectorsResultHolder::ObColResultHolder::restore_fixed_base(ObFixedLengthB
MEMCPY(expr_->get_rev_buf(eval_ctx), frame_data_, len_ * max_row_cnt_);
}
}
void ObVectorsResultHolder::ObColResultHolder::restore_discrete_base(ObDiscreteBase &vec,
const int64_t batch_size,
ObEvalCtx &eval_ctx) const
@ -248,6 +250,7 @@ restore_continuous_base(ObContinuousBase &vec,
}
}
void ObVectorsResultHolder::ObColResultHolder::restore_uniform_base(
const ObExpr *expr, ObUniformBase &vec, bool is_const,
ObEvalCtx &eval_ctx,
@ -265,6 +268,185 @@ void ObVectorsResultHolder::ObColResultHolder::restore_uniform_base(
}
}
// Base step of the single-row restore chain. The vector base carries no
// per-row state, so this is intentionally a no-op.
void ObVectorsResultHolder::ObColResultHolder::restore_base_single_row(ObVectorBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const
{
  UNUSED(eval_ctx);
  UNUSED(to_idx);
  UNUSED(from_idx);
  UNUSED(vec);
}
// Restores the null flag of one row: the null bit of backup row from_idx is
// written to bit to_idx of the expr's null bitmap, and the vector's has_null
// flag is set to the backed-up value.
//
// When the backed-up bitmap pointer is the expr's own bitmap, the copy kept in
// frame_nulls_ is used as the source instead.
void ObVectorsResultHolder::ObColResultHolder::restore_bitmap_null_base_single_row(ObBitmapNullVectorBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const
{
  restore_base_single_row(vec, from_idx, to_idx, eval_ctx);
  vec.set_has_null(has_null_);
  if (OB_ISNULL(expr_)) {
    return;
  }
  ObBitVector &dst_bit_vec = expr_->get_nulls(eval_ctx);
  ObBitVector *src_bit_vec = nulls_;
  if (&dst_bit_vec == nulls_) {
    src_bit_vec = to_bit_vector(frame_nulls_);
  }
  if (!src_bit_vec->at(from_idx)) {
    dst_bit_vec.reset(to_idx);
  } else {
    dst_bit_vec.set(to_idx);
  }
}
// Restores one row of a fixed-length backup: the null bit and, for non-null
// rows, the len_-byte payload of backup row from_idx are written to row to_idx
// of the expr's vector.
//
// The payload source is frame_data_ when the backed-up data pointer is the
// expr's own reserved buffer, otherwise data_.
void ObVectorsResultHolder::ObColResultHolder::restore_fixed_base_single_row(ObFixedLengthBase &vec,
                                                                             int64_t from_idx,
                                                                             int64_t to_idx,
                                                                             ObEvalCtx &eval_ctx) const
{
  restore_bitmap_null_base_single_row(vec, from_idx, to_idx, eval_ctx);
  // The call above wrote the null flag of the source row into bit to_idx, so
  // that is the bit telling whether the payload must be copied. Bit from_idx of
  // the destination is only meaningful when from_idx == to_idx; reading it
  // while extending one row over many could pick up a stale value.
  if (OB_NOT_NULL(expr_) && !vec.is_null(to_idx)) {
    auto *dst_buf = expr_->get_rev_buf(eval_ctx);
    const void *src = nullptr;
    if (data_ == dst_buf) {
      src = frame_data_ + from_idx * len_;
    } else {
      src = data_ + from_idx * len_;
    }
    MEMCPY(dst_buf + to_idx * len_, src, len_);
  }
}
// Restores one row of a discrete backup: the null bit, length and data pointer
// of backup row from_idx are written to row to_idx of the expr's vector.
//
// For lens and ptrs independently, the frame copy (frame_lens_ / frame_ptrs_)
// is the source when the backed-up pointer is the expr's own array.
void ObVectorsResultHolder::ObColResultHolder::restore_discrete_base_single_row(ObDiscreteBase &vec,
                                                                                int64_t from_idx,
                                                                                int64_t to_idx,
                                                                                ObEvalCtx &eval_ctx) const
{
  restore_bitmap_null_base_single_row(vec, from_idx, to_idx, eval_ctx);
  if (OB_ISNULL(expr_)) {
    return;
  }
  auto *dst_lens = expr_->get_discrete_vector_lens(eval_ctx);
  const void *src_len = nullptr;
  if (lens_ == dst_lens) {
    src_len = frame_lens_ + from_idx;
  } else {
    src_len = lens_ + from_idx;
  }
  MEMCPY(dst_lens + to_idx, src_len, sizeof(ObLength));

  auto *dst_ptrs = expr_->get_discrete_vector_ptrs(eval_ctx);
  const void *src_ptr = nullptr;
  if (dst_ptrs == ptrs_) {
    src_ptr = frame_ptrs_ + from_idx;
  } else {
    src_ptr = ptrs_ + from_idx;
  }
  MEMCPY(dst_ptrs + to_idx, src_ptr, sizeof(char *));
}
// NOTE: a continuous-format vector cannot be restored one row at a time: for
// var-len types the value of elems[k] affects the offsets of elems[k + 1] ...
// elems[k + N]. The backed-up row is therefore converted into another format
// (dst_fmt) while it is restored.
//
// expr      expression whose vector receives the row
// from_idx  row index inside the continuous backup
// to_idx    destination row index (ignored for VEC_UNIFORM_CONST, which always
//           writes index 0)
// dst_fmt   format the destination vector was initialized with
//
// An unsupported dst_fmt is only reported with an INFO log; the function
// returns void, so the caller is not notified.
void ObVectorsResultHolder::ObColResultHolder::restore_continuous_base_single_row(ObExpr *expr, int64_t from_idx,
int64_t to_idx, VectorFormat dst_fmt, ObEvalCtx &eval_ctx) const
{
switch(dst_fmt) {
case VEC_FIXED:
convert_continuous_to_fixed(static_cast<ObFixedLengthBase &>(*expr->get_vector(eval_ctx)), from_idx, to_idx, eval_ctx);
break;
case VEC_DISCRETE:
convert_continuous_to_discrete(static_cast<ObDiscreteBase &>(*expr->get_vector(eval_ctx)), from_idx, to_idx, eval_ctx);
break;
case VEC_UNIFORM:
convert_continuous_to_uniform(expr, from_idx, to_idx, eval_ctx);
break;
case VEC_UNIFORM_CONST:
// a const vector holds a single datum, so the destination index is always 0
convert_continuous_to_uniform(expr, from_idx, 0, eval_ctx);
break;
default:
LOG_INFO("get wrong vector format", K(dst_fmt));
break;
}
}
// Restores row from_idx of a continuous backup into row to_idx of a
// fixed-length destination vector: the null bit is restored first, then the
// payload bytes [offsets_[from_idx], offsets_[from_idx + 1]) are copied for
// non-null rows.
//
// NOTE(review): the destination slot is addressed with len_ as the stride;
// confirm that len_ is populated when the backup itself is continuous.
void ObVectorsResultHolder::ObColResultHolder::convert_continuous_to_fixed(ObFixedLengthBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const
{
  restore_bitmap_null_base_single_row(vec, from_idx, to_idx, eval_ctx);
  // Bit to_idx is the one just restored from the source row, so test that bit
  // (not from_idx) to decide whether a payload exists. expr_ is guarded like in
  // the other single-row helpers; without it nothing can be restored.
  if (OB_NOT_NULL(expr_) && !vec.is_null(to_idx)) {
    const uint32_t offset = offsets_[from_idx];
    const uint32_t len = offsets_[from_idx + 1] - offset;
    const char *ptr = continuous_data_ + offset;
    MEMCPY(expr_->get_rev_buf(eval_ctx) + to_idx * len_, ptr, len);
  }
}
// Restores row from_idx of a continuous backup into row to_idx of a discrete
// destination vector: restores the null bit, then stores the row's length and
// a pointer into the backed-up continuous data. The payload is not copied.
void ObVectorsResultHolder::ObColResultHolder::convert_continuous_to_discrete(ObDiscreteBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const
{
  restore_bitmap_null_base_single_row(vec, from_idx, to_idx, eval_ctx);
  const uint32_t row_begin = offsets_[from_idx];
  const uint32_t row_end = offsets_[from_idx + 1];
  uint32_t row_len = row_end - row_begin;
  const char *row_ptr = continuous_data_ + row_begin;
  MEMCPY(expr_->get_discrete_vector_lens(eval_ctx) + to_idx, &row_len, sizeof(ObLength));
  MEMCPY(expr_->get_discrete_vector_ptrs(eval_ctx) + to_idx, &row_ptr, sizeof(char *));
}
// Restores row from_idx of a continuous backup into datum to_idx of the expr's
// batch datums: a null source row yields a null datum, otherwise the datum
// points into the backed-up continuous data with the row's length.
//
// NOTE(review): the `expr` parameter is not used; the holder's own expr_ is
// dereferenced instead.
void ObVectorsResultHolder::ObColResultHolder::convert_continuous_to_uniform(ObExpr *expr, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const
{
  const uint32_t row_begin = offsets_[from_idx];
  const uint32_t row_len = offsets_[from_idx + 1] - row_begin;
  const char *row_ptr = continuous_data_ + row_begin;
  ObDatum &dst = expr_->locate_batch_datums(eval_ctx)[to_idx];
  if (!nulls_->at(from_idx)) {
    dst.ptr_ = row_ptr;
    dst.pack_ = row_len;
    dst.null_ = false;
  } else {
    dst.set_null();
  }
}
void ObVectorsResultHolder::ObColResultHolder::restore_uniform_base_single_row(const ObExpr *expr, ObUniformBase &vec,
int64_t from_idx, int64_t to_idx,
ObEvalCtx &eval_ctx, bool is_const
) const
{
bool need_copy_rev_buf = expr->is_fixed_length_data_
|| ObNumberTC == ob_obj_type_class(expr->datum_meta_.get_type());
restore_vector_base(vec);
// vec.set_datums(datums_);
if (is_const) {
// MEMCPY(expr->locate_batch_datums(eval_ctx), frame_datums_, sizeof(ObDatum));
if (datums_ == expr->locate_batch_datums(eval_ctx)) {
MEMCPY(expr->locate_batch_datums(eval_ctx), frame_datums_, sizeof(ObDatum));
} else {
MEMCPY(expr->locate_batch_datums(eval_ctx), datums_, sizeof(ObDatum));
}
} else {
if (datums_ == expr->locate_batch_datums(eval_ctx)) {
MEMCPY(expr->locate_batch_datums(eval_ctx) + to_idx, frame_datums_ + from_idx, sizeof(ObDatum));
} else {
MEMCPY(expr->locate_batch_datums(eval_ctx) + to_idx, datums_ + from_idx, sizeof(ObDatum));
}
}
}
// Replicates backup row from_idx of a fixed-length column into destination
// rows [start_dst_idx, start_dst_idx + size).
void ObVectorsResultHolder::ObColResultHolder::extend_fixed_base_vector(ObFixedLengthBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx) const
{
  const int64_t end_dst_idx = start_dst_idx + size;
  for (int64_t to_idx = start_dst_idx; to_idx < end_dst_idx; ++to_idx) {
    restore_fixed_base_single_row(vec, from_idx, to_idx, eval_ctx);
  }
}
// Replicates backup row from_idx of a discrete column into destination rows
// [start_dst_idx, start_dst_idx + size).
void ObVectorsResultHolder::ObColResultHolder::extend_discrete_base_vector(ObDiscreteBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx) const
{
  const int64_t end_dst_idx = start_dst_idx + size;
  for (int64_t to_idx = start_dst_idx; to_idx < end_dst_idx; ++to_idx) {
    restore_discrete_base_single_row(vec, from_idx, to_idx, eval_ctx);
  }
}
void ObVectorsResultHolder::ObColResultHolder::extend_uniform_base_vector(const ObExpr *expr, ObUniformBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx, bool is_const) const
{
for (int64_t k = 0; k < size; k++) {
int64_t to_idx = start_dst_idx + k;
restore_uniform_base_single_row(expr, vec, from_idx, to_idx, eval_ctx, is_const);
}
}
// Replicates row from_idx of a continuous backup into destination rows
// [start_dst_idx, start_dst_idx + size), converting it to dst_fmt on the way.
void ObVectorsResultHolder::ObColResultHolder::extend_continuous_base_vector(ObExpr *expr, int64_t from_idx, int64_t start_dst_idx, int64_t size, VectorFormat dst_fmt, ObEvalCtx &eval_ctx) const
{
  const int64_t end_dst_idx = start_dst_idx + size;
  for (int64_t to_idx = start_dst_idx; to_idx < end_dst_idx; ++to_idx) {
    restore_continuous_base_single_row(expr, from_idx, to_idx, dst_fmt, eval_ctx);
  }
}
int ObVectorsResultHolder::init(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &eval_ctx)
{
int ret = OB_SUCCESS;
@ -430,7 +612,6 @@ int ObVectorsResultHolder::save(const int64_t batch_size)
saved_ = true;
return ret;
}
int ObVectorsResultHolder::restore() const
{
int ret = OB_SUCCESS;
@ -530,5 +711,112 @@ int ObVectorsResultHolder::calc_backup_size(const common::ObIArray<ObExpr *> &ex
#undef CALC_FIXED_COL
#undef CALC_COL_BACKUP_SIZE
// Replicates backup row from_idx of every saved column into rows
// [start_dst_idx, start_dst_idx + size) of the corresponding expr vector.
//
// Each expr's vector is first re-initialized with the single-row restore
// format (a continuous backup is converted to the expr's default result
// format), then the per-format extend helper is invoked.
//
// @return OB_SUCCESS (also when nothing has been saved), OB_NOT_INIT when the
//         backup columns or eval ctx are missing, the init_vector() error, or
//         OB_ERR_UNEXPECTED for an unknown backup format.
int ObVectorsResultHolder::drive_row_extended(int64_t from_idx, int64_t start_dst_idx, int64_t size)
{
  int ret = OB_SUCCESS;
  if (NULL == exprs_ || !saved_ || 0 == saved_size_) {
    // nothing saved: leave the exprs untouched
  } else if (OB_ISNULL(backup_cols_) || OB_ISNULL(eval_ctx_)) {
    ret = OB_NOT_INIT;
  } else {
    for (int64_t i = 0; OB_SUCC(ret) && i < exprs_->count(); ++i) {
      VectorFormat backup_format = backup_cols_[i].header_.format_;
      LOG_TRACE("drive row extended for NLJ_VEC_2, and backup format is", K(i), K(backup_format));
      VectorFormat extend_format = get_single_row_restore_format(backup_cols_[i].header_.get_format(), exprs_->at(i));
      if (OB_FAIL(exprs_->at(i)->init_vector(*eval_ctx_, extend_format, eval_ctx_->max_batch_size_))) {
        LOG_WARN("failed to init vector for backup expr", K(i), K(backup_cols_[i].header_.get_format()), K(ret));
      } else if (VEC_FIXED == backup_format) {
        backup_cols_[i].extend_fixed_base_vector(
            static_cast<ObFixedLengthBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
            from_idx, start_dst_idx, size, *eval_ctx_);
      } else if (VEC_DISCRETE == backup_format) {
        backup_cols_[i].extend_discrete_base_vector(
            static_cast<ObDiscreteBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
            from_idx, start_dst_idx, size, *eval_ctx_);
      } else if (VEC_CONTINUOUS == backup_format) {
        backup_cols_[i].extend_continuous_base_vector(
            exprs_->at(i), from_idx, start_dst_idx, size, extend_format, *eval_ctx_);
      } else if (VEC_UNIFORM == backup_format || VEC_UNIFORM_CONST == backup_format) {
        // both uniform flavours share one helper; the flag marks the const one
        backup_cols_[i].extend_uniform_base_vector(
            exprs_->at(i),
            static_cast<ObUniformBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
            from_idx, start_dst_idx, size, *eval_ctx_,
            VEC_UNIFORM_CONST == backup_format);
      } else {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("get wrong vector format", K(backup_format), K(ret));
      }
    }
  }
  return ret;
}
// Restores backup row from_idx of every saved column into row to_idx of the
// corresponding expr vector.
//
// Each expr's vector is first re-initialized with the single-row restore
// format (a continuous backup is converted to the expr's default result
// format), then the per-format single-row helper is invoked.
//
// @return OB_SUCCESS (also when nothing has been saved), OB_NOT_INIT when the
//         backup columns or eval ctx are missing, the init_vector() error, or
//         OB_ERR_UNEXPECTED for an unknown backup format.
int ObVectorsResultHolder::restore_single_row(int64_t from_idx, int64_t to_idx) const
{
  int ret = OB_SUCCESS;
  if (NULL == exprs_ || !saved_ || 0 == saved_size_) {
    // nothing saved: leave the exprs untouched
  } else if (OB_ISNULL(backup_cols_) || OB_ISNULL(eval_ctx_)) {
    ret = OB_NOT_INIT;
  } else {
    for (int64_t i = 0; OB_SUCC(ret) && i < exprs_->count(); ++i) {
      VectorFormat extend_format = get_single_row_restore_format(backup_cols_[i].header_.get_format(), exprs_->at(i));
      if (OB_FAIL(exprs_->at(i)->init_vector(*eval_ctx_, extend_format, eval_ctx_->max_batch_size_))) {
        LOG_WARN("failed to init vector for backup expr", K(i), K(backup_cols_[i].header_.get_format()), K(ret));
      } else {
        VectorFormat format = backup_cols_[i].header_.format_;
        LOG_TRACE("vector format is", K(format));
        if (VEC_FIXED == format) {
          backup_cols_[i].restore_fixed_base_single_row(
              static_cast<ObFixedLengthBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
              from_idx, to_idx, *eval_ctx_);
        } else if (VEC_DISCRETE == format) {
          backup_cols_[i].restore_discrete_base_single_row(
              static_cast<ObDiscreteBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
              from_idx, to_idx, *eval_ctx_);
        } else if (VEC_CONTINUOUS == format) {
          backup_cols_[i].restore_continuous_base_single_row(
              exprs_->at(i), from_idx, to_idx, extend_format, *eval_ctx_);
        } else if (VEC_UNIFORM == format || VEC_UNIFORM_CONST == format) {
          // both uniform flavours share one helper; the flag marks the const one
          backup_cols_[i].restore_uniform_base_single_row(
              exprs_->at(i),
              static_cast<ObUniformBase &>(*exprs_->at(i)->get_vector(*eval_ctx_)),
              from_idx, to_idx, *eval_ctx_,
              VEC_UNIFORM_CONST == format);
        } else {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("get wrong vector format", K(format), K(ret));
        }
      }
    }
  }
  return ret;
}
// Currently only reports (INFO log) when the requested size exceeds the size
// that was saved; no backup storage is changed. Always returns OB_SUCCESS.
int ObVectorsResultHolder::extend_save(const int64_t size)
{
  int ret = OB_SUCCESS;
  const bool exceeds_saved = (size > saved_size_);
  if (exceeds_saved) {
    LOG_INFO("needed backup left extended size", K(size), K(saved_size_));
  }
  return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -44,6 +44,16 @@ public:
int init(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &eval_ctx);
int save(const int64_t batch_size);
int restore() const;
// Restores backup row from_idx of every saved column into row to_idx.
int restore_single_row(int64_t from_idx, int64_t to_idx) const;
// Replicates backup row from_idx into rows [start_dst_idx, start_dst_idx + size).
int drive_row_extended(int64_t from_idx, int64_t start_dst_idx, int64_t size);
// Returns the vector format to use when restoring a single row of a column
// that was backed up in src_format. expr must not be null: it is dereferenced
// when src_format is VEC_CONTINUOUS.
VectorFormat get_single_row_restore_format(VectorFormat src_format, const ObExpr *expr) const
{
// The continuous format does not support restoring a single row, so a
// continuous backup is converted to the expr's default result format.
return src_format == VEC_CONTINUOUS ? expr->get_default_res_format() : src_format;
}
int extend_save(const int64_t size);
// Sets saved_size_ to 0; restore_single_row()/drive_row_extended() treat a
// zero saved size as "nothing to restore".
void clear_saved_size() { saved_size_ = 0; }
void reset()
{
saved_ = false;
@ -90,6 +100,8 @@ private:
void restore_vector_base(ObVectorBase &vec) const;
void restore_bitmap_null_base(ObBitmapNullVectorBase &vec, const int64_t batch_size, ObEvalCtx &eval_ctx) const;
void restore_fixed_base(ObFixedLengthBase &vec, const int64_t batch_size, ObEvalCtx &eval_ctx) const;
void restore_discrete_base(ObDiscreteBase &vec, const int64_t batch_size, ObEvalCtx &eval_ctx) const;
void restore_continuous_base(ObContinuousBase &vec, const int64_t batch_size, ObEvalCtx &eval_ctx) const;
@ -100,6 +112,26 @@ private:
int save(ObIAllocator &alloc, const int64_t batch_size, ObEvalCtx *eval_ctx);
int restore_nested(const int64_t saved_size, ObEvalCtx *eval_ctx);
int restore(const int64_t saved_size, ObEvalCtx *eval_ctx);
// Single-row restore helpers: write backup row from_idx into row to_idx of the
// destination vector, one helper per vector format.
void restore_bitmap_null_base_single_row(ObBitmapNullVectorBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
void restore_base_single_row(ObVectorBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
void restore_fixed_base_single_row(ObFixedLengthBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
void restore_discrete_base_single_row(ObDiscreteBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
// A continuous backup is converted to dst_fmt while the row is restored.
void restore_continuous_base_single_row(ObExpr *expr, int64_t from_idx, int64_t to_idx, VectorFormat dst_fmt,ObEvalCtx &eval_ctx) const;
void restore_uniform_base_single_row(const ObExpr *expr, ObUniformBase &vec,
int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx, bool is_const) const;
// Extend helpers: replicate backup row from_idx into destination rows
// [start_dst_idx, start_dst_idx + size).
void extend_fixed_base_vector(ObFixedLengthBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx) const;
void extend_discrete_base_vector(ObDiscreteBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx) const;
void extend_uniform_base_vector(const ObExpr *expr, ObUniformBase &vec, int64_t from_idx, int64_t start_dst_idx, int64_t size, ObEvalCtx &eval_ctx, bool is_const) const;
void extend_continuous_base_vector(ObExpr *expr, int64_t from_idx, int64_t start_dst_idx, int64_t size, VectorFormat dst_fmt, ObEvalCtx &eval_ctx) const;
// Conversions used by restore_continuous_base_single_row(), one per
// destination format.
void convert_continuous_to_fixed(ObFixedLengthBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
void convert_continuous_to_discrete(ObDiscreteBase &vec, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
void convert_continuous_to_uniform(ObExpr *expr, int64_t from_idx, int64_t to_idx, ObEvalCtx &eval_ctx) const;
VectorHeader header_;
int64_t max_row_cnt_; //ObVectorBase

View File

@ -384,6 +384,12 @@ int ObArrayExprUtils::set_array_res(ObIArrayType *arr_obj, const ObExpr &expr, O
// Explicit instantiations of set_array_res for result vector types:
// ObUniformFormat<true>, ObDiscreteFormat and the generic ObVectorBase.
template int ObArrayExprUtils::set_array_res<ObUniformFormat<true>>(
ObIArrayType* arr_obj, const ObExpr& expr, ObEvalCtx& ctx,
ObUniformFormat<true>* res_vec, int64_t batch_idx);
template int ObArrayExprUtils::set_array_res<ObDiscreteFormat>(
ObIArrayType* arr_obj, const ObExpr& expr, ObEvalCtx& ctx,
ObDiscreteFormat* res_vec, int64_t batch_idx);
template int ObArrayExprUtils::set_array_res<ObVectorBase>(
ObIArrayType* arr_obj, const ObExpr& expr, ObEvalCtx& ctx,
ObVectorBase* res_vec, int64_t batch_idx);
int ObArrayExprUtils::set_array_obj_res(ObIArrayType *arr_obj, ObObjCastParams *params, ObObj *obj)
{

View File

@ -709,8 +709,8 @@ int def_nested_vector_arith_op(VECTOR_EVAL_FUNC_ARG_DECL, Args &... args)
FORMAT_DISPATCH_BRANCH(VEC_DISCRETE, Discrete, Const, Uniform, ArithOp);
FORMAT_DISPATCH_BRANCH(VEC_DISCRETE, Discrete, Const, Discrete, ArithOp);
default:
ret = OB_NOT_SUPPORTED;
SQL_LOG(WARN, "not supported format", K(ret), K(left_format), K(right_format), K(res_format));
ret = ObDoArithVectorBaseEval<ObVectorBase, ObVectorBase, ObVectorBase, ArithOp>()(
VEC_ARG_LIST, args...);
}
}
if (OB_SUCC(ret)) {
@ -970,11 +970,11 @@ struct ObNestedVectorArithOpFunc : public Base
SQL_ENG_LOG(WARN, "exec calculate func failed", K(ret));
} else if (OB_FAIL(res_obj->init())) {
SQL_ENG_LOG(WARN, "init nested obj failed", K(ret));
} else if (std::is_same<ResVector, ObUniformFormat<false>>::value || std::is_same<ResVector, ObUniformFormat<true>>::value) {
} else if (res_vec.get_format() == VEC_UNIFORM || res_vec.get_format() == VEC_UNIFORM_CONST) {
if (OB_FAIL(Base::get_res_batch(ctx, res_obj, expr, idx, &res_vec))) {
SQL_ENG_LOG(WARN, "get array binary string failed", K(ret));
}
} else if (std::is_same<ResVector, ObDiscreteFormat>::value) {
} else {
if (OB_FAIL(Base::distribute_expr_attrs(expr, ctx, idx, *res_obj))) {
SQL_ENG_LOG(WARN, "get array binary string failed", K(ret));
}

View File

@ -53,6 +53,7 @@ class ObDatumCaster;
using common::ObDatum;
using common::ObDatumVector;
class ObBatchRows;
class ObSubQueryIterator;
typedef ObItemType ObExprOperatorType;
@ -162,7 +163,8 @@ struct ObEvalCtx
friend struct ObExpr;
friend class ObOperator;
friend class ObDASScanOp;
friend class ObSubPlanFilterOp; // FIXME qubin.qb: remove this line from friend
friend class ObSubPlanFilterOp;
friend class ObSubQueryIterator;
friend class oceanbase::storage::ObVectorStore;
friend class oceanbase::storage::ObAggregatedStoreVec;
friend class ObDatumCaster;

View File

@ -498,7 +498,7 @@ int ObExprSubQueryRef::get_subquery_iter(ObEvalCtx &ctx,
if (OB_ISNULL(kit) || OB_ISNULL(kit->op_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator is NULL", K(ret), K(extra), KP(kit));
} else if (PHY_SUBPLAN_FILTER != kit->op_->get_spec().type_) {
} else if (PHY_SUBPLAN_FILTER != kit->op_->get_spec().type_ && PHY_VEC_SUBPLAN_FILTER != kit->op_->get_spec().type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("is not subplan filter operator", K(ret), K(extra),
"spec", kit->op_->get_spec());

View File

@ -78,21 +78,25 @@ void ObJoinVecOp::blank_row_batch_one(const ExprFixedArray &exprs)
}
}
int ObJoinVecOp::calc_other_conds(bool &is_match)
int ObJoinVecOp::calc_other_conds(const ObBitVector &skip, bool &is_match)
{
int ret = OB_SUCCESS;
is_match = true;
const ObIArray<ObExpr *> &conds = get_spec().other_join_conds_;
ObDatum *cmp_res = NULL;
const int64_t batch_idx = eval_ctx_.get_batch_idx();
EvalBound eval_bound(eval_ctx_.get_batch_size(), batch_idx, batch_idx + 1, false);
ObIVector *res_vec = nullptr;
ARRAY_FOREACH(conds, i) {
if (OB_FAIL(conds.at(i)->eval(eval_ctx_, cmp_res))) {
if (OB_FAIL(conds.at(i)->eval_vector(eval_ctx_, skip, eval_bound))) {
LOG_WARN("fail to calc other join condition", K(ret), K(*conds.at(i)));
} else if (cmp_res->is_null() || 0 == cmp_res->get_int()) {
} else if (OB_ISNULL(res_vec = conds.at(i)->get_vector(eval_ctx_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get source vector", K(ret), K(res_vec));
} else if (res_vec->is_null(batch_idx) || 0 == res_vec->get_int(batch_idx)) {
is_match = false;
break;
}
}
return ret;
}

View File

@ -56,7 +56,8 @@ public:
inline bool need_left_join() const;
inline bool need_right_join() const;
int calc_other_conds(bool &is_match);
// TODO: @bingfan to see whether nlj can use batch calc other conds
int calc_other_conds(const ObBitVector &skip, bool &is_match);
int batch_calc_other_conds(ObBatchRows &brs);
const ObJoinVecSpec &get_spec() const

View File

@ -0,0 +1,451 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/join/ob_nested_loop_join_vec_op.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/engine/basic/ob_material_vec_op.h"
#include "sql/engine/table/ob_table_scan_op.h"
#include "sql/engine/basic/ob_material_op.h"
#include "sql/engine/expr/ob_expr.h"
namespace oceanbase
{
using namespace commom;
namespace sql
{
// Members of ObNestedLoopJoinVecSpec handed to the serialization macro; the
// first argument pairs the spec with its base class ObJoinVecSpec.
// NOTE(review): the member order here presumably defines the wire order, so
// new members should be appended rather than inserted - confirm against the
// OB_SERIALIZE_MEMBER definition.
OB_SERIALIZE_MEMBER((ObNestedLoopJoinVecSpec, ObJoinVecSpec),
rescan_params_,
gi_partition_id_expr_,
enable_gi_partition_pruning_,
enable_px_batch_rescan_,
group_rescan_, group_size_,
left_expr_ids_in_other_cond_,
left_rescan_params_,
right_rescan_params_);
// Constructs the operator in its initial state: the state machine starts at
// JS_GET_LEFT_ROW, no left batch is attached, and no right rescan is pending.
// The drive iterator is default-constructed here and initialized in inner_open().
ObNestedLoopJoinVecOp::ObNestedLoopJoinVecOp(ObExecContext &exec_ctx,
const ObOpSpec &spec,
ObOpInput *input)
: ObJoinVecOp(exec_ctx, spec, input),
batch_state_(JS_GET_LEFT_ROW),
is_left_end_(false), left_brs_(nullptr),
iter_end_(false), op_max_batch_size_(0),
drive_iter_(), match_right_batch_end_(false),
no_match_row_found_(true), need_output_row_(false),
defered_right_rescan_(false)
{
}
// Opens the operator: validates both children, opens the base join operator,
// then initializes the drive-row iterator from the spec, always requesting a
// backup of the left batch.
int ObNestedLoopJoinVecOp::inner_open()
{
  int ret = OB_SUCCESS;
  LOG_TRACE("open ObNestedLoopJoinVecOp", K(MY_SPEC.join_type_));
  if (OB_ISNULL(left_) || OB_ISNULL(right_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("nlp_op child is null", KP(left_), KP(right_), K(ret));
  } else if (OB_FAIL(ObJoinVecOp::inner_open())) {
    LOG_WARN("failed to open in base class", K(ret));
  } else {
    const bool need_backup_left = true;
    if (OB_FAIL(drive_iter_.init(this,
                                 MY_SPEC.group_size_,
                                 &MY_SPEC.rescan_params_,
                                 MY_SPEC.group_rescan_,
                                 need_backup_left))) {
      LOG_WARN("failed to init drive iterator for NLJ", KR(ret));
    }
  }
  return ret;
}
// Rescans the operator. Only the left (drive) side is rescanned here; the
// right side is flagged for a deferred rescan and then local state is reset
// via inner_rescan().
int ObNestedLoopJoinVecOp::rescan()
{
int ret = OB_SUCCESS;
// NLJ's rescan should only drive the left child's rescan; the right child's
// rescan is deferred to rescan_right_operator(), driven by get_next_row().
defered_right_rescan_ = true;
if (OB_FAIL(drive_iter_.rescan_left())) {
LOG_WARN("rescan left child operator failed", KR(ret), "child op_type", left_->op_name());
} else if (OB_FAIL(inner_rescan())) {
LOG_WARN("failed to inner rescan", KR(ret));
}
#ifndef NDEBUG
// debug builds only: the output batch must not be marked as ended after a rescan
OX(OB_ASSERT(false == brs_.end_));
#endif
return ret;
}
// Drains exchange operators below this NLJ.
//
// The base-class drain is used unless this is a multi-level group rescan NLJ.
// In the multi-level case the request is passed down only when this operator
// has not reached its end; otherwise the drain is handled locally by
// do_drain_exch_multi_lvel_bnlj().
int ObNestedLoopJoinVecOp::do_drain_exch()
{
  int ret = OB_SUCCESS;
  const bool use_plain_drain = !MY_SPEC.group_rescan_
                               || !drive_iter_.is_multi_level_group_rescan();
  if (use_plain_drain) {
    if (OB_FAIL( ObOperator::do_drain_exch())) {
      LOG_WARN("failed to drain NLJ operator", K(ret));
    }
  } else if (is_operator_end()) {
    if (OB_FAIL(do_drain_exch_multi_lvel_bnlj())) {
      LOG_WARN("failed to drain multi level NLJ operator", K(ret));
    }
  } else {
    // the drain request is triggered by the parent operator, so NLJ has to
    // pass it on to its child operators
    LOG_TRACE("The drain request is passed by parent operator");
    if (OB_FAIL( ObOperator::do_drain_exch())) {
      LOG_WARN("failed to drain normal NLJ operator", K(ret));
    }
  }
  return ret;
}
// Drain handling for a multi-level batch NLJ when the drain was triggered by
// this operator itself.
//
// Passing the drain down would mark the children as iter-end and they would
// return no rows once their rescan is blocked. The request is therefore
// stopped here and only this operator is marked as drained and ended.
int ObNestedLoopJoinVecOp::do_drain_exch_multi_lvel_bnlj()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(try_open())) {
    LOG_WARN("fail to open operator", K(ret));
  } else if (exch_drained_) {
    // already drained: nothing to do
  } else {
    // ret is OB_SUCCESS at this point, so the drain result becomes the result
    ret = inner_drain_exch();
    exch_drained_ = true;
    brs_.end_ = true;
    batch_reach_end_ = true;
    row_reach_end_ = true;
  }
  return ret;
}
// Puts the join state machine back to its initial state and resets the
// drive-row iterator.
void ObNestedLoopJoinVecOp::reset_buf_state()
{
  drive_iter_.reset();
  batch_state_ = JS_GET_LEFT_ROW;
  iter_end_ = false;
  is_left_end_ = false;
  match_right_batch_end_ = false;
  need_output_row_ = false;
  no_match_row_found_ = true;
}
// Resets the operator-local state for a rescan: join state machine and drive
// iterator first, then set_param_null(), then the base-class inner_rescan().
// Returns the base-class result.
int ObNestedLoopJoinVecOp::inner_rescan()
{
int ret = OB_SUCCESS;
reset_buf_state();
set_param_null();
if (OB_FAIL(ObJoinVecOp::inner_rescan())) {
LOG_WARN("failed to rescan", K(ret));
}
return ret;
}
// Closes the operator by resetting the drive-row iterator. Always succeeds.
int ObNestedLoopJoinVecOp::inner_close()
{
  int ret = OB_SUCCESS;
  drive_iter_.reset();
  return ret;
}
int ObNestedLoopJoinVecOp::get_next_left_row()
{
int ret = OB_SUCCESS;
if (OB_FAIL(drive_iter_.get_next_left_row())) {
LOG_WARN("failed to get next left row from friver iterator", K(ret));
}
return ret;
}
int ObNestedLoopJoinVecOp::perform_gi_partition_prunig()
{
int ret = OB_SUCCESS;
// 左边每一行出来后,去通知右侧 GI 实施 part id 过滤,避免 PKEY NLJ 场景下扫不必要分区
if (OB_SUCC(ret) && !get_spec().enable_px_batch_rescan_ && !get_spec().group_rescan_ && get_spec().enable_gi_partition_pruning_) {
ObDatum *datum = nullptr;
if (OB_FAIL(get_spec().gi_partition_id_expr_->eval(eval_ctx_, datum))) {
LOG_WARN("fail eval value", K(ret));
} else {
// NOTE: 如果右侧对应多张表,这里的逻辑也没有问题
// 如 A REPART TO NLJ (B JOIN C) 的场景
// 此时 GI 在 B 和 C 的上面
int64_t part_id = datum->get_int();
ctx_.get_gi_pruning_info().set_part_id(part_id);
}
}
return ret;
}
// Repositions the right child for the next drive row.
//
// A pending deferred rescan always triggers a real rescan. Otherwise a
// material child (row or vectorized flavour) is merely rewound, and any other
// child is rescanned. For group rescan the group params are bound to the DAS
// ctx for the duration of the rescan.
//
// OB_ITER_END from rewind()/rescan() is returned without a warning.
int ObNestedLoopJoinVecOp::rescan_right_operator()
{
  int ret = OB_SUCCESS;
  bool do_rescan = false;
  if (defered_right_rescan_) {
    defered_right_rescan_ = false;
    do_rescan = true;
  } else {
    // FIXME bin.lb: handle monitor dump + material ?
    switch (right_->get_spec().type_) {
      case PHY_MATERIAL: {
        if (OB_FAIL(static_cast<ObMaterialOp*>(right_)->rewind()) && OB_ITER_END != ret) {
          LOG_WARN("rewind failed", K(ret));
        }
        break;
      }
      case PHY_VEC_MATERIAL: {
        if (OB_FAIL(static_cast<ObMaterialVecOp*>(right_)->rewind()) && OB_ITER_END != ret) {
          LOG_WARN("rewind failed", K(ret));
        }
        break;
      }
      default: {
        do_rescan = true;
        break;
      }
    }
  }
  if (OB_SUCC(ret) && do_rescan) {
    GroupParamBackupGuard guard(right_->get_exec_ctx().get_das_ctx());
    if (MY_SPEC.group_rescan_) {
      drive_iter_.bind_group_params_to_das_ctx(guard);
    }
    if (OB_FAIL(right_->rescan()) && OB_ITER_END != ret) {
      LOG_WARN("rescan right failed", K(ret));
    }
  }
  return ret;
}
// Prepares the right child for the current left (drive) row.
// Steps, each executed only if the previous one succeeded:
//   1. restore the drive row at the current left batch index,
//   2. fill the rescan parameters for the current row,
//   3. perform GI partition pruning,
//   4. rescan (or rewind) the right operator.
// Returns the first error of that chain. The debug dump of the left rows at the end
// runs regardless of ret.
int ObNestedLoopJoinVecOp::rescan_right_op()
{
int ret = OB_SUCCESS;
// Scope the eval ctx batch size/index to the current left row for the calls below.
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(eval_ctx_);
batch_info_guard.set_batch_size(drive_iter_.get_left_batch_size());
batch_info_guard.set_batch_idx(drive_iter_.get_left_batch_idx());
if (OB_FAIL(drive_iter_.restore_drive_row(drive_iter_.get_left_batch_idx(), drive_iter_.get_left_batch_idx()))) {
LOG_WARN("failed to restore single row", K(ret), K(drive_iter_.get_left_batch_idx()));
} else if (OB_FAIL(drive_iter_.fill_cur_row_group_param())) {
LOG_WARN("failed to prepare rescan params for NLJ", K(ret));
} else if (OB_FAIL(perform_gi_partition_prunig())) {
LOG_WARN("failed perform gi partition pruning", K(ret));
} else if (OB_FAIL(rescan_right_operator())) {
LOG_WARN("failed to rescan right operator", K(ret));
}
// A null skip pointer is passed to the debug print macro.
const ObBitVector *skip = nullptr;
PRINT_VECTORIZED_ROWS(SQL, DEBUG, eval_ctx_, left_->get_spec().output_, left_->get_brs().size_, skip, K(spec_.id_));
return ret;
}
// Pulls the next batch (at most op_max_batch_size_ rows) from the right child.
// When group rescan is enabled, the group params are bound to the DAS ctx through a
// backup guard that lives for the duration of the call.
// NOTE(review): right_brs is passed by value, so a pointer re-assigned by
// get_next_batch() is not visible to the caller; the caller passes
// &right_->get_brs() up front.
int ObNestedLoopJoinVecOp::get_next_batch_from_right(const ObBatchRows *right_brs)
{
  GroupParamBackupGuard guard(right_->get_exec_ctx().get_das_ctx());
  if (MY_SPEC.group_rescan_) {
    drive_iter_.bind_group_params_to_das_ctx(guard);
  }
  return right_->get_next_batch(op_max_batch_size_, right_brs);
}
// Fetches one batch from the right child for the current left (drive) row and derives
// the output skip bitmap (brs_.skip_) and the state flags from it.
// Flags updated here:
//   match_right_batch_end_ - no more right batches are needed for this left row
//   no_match_row_found_    - cleared once at least one right row passed the join conditions
//   need_output_row_       - the caller should move to the output state
// Returns OB_SUCCESS or the first error from fetching/extending/evaluating.
int ObNestedLoopJoinVecOp::process_right_batch()
{
int ret = OB_SUCCESS;
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(eval_ctx_);
batch_info_guard.set_batch_size(drive_iter_.get_left_batch_size());
reset_batchrows();
const ObBatchRows *right_brs = &right_->get_brs();
const ObIArray<ObExpr *> &conds = get_spec().other_join_conds_;
clear_evaluated_flag();
DASGroupScanMarkGuard mark_guard(ctx_.get_das_ctx(), MY_SPEC.group_rescan_);
if (OB_FAIL(get_next_batch_from_right(right_brs))) {
LOG_WARN("fail to get next right batch", K(ret), K(MY_SPEC));
} else if (0 == right_brs->size_ && right_brs->end_) {
// Empty final batch: the right side is exhausted for this left row.
match_right_batch_end_ = true;
LOG_TRACE("right rows iter end");
} else if (OB_FAIL(drive_iter_.extend_save_drive_rows(right_brs->size_))) {
LOG_WARN("failed to extend save left brs", K(ret));
} else if (OB_FAIL(drive_iter_.drive_row_extend(right_brs->size_))) {
LOG_WARN("failed to extend drive row", K(ret));
} else {
if (0 == conds.count()) {
// No extra join conditions: the output skip bitmap is a copy of the right one.
brs_.skip_->deep_copy(*right_brs->skip_, right_brs->size_);
} else {
// Evaluate the other join conditions row by row over the right batch.
batch_info_guard.set_batch_size(right_brs->size_);
bool is_match = false;
for (int64_t r_idx = 0; OB_SUCC(ret) && r_idx < right_brs->size_; r_idx++) {
batch_info_guard.set_batch_idx(r_idx);
if (right_brs->skip_->exist(r_idx)) {
brs_.skip_->set(r_idx);
} else if (OB_FAIL(calc_other_conds(*right_brs->skip_, is_match))) {
LOG_WARN("calc_other_conds failed", K(ret), K(r_idx),
K(right_brs->size_));
} else if (!is_match) {
brs_.skip_->set(r_idx);
} else { /*do nothing*/
}
LOG_DEBUG("cal_other_conds finished ", K(is_match), K(r_idx), K(drive_iter_.get_left_batch_idx()));
} // for conds end
}
if (OB_SUCC(ret)) {
// if is not semi/anti-join, the output of NLJ is the join result of right batches
brs_.size_ = right_brs->size_;
// skip_cnt = number of set skip bits; size_ - skip_cnt > 0 means at least one match.
int64_t skip_cnt = brs_.skip_->accumulate_bit_cnt(right_brs->size_);
if (MY_SPEC.join_type_ == LEFT_SEMI_JOIN) {
// Semi join: the first match is enough, stop reading the right side and output.
if (right_brs->size_ - skip_cnt > 0) {
match_right_batch_end_ = true;
no_match_row_found_ = false;
need_output_row_ = true;
}
} else if (MY_SPEC.join_type_ == LEFT_ANTI_JOIN) {
// Anti join: a match ends the right scan for this left row without requesting output here.
if (right_brs->size_ - skip_cnt > 0) {
no_match_row_found_ = false;
match_right_batch_end_ = true;
}
} else {
if (right_brs->size_ - skip_cnt > 0) {
no_match_row_found_ = false;
need_output_row_ = true;
}
}
match_right_batch_end_ = match_right_batch_end_ || right_brs->end_;
}
}
// outer join or anti-join
// When the right side ended without any match, the left row still has to be output.
if (OB_SUCC(ret)) {
if (match_right_batch_end_ && no_match_row_found_) {
if (need_left_join() || MY_SPEC.join_type_ == LEFT_ANTI_JOIN) {
need_output_row_ = true;
}
}
}
const ObBitVector *skip = nullptr;
PRINT_VECTORIZED_ROWS(SQL, DEBUG, eval_ctx_, left_->get_spec().output_, left_->get_brs().size_, skip, K(spec_.id_));
return ret;
}
int ObNestedLoopJoinVecOp::output()
{
int ret = OB_SUCCESS;
if (IS_LEFT_SEMI_ANTI_JOIN(MY_SPEC.join_type_)) {
reset_batchrows();
brs_.size_ = 1;
drive_iter_.restore_drive_row(drive_iter_.get_left_batch_idx(), 0);
}
if (OB_SUCC(ret) && need_left_join() && match_right_batch_end_ && no_match_row_found_) {
reset_batchrows();
brs_.size_ = 1;
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_idx(0);
blank_row_batch_one(right_->get_spec().output_);
drive_iter_.restore_drive_row(drive_iter_.get_left_batch_idx(), 0);
} else {
// do nothing
}
const ObBitVector *skip = nullptr;
PRINT_VECTORIZED_ROWS(SQL, DEBUG, eval_ctx_, left_->get_spec().output_, left_->get_brs().size_, skip, K(spec_.id_));
return ret;
}
// Re-arms the per-left-row flags before the next left row is processed:
// no match seen yet, and the right side has not ended.
void ObNestedLoopJoinVecOp::reset_right_batch_state()
{
  no_match_row_found_ = true;
  match_right_batch_end_ = false;
}
// Produces the next output batch of the vectorized nested-loop join.
// Runs a state machine over batch_state_:
//   JS_GET_LEFT_ROW        -> fetch the next left row (OB_ITER_END ends the operator)
//   JS_RESCAN_RIGHT_OP     -> prepare params and rescan the right child
//   JS_PROCESS_RIGHT_BATCH -> read one right batch and evaluate join conditions
//   JS_OUTPUT              -> finalize the batch and return it to the caller
// The states are checked in this order inside one loop iteration, so a single
// iteration may pass through several of them.
// @param max_row_cnt upper bound on rows requested by the caller; combined with
//        MY_SPEC.max_batch_size_ into op_max_batch_size_.
// @return OB_SUCCESS (brs_.end_ is set when the input is exhausted) or an error code.
int ObNestedLoopJoinVecOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
if (iter_end_) {
brs_.size_ = 0;
brs_.end_ = true;
}
op_max_batch_size_ = min(max_row_cnt, MY_SPEC.max_batch_size_);
while (!iter_end_ && OB_SUCC(ret)) {
clear_evaluated_flag();
if (JS_GET_LEFT_ROW == batch_state_) {
LOG_TRACE("start get left row", K(spec_.id_));
if (OB_FAIL(get_next_left_row())) {
if (OB_ITER_END == ret) {
// Left side exhausted: report an empty, final batch and stop iterating.
ret = OB_SUCCESS;
brs_.size_ = 0;
brs_.end_ = true;
iter_end_ = true;
} else {
LOG_WARN("fail to get left batch", K(ret));
}
} else {
batch_state_ = JS_RESCAN_RIGHT_OP;
}
}
if (OB_SUCC(ret) && JS_RESCAN_RIGHT_OP == batch_state_) {
LOG_TRACE("start rescan right op", K(spec_.id_), K(drive_iter_.get_left_batch_idx()));
if (OB_FAIL(rescan_right_op())) {
LOG_WARN("failed to rescan right", K(ret));
} else {
batch_state_ = JS_PROCESS_RIGHT_BATCH;
}
}
// process right batch
if (OB_SUCC(ret) && JS_PROCESS_RIGHT_BATCH == batch_state_) {
LOG_TRACE("start process right batch", K(spec_.id_),K(drive_iter_.get_left_batch_idx()));
if (OB_FAIL(process_right_batch())) {
LOG_WARN("fail to process right batch", K(ret));
} else {
if (need_output_row_) {
// Something must be emitted: consume the flag and fall through to the output state.
batch_state_ = JS_OUTPUT;
need_output_row_ = false;
} else {
if (match_right_batch_end_) {
// Nothing to emit and the right side is done: move on to the next left row.
batch_state_ = JS_GET_LEFT_ROW;
reset_right_batch_state();
} else {
batch_state_ = JS_PROCESS_RIGHT_BATCH;
}
}
}
} // end process right state
// start output state
if (OB_SUCC(ret) && JS_OUTPUT == batch_state_) {
LOG_TRACE("start output", K(spec_.id_), K(drive_iter_.get_left_batch_idx()));
if (OB_FAIL(output())) {
LOG_WARN("fail to output", K(ret));
} else {
// Decide where the next call resumes, then hand the batch to the caller.
if (match_right_batch_end_) {
batch_state_ = JS_GET_LEFT_ROW;
reset_right_batch_state();
} else {
batch_state_ = JS_PROCESS_RIGHT_BATCH;
}
break;
}
}
}
const ObBitVector *skip = nullptr;
PRINT_VECTORIZED_ROWS(SQL, DEBUG, eval_ctx_, spec_.output_, brs_.size_, skip, K(spec_.id_));
// Once the iteration has ended, null out the pushed-down rescan params.
if (OB_SUCC(ret) && iter_end_) {
set_param_null();
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -0,0 +1,155 @@
/** * Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef SRC_SQL_ENGINE_JOIN_OB_NEST_LOOP_JOIN_VEC_OP_H_
#define SRC_SQL_ENGINE_JOIN_OB_NEST_LOOP_JOIN_VEC_OP_H_
#include "lib/container/ob_bit_set.h"
#include "lib/container/ob_2d_array.h"
#include "lib/lock/ob_scond.h"
#include "share/datum/ob_datum_funcs.h"
#include "sql/engine/join/ob_join_vec_op.h"
#include "sql/engine/ob_sql_mem_mgr_processor.h"
#include "sql/engine/basic/ob_group_join_buffer_v2.h"
#include "sql/engine/basic/ob_vector_result_holder.h"
namespace oceanbase
{
namespace sql
{
// Spec (plan-time description) of the vectorized nested-loop join operator.
// Holds the rescan parameter setters, the GI partition pruning settings and the
// group-rescan settings read by ObNestedLoopJoinVecOp.
class ObNestedLoopJoinVecSpec : public ObJoinVecSpec
{
OB_UNIS_VERSION_V(1);
public:
// All array members are bound to alloc; flags start disabled, the partition id
// expr starts as nullptr and group_size_ defaults to OB_MAX_BULK_JOIN_ROWS.
ObNestedLoopJoinVecSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type)
: ObJoinVecSpec(alloc, type),
rescan_params_(alloc),
gi_partition_id_expr_(nullptr),
enable_gi_partition_pruning_(false),
enable_px_batch_rescan_(false),
group_rescan_(false),
group_size_(OB_MAX_BULK_JOIN_ROWS),
left_expr_ids_in_other_cond_(alloc),
left_rescan_params_(alloc),
right_rescan_params_(alloc)
{}
// Initializes rescan_params_ with the given count.
int init_param_count(int64_t count)
{ return rescan_params_.init(count); }
int add_nlj_param(int64_t param_idx, ObExpr *org_expr, ObExpr *param_expr);
public:
common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator> rescan_params_;
// Indicates where the partition id column lives in the emitted row; the part id is
// read through this expr and used for pruning on the right side.
ObExpr *gi_partition_id_expr_;
bool enable_gi_partition_pruning_;
bool enable_px_batch_rescan_;
// for group join buffer
bool group_rescan_;
int64_t group_size_;
ObFixedArray<ObFixedArray<int, common::ObIAllocator>, common::ObIAllocator>
left_expr_ids_in_other_cond_;
// for multi level batch rescan
// NLJ 1
// / \
// TSC 1 NLJ 2
// / \
// TSC 2 TSC 3
// As shown above, for NLJ 2, its left_rescan_params_ stores params used by TSC 2 and
// set by NLJ 1.
// Similarly, for NLJ 2, its right_rescan_params_ stores params used by TSC 3 and set
// by NLJ 1.
common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator> left_rescan_params_;
common::ObFixedArray<ObDynamicParamSetter, common::ObIAllocator> right_rescan_params_;
private:
DISALLOW_COPY_AND_ASSIGN(ObNestedLoopJoinVecSpec);
};
// Vectorized nested-loop join operator.
// Batch production is driven by a small state machine (ObJoinBatchState) and a
// driver-row iterator (drive_iter_) that supplies the left rows.
class ObNestedLoopJoinVecOp: public ObJoinVecOp
{
public:
// States of the batch state machine used by inner_get_next_batch().
enum ObJoinBatchState {
JS_GET_LEFT_ROW = 0,
JS_RESCAN_RIGHT_OP,
JS_PROCESS_RIGHT_BATCH,
JS_OUTPUT
};
ObNestedLoopJoinVecOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input);
virtual int inner_open() override;
virtual int rescan() override;
virtual int inner_rescan() override;
// NOTE(review): declared without `override`, unlike its siblings - confirm it matches the base signature.
virtual int inner_get_next_batch(const int64_t max_row_cnt);
// Row-at-a-time interface is not implemented for this operator.
virtual int inner_get_next_row() override { return common::OB_NOT_IMPLEMENT; }
virtual int inner_close() final;
virtual OperatorOpenOrder get_operator_open_order() const override final
{ return OPEN_SELF_FIRST; }
int prepare_rescan_params(bool is_group = false);
virtual void destroy() override { ObJoinVecOp::destroy(); }
// ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; }
int fill_cur_row_rescan_param();
int do_drain_exch_multi_lvel_bnlj();
// Returns spec_ cast to the NLJ-specific spec type.
const ObNestedLoopJoinVecSpec &get_spec() const
{ return static_cast<const ObNestedLoopJoinVecSpec &>(spec_); }
// Sets the pushed-down rescan params of this join to null.
void set_param_null()
{
set_pushdown_param_null(get_spec().rescan_params_);
}
private:
bool is_full() const;
// used for rescan and switch iter
virtual void reset_buf_state();
int rescan_params_batch_one(int64_t batch_idx);
int get_left_batch();
int group_get_left_batch(const ObBatchRows *&left_brs);
int get_next_left_row();
int rescan_right_operator();
int rescan_right_op();
int perform_gi_partition_prunig();
int process_right_batch();
int output();
void reset_left_batch_state();
void reset_right_batch_state();
void skip_l_idx();
// True while the left batch has not ended and is_full() is false.
bool continue_fetching() { return !(left_brs_->end_ || is_full());}
virtual int do_drain_exch() override;
// NOTE(review): declared without `override` - confirm whether this overrides a base-class hook.
virtual int inner_drain_exch() { return OB_SUCCESS; }
int get_next_batch_from_right(const ObBatchRows *right_brs);
public:
// Current state of the batch state machine.
ObJoinBatchState batch_state_;
bool is_left_end_;
const ObBatchRows *left_brs_;
// Set once the left side is exhausted; later calls return an empty end batch.
bool iter_end_;
// min(max_row_cnt, spec max batch size), recomputed on each inner_get_next_batch() call.
int64_t op_max_batch_size_;
// Iterator over the left (drive) rows.
ObDriverRowIterator drive_iter_;
// No more right batches are needed for the current left row.
bool match_right_batch_end_;
// True until a right row matches the current left row.
bool no_match_row_found_;
// Set by process_right_batch() when the state machine must enter JS_OUTPUT.
bool need_output_row_;
// When set, the next rescan_right_operator() call performs a real rescan and clears the flag.
bool defered_right_rescan_;
private:
DISALLOW_COPY_AND_ASSIGN(ObNestedLoopJoinVecOp);
};
} // end namespace sql
} // end namespace oceanbase
#endif /*SRC_SQL_ENGINE_JOIN_OB_NEST_LOOP_JOIN_VEC_OP_H_*/

View File

@ -44,6 +44,69 @@ int ObDynamicParamSetter::set_dynamic_param(ObEvalCtx &eval_ctx) const
return ret;
}
// Evaluates src_ for the current batch index (rich/vector format) and publishes the
// value as a dynamic parameter:
//   1. writes it into the plan ctx param store at param_idx_,
//   2. initializes dst_ as a VEC_UNIFORM_CONST vector, marks it evaluated and stores
//      the value in its datum (shallow copy when dst_ has no result buffer,
//      deep copy otherwise).
// @param eval_ctx evaluation context; its batch index selects the source row.
// @param skip_bit skip bitmap forwarded to the vector evaluation of src_.
// @return OB_SUCCESS, OB_INVALID_ARGUMENT when src_/dst_/plan ctx are missing,
//         OB_ERR_UNEXPECTED on a missing source vector or bad param index, or the
//         error of the failing step.
int ObDynamicParamSetter::set_dynamic_param_vec2(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit) const
{
  int ret = OB_SUCCESS;
  ObIVector *src_vec = nullptr;
  const int64_t batch_idx = eval_ctx.get_batch_idx();
  // Evaluate only the single row at batch_idx.
  EvalBound eval_bound(eval_ctx.get_batch_size(), batch_idx, batch_idx + 1, false);
  ObPhysicalPlanCtx *phy_ctx = eval_ctx.exec_ctx_.get_physical_plan_ctx();
  if (OB_ISNULL(src_) || OB_ISNULL(dst_) || OB_ISNULL(phy_ctx)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("expr not init", K(ret), KP(src_), K(dst_), K(phy_ctx));
  } else if (OB_FAIL(src_->eval_vector(eval_ctx, skip_bit, eval_bound))) {
    LOG_WARN("fail to calc rescan params", K(ret), K(*this));
  } else if (OB_ISNULL(src_vec = src_->get_vector(eval_ctx))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("failed to get source vector", K(ret), K(src_vec));
  } else {
    // Wrap the source cell in a datum without copying the payload.
    const char *payload = NULL;
    ObLength len = 0;
    src_vec->get_payload(batch_idx, payload, len);
    ObDatum res;
    if (src_vec->is_null(batch_idx)) {
      res.set_null();
    } else {
      res.ptr_ = payload;
      res.len_ = len;
      res.null_ = 0;
    }
    ParamStore &param_store = phy_ctx->get_param_store_for_update();
    if (OB_UNLIKELY(param_idx_ < 0 || param_idx_ >= param_store.count())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid index", K(ret), K(param_idx_), K(param_store.count()));
    } else if (OB_FAIL(res.to_obj(param_store.at(param_idx_),
                                  dst_->obj_meta_,
                                  dst_->obj_datum_map_))) {
      LOG_WARN("convert datum to obj failed", K(ret), "datum",
               DATUM2STR(*dst_, res));
    } else {
      param_store.at(param_idx_).set_param_meta();
    }
    if (OB_SUCC(ret)) {
      ObDatum &param_datum = dst_->locate_expr_datum(eval_ctx);
      if (OB_FAIL(dst_->init_vector(eval_ctx, VEC_UNIFORM_CONST, 1))) {
        // Stop here: dst_ must not be marked evaluated, and this error must not be
        // overwritten by a later successful step.
        LOG_WARN("failed to init vector for dynamic param", K(ret), K(*dst_));
      } else {
        clear_parent_evaluated_flag(eval_ctx, *dst_);
        dst_->get_eval_info(eval_ctx).evaluated_ = true;
        if (0 == dst_->res_buf_off_) {
          // for compat, old server don't have ref buf for dynamic expr,
          // so keep shallow copy
          param_datum.set_datum(res);
        } else if (OB_FAIL(dst_->deep_copy_datum(eval_ctx, res))) {
          LOG_WARN("fail to deep copy datum", K(ret), K(eval_ctx), K(*dst_));
        }
      }
    }
  }
  return ret;
}
int ObDynamicParamSetter::set_dynamic_param(ObEvalCtx &eval_ctx, ObObjParam *&param) const
{
int ret = OB_SUCCESS;
@ -1442,7 +1505,6 @@ int ObOperator::convert_vector_format()
}
}
}
return ret;
}
@ -1684,6 +1746,23 @@ void ObOperator::set_pushdown_param_null(const ObIArray<ObDynamicParamSetter> &r
}
}
void ObOperator::set_pushdown_param_null_vec2(const ObIArray<ObDynamicParamSetter> &rescan_params)
{
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
ParamStore &param_store = plan_ctx->get_param_store_for_update();
FOREACH_CNT(param, rescan_params) {
param_store.at(param->param_idx_).set_null();
ObIVector *vec = param->dst_->get_vector(eval_ctx_);
VectorFormat format = param->dst_->get_format(eval_ctx_);
if (common::VEC_INVALID == format) {
// dst_vector is not inited, do nothing
} else if (OB_UNLIKELY(is_uniform_format(format))) {
reinterpret_cast<ObUniformBase *>(vec)->set_null(eval_ctx_.get_batch_idx());
} else {
reinterpret_cast<ObBitmapNullVectorBase *>(vec)->set_null(eval_ctx_.get_batch_idx());
}
}
}
inline int ObOperator::get_next_row_vectorizely()
{
int ret = OB_SUCCESS;

View File

@ -207,6 +207,7 @@ public:
virtual ~ObDynamicParamSetter() {}
int set_dynamic_param(ObEvalCtx &eval_ctx) const;
int set_dynamic_param_vec2(ObEvalCtx &eval_ctx, const sql::ObBitVector &skip_bit) const;
int set_dynamic_param(ObEvalCtx &eval_ctx, common::ObObjParam *&param) const;
int update_dynamic_param(ObEvalCtx &eval_ctx, common::ObDatum &datum) const;
@ -484,6 +485,7 @@ public:
// Drain exchange in data for PX, or producer DFO will be blocked.
int drain_exch();
void set_pushdown_param_null(const common::ObIArray<ObDynamicParamSetter> &rescan_params);
void set_pushdown_param_null_vec2(const ObIArray<ObDynamicParamSetter> &rescan_params);
void set_feedback_node_idx(int64_t idx)
{ fb_node_idx_ = idx; }

View File

@ -184,6 +184,8 @@
#include "sql/engine/aggregate/ob_merge_groupby_vec_op.h"
#include "sql/engine/join/ob_merge_join_vec_op.h"
#include "sql/engine/join/ob_nested_loop_join_vec_op.h"
#include "sql/engine/subquery/ob_subplan_filter_vec_op.h"
namespace oceanbase
{
using namespace common;

View File

@ -462,6 +462,13 @@ class ObNestedLoopJoinOp;
REGISTER_OPERATOR(ObLogJoin, PHY_NESTED_LOOP_JOIN, ObNestedLoopJoinSpec,
ObNestedLoopJoinOp, NOINPUT, VECTORIZED_OP);
class ObNestedLoopJoinVecSpec;
class ObNestedLoopJoinVecOp;
REGISTER_OPERATOR(ObLogJoin, PHY_VEC_NESTED_LOOP_JOIN, ObNestedLoopJoinVecSpec,
ObNestedLoopJoinVecOp, NOINPUT, VECTORIZED_OP, 0 /*+version*/,
SUPPORT_RICH_FORMAT);
class ObLogSubPlanFilter;
class ObSubPlanFilterSpec;
class ObSubPlanFilterOp;
@ -469,6 +476,13 @@ REGISTER_OPERATOR(ObLogSubPlanFilter, PHY_SUBPLAN_FILTER,
ObSubPlanFilterSpec, ObSubPlanFilterOp,
NOINPUT, VECTORIZED_OP);
class ObLogSubPlanFilter;
class ObSubPlanFilterVecSpec;
class ObSubPlanFilterVecOp;
REGISTER_OPERATOR(ObLogSubPlanFilter, PHY_VEC_SUBPLAN_FILTER,
ObSubPlanFilterVecSpec, ObSubPlanFilterVecOp,
NOINPUT, VECTORIZED_OP, 0 /*+version*/, SUPPORT_RICH_FORMAT);
class ObLogSubPlanScan;
class ObSubPlanScanSpec;
class ObSubPlanScanOp;

View File

@ -141,6 +141,8 @@ PHY_OP_DEF(PHY_VEC_HASH_EXCEPT)
PHY_OP_DEF(PHY_VEC_WINDOW_FUNCTION)
PHY_OP_DEF(PHY_VEC_MERGE_GROUP_BY)
PHY_OP_DEF(PHY_VEC_MERGE_JOIN)
PHY_OP_DEF(PHY_VEC_NESTED_LOOP_JOIN)
PHY_OP_DEF(PHY_VEC_SUBPLAN_FILTER)
PHY_OP_DEF(PHY_END)
#endif /*PHY_OP_DEF*/

View File

@ -78,13 +78,117 @@ int ObSubQueryIterator::get_next_row()
group_params = parent_->get_rescan_params_info();
GroupParamBackupGuard guard(op_.get_exec_ctx().get_das_ctx());
guard.bind_batch_rescan_params(parent_spf_group, parent_group_rescan_cnt, group_params);
ret = op_.get_next_row();
ret = get_next_row_from_child();
} else {
ret = get_next_row_from_child();
}
return ret;
}
// Initializes the batch result holder with the given exprs and eval ctx.
// Returns whatever brs_holder_.init() returns.
int ObSubQueryIterator::init_batch_rows_holder(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &eval_ctx)
{
  return brs_holder_.init(exprs, eval_ctx);
}
// Fetches the next row of the subquery.
// A vectorized child is read batch-wise and converted to rows; otherwise the child's
// row interface is used directly.
int ObSubQueryIterator::get_next_row_from_child()
{
  return OB_UNLIKELY(op_.get_spec().is_vectorized())
      ? get_next_row_vecrorizely()
      : op_.get_next_row();
}
// Row-at-a-time adapter over a vectorized child operator.
// Keeps the current child batch in iter_brs_ and a cursor in batch_row_pos_; each call
// advances to the next non-skipped row, fetching a new batch when the current one is
// used up. The selected row's datums are copied (shallow) into slot 0 of every
// batch-result output expr and the eval ctx is set to batch size 1 / index 0.
// @return OB_SUCCESS when a row is positioned, OB_ITER_END when the child is exhausted,
//         or an error from fetching/casting/saving.
int ObSubQueryIterator::get_next_row_vecrorizely()
{
int ret = OB_SUCCESS;
const int64_t max_row_cnt = INT64_MAX;
LOG_DEBUG("debug batch to row transform ", K(batch_row_pos_));
// First call: no batch has been fetched yet.
if (NULL == iter_brs_) {
if (OB_FAIL(op_.get_next_batch(max_row_cnt, iter_brs_))) {
LOG_WARN("get next batch failed", K(ret));
} else if (OB_FAIL(cast_vector_format())) {
LOG_WARN("failed to cast vector format", K(ret));
} else if (OB_FAIL(brs_holder_.save(1))) {
LOG_WARN("backup datumss[0] failed", K(ret));
}
// backup datums[0]
LOG_DEBUG("batch to row transform ", K(batch_row_pos_), KPC(iter_brs_));
}
while(OB_SUCC(ret)) {
// Current batch fully consumed and it was the last one.
if (batch_row_pos_ >= iter_brs_->size_ && iter_brs_->end_) {
ret = OB_ITER_END;
break;
}
// Skip over rows marked in the skip bitmap.
while (batch_row_pos_ < iter_brs_->size_ && iter_brs_->skip_->at(batch_row_pos_)) {
batch_row_pos_++;
}
if (batch_row_pos_ >= iter_brs_->size_) {
if (!iter_brs_->end_) {
// Restore the saved slot-0 datums before the child produces its next batch.
brs_holder_.restore();
if (OB_FAIL(op_.get_next_batch(max_row_cnt, iter_brs_))) {
LOG_WARN("get next batch failed", K(ret));
} else if (OB_FAIL(cast_vector_format())) {
LOG_WARN("failed to cast vector format", K(ret));
} else {
batch_row_pos_ = 0;
if (0 == iter_brs_->size_ && iter_brs_->end_) {
LOG_DEBUG("get empty batch ", K(iter_brs_));
ret = OB_ITER_END;
break;
} else if (OB_FAIL(brs_holder_.save(1))) {
LOG_WARN("backup datumss[0] failed", K(ret));
}
}
}
continue;
} else {
// got row, increase the index to next row
batch_row_pos_ += 1;
break;
}
}
// overwrite datums[0]: shallow copy
// cur_idx() is batch_row_pos_ - 1, i.e. the row selected above.
for (int i = 0; OB_SUCC(ret) && i < op_.get_spec().output_.count(); i++) {
ObExpr *expr = op_.get_spec().output_.at(i);
if (expr->is_batch_result() && 0 != cur_idx()) {
ObDatum *datums = expr->locate_batch_datums(eval_ctx_);
datums[0] = datums[cur_idx()];
LOG_DEBUG("copy datum to datum[0], details: ", K(cur_idx()), K(datums[0]),
K(expr->locate_batch_datums(eval_ctx_)),
KPC(expr->locate_batch_datums(eval_ctx_)), K(expr), KPC(expr));
} // non batch expr always point to offset 0, do nothing for else brach
}
// The eval ctx is switched to single-row mode regardless of ret.
eval_ctx_.set_batch_size(1);
eval_ctx_.set_batch_idx(0);
return ret;
}
// Casts every output expr of the child to uniform format for the current batch.
// The cast is only performed when both the parent SPF and the child use rich format.
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the batch or parent pointer is missing,
//         or the error of the failing cast.
int ObSubQueryIterator::cast_vector_format()
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(iter_brs_) || OB_ISNULL(parent_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid nullptr found", K(ret), K(iter_brs_), K(parent_));
  } else if (!parent_->get_spec().use_rich_format_ || !op_.get_spec().use_rich_format_) {
    // At least one side does not use rich format: nothing to convert.
  } else {
    FOREACH_CNT_X(e, op_.get_spec().output_, OB_SUCC(ret)) {
      LOG_TRACE("cast to uniform", K(*e));
      if (OB_FAIL((*e)->cast_to_uniform(iter_brs_->size_, eval_ctx_))) {
        LOG_WARN("expr evaluate failed", K(ret), KPC(*e), K_(eval_ctx));
      }
    }
  }
  return ret;
}
void ObSubQueryIterator::drain_exch()
{
op_.drain_exch();
@ -364,10 +468,10 @@ DEF_TO_STRING(ObSubPlanFilterSpec)
ObSubPlanFilterOp::ObSubPlanFilterOp(
ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input),
update_set_mem_(NULL),
iter_end_(false),
enable_left_px_batch_(false),
max_group_size_(0),
update_set_mem_(NULL),
enable_left_px_batch_(false),
current_group_(0),
das_batch_params_(),
left_rows_("SpfOp"),
@ -593,6 +697,8 @@ int ObSubPlanFilterOp::inner_open()
LOG_WARN("failed to init hash map for idx", K(i), K(ret));
} else if (OB_FAIL(iter->init_probe_row(MY_SPEC.exec_param_array_[i - 1].count()))) {
LOG_WARN("failed to init probe row", K(ret));
} else if (children_[i]->is_vectorized() && OB_FAIL(iter->init_batch_rows_holder(children_[i]->get_spec().output_, children_[i]->get_eval_ctx()))) {
LOG_WARN("failed to init batch rows holder", K(ret));
}
}
}

View File

@ -90,6 +90,7 @@ public:
int set_refactored(const DatumRow &row, const ObDatum &result, const int64_t deep_copy_size);
void set_parent(const ObSubPlanFilterOp *filter) { parent_ = filter; }
int reset_hash_map();
int64_t cur_idx() const { return batch_row_pos_ - 1; }
bool check_can_insert(const int64_t deep_copy_size)
{
@ -110,9 +111,12 @@ public:
//hard core, 1M limit for each hashmap
const static int HASH_MAP_MEMORY_LIMIT = 1024 * 1024;
void drain_exch();
int init_batch_rows_holder(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &eval_ctx);
private:
int get_next_row_from_child();
int get_next_row_vecrorizely();
int cast_vector_format();
// for das batch spf
int alloc_das_batch_store();
// for das batch spf end
@ -137,6 +141,7 @@ private:
int64_t batch_size_;
int64_t batch_row_pos_;
bool iter_end_;
ObBatchResultHolder brs_holder_;
// for vectorized end
common::ObArrayWrap<ObObjParam> das_batch_params_recovery_;
@ -201,7 +206,7 @@ public:
int handle_next_row();
bool enable_px_batch_rescan() { return enable_left_px_batch_; }
//for vectorized
int inner_get_next_batch(const int64_t max_row_cnt);
virtual int inner_get_next_batch(const int64_t max_row_cnt);
// for vectorized end
int init_left_cur_row(const int64_t column_cnt, ObExecContext &ctx);
@ -210,8 +215,8 @@ public:
//for DAS batch SPF
int fill_cur_row_das_batch_param(ObEvalCtx& eval_ctx, uint64_t current_group) const;
int bind_das_batch_params_to_store() const;
void get_current_group(uint64_t& current_group) const;
void get_current_batch_cnt(int64_t& current_batch_cnt) const { current_batch_cnt = group_rescan_cnt_; }
virtual void get_current_group(uint64_t& current_group) const;
virtual void get_current_batch_cnt(int64_t& current_batch_cnt) const { current_batch_cnt = group_rescan_cnt_; }
bool enable_left_das_batch() const {return MY_SPEC.enable_das_group_rescan_;}
//for DAS batch SPF end
@ -222,7 +227,7 @@ public:
ObBatchRescanCtl &get_batch_rescan_ctl() { return batch_rescan_ctl_; }
int handle_next_batch_with_px_rescan(const int64_t op_max_batch_size);
int handle_next_batch_with_group_rescan(const int64_t op_max_batch_size);
const GroupParamArray *get_rescan_params_info() const { return &rescan_params_info_; }
virtual const GroupParamArray *get_rescan_params_info() const { return &rescan_params_info_; }
private:
void set_param_null() { set_pushdown_param_null(MY_SPEC.rescan_params_); };
void destroy_subplan_iters();
@ -252,15 +257,16 @@ private:
int deep_copy_dynamic_obj();
// for das batch spf end
private:
protected:
common::ObSEArray<Iterator *, 16> subplan_iters_;
lib::MemoryContext update_set_mem_;
bool iter_end_;
uint64_t max_group_size_; //Das batch rescan size;
private:
lib::MemoryContext update_set_mem_;
// for px batch rescan
bool enable_left_px_batch_;
// for px batch rescan end
// for das batch rescan
uint64_t max_group_size_; //Das batch rescan size;
uint64_t current_group_; //The group id in this time right iter rescan;
common::ObArrayWrap<ObSqlArrayObj> das_batch_params_;

View File

@ -0,0 +1,355 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "ob_subplan_filter_vec_op.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/ob_exec_context.h"
namespace oceanbase
{
using namespace common;
namespace sql
{
// Spec of the vectorized subplan filter; adds no state of its own and forwards the
// allocator and operator type to ObSubPlanFilterSpec.
ObSubPlanFilterVecSpec::ObSubPlanFilterVecSpec(ObIAllocator &alloc, const ObPhyOperatorType type)
    : ObSubPlanFilterSpec(alloc, type)
{}
// Serialization declaration: no member fields are listed besides the base class ObSubPlanFilterSpec.
OB_SERIALIZE_MEMBER((ObSubPlanFilterVecSpec, ObSubPlanFilterSpec));
// Renders the spec as a JSON-like object: the base ObOpSpec under the name "op_spec",
// followed by the listed subplan filter fields. Returns the number of bytes written (pos).
DEF_TO_STRING(ObSubPlanFilterVecSpec)
{
int64_t pos = 0;
J_OBJ_START();
J_NAME("op_spec");
J_COLON();
// Append the base operator spec at the current position.
pos += ObOpSpec::to_string(buf + pos, buf_len - pos);
J_COMMA();
J_KV(K_(rescan_params),
K_(onetime_exprs),
K_(init_plan_idxs),
K_(one_time_idxs),
K_(update_set),
K_(exec_param_idxs_inited));
J_OBJ_END();
return pos;
}
// Constructs the vectorized subplan filter on top of ObSubPlanFilterOp with a
// default-constructed driver-row iterator (initialized later in inner_open()).
ObSubPlanFilterVecOp::ObSubPlanFilterVecOp(ObExecContext &exec_ctx,
                                           const ObOpSpec &spec,
                                           ObOpInput *input)
    : ObSubPlanFilterOp(exec_ctx, spec, input), drive_iter_()
{}
// Destructor: destroys the subplan iterators and resets the driver-row iterator.
ObSubPlanFilterVecOp::~ObSubPlanFilterVecOp()
{
// Runs the destructor of every remaining iterator and clears the array.
destroy_subplan_iters();
drive_iter_.reset();
}
void ObSubPlanFilterVecOp::destroy_subplan_iters()
{
FOREACH_CNT(it, subplan_iters_) {
if (NULL != *it) {
(*it)->~Iterator();
*it = NULL;
}
}
subplan_iters_.reset();
}
// Releases operator-owned state: subplan iterators and the driver-row iterator, then
// delegates to ObOperator::destroy().
// NOTE(review): this calls ObOperator::destroy() directly rather than the immediate
// base ObSubPlanFilterOp::destroy() - confirm nothing else in the base needs cleanup.
void ObSubPlanFilterVecOp::destroy()
{
destroy_subplan_iters();
drive_iter_.reset();
// The vectorized operator does not support update set, so there is no update-set memory to destroy.
// destroy_update_set_mem();
ObOperator::destroy();
}
// Rescans the vectorized subplan filter.
// Order of work:
//   1. clear end/iter flags and evaluated flags, null the pushed-down params,
//   2. run ObOperator::inner_rescan() and reset the driver-row iterator,
//   3. unless DAS group rescan is enabled, rescan every subplan child (children 1..n),
//   4. per subplan iterator: re-prepare init plans, otherwise reset its hash map,
//   5. re-prepare onetime exprs and rescan the left (drive) child through drive_iter_.
// @return OB_SUCCESS or the first error encountered.
int ObSubPlanFilterVecOp::rescan()
{
int ret = OB_SUCCESS;
brs_.end_ = false;
iter_end_ = false;
clear_evaluated_flag();
set_param_null();
if (OB_FAIL(ObOperator::inner_rescan())) {
LOG_WARN("failed to inner rescan", K(ret));
}
// The driver iterator is reset even if inner_rescan() failed; later steps are guarded by OB_SUCC(ret).
drive_iter_.reset();
if (!MY_SPEC.enable_das_group_rescan_) {
// Child 0 is the drive child; subplans start at index 1.
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
if (OB_FAIL(children_[i]->rescan())) {
LOG_WARN("rescan child operator failed", K(ret),
"op", op_name(), "child", children_[i]->op_name());
}
}
}
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
// subplan_iters_ is indexed by child index minus one.
Iterator *iter = subplan_iters_.at(i - 1);
if (OB_ISNULL(iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("subplan_iter is null", K(ret));
} else if (MY_SPEC.init_plan_idxs_.has_member(i)) {
iter->reuse();
if (OB_FAIL(iter->prepare_init_plan())) {
LOG_WARN("prepare init plan failed", K(ret), K(i));
}
} else if (OB_FAIL(iter->reset_hash_map())) {
LOG_WARN("failed to reset hash map", K(ret), K(i));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(prepare_onetime_exprs())) {
LOG_WARN("prepare onetime exprs failed", K(ret));
} else if (OB_FAIL(drive_iter_.rescan_left())) { // use drive_iter_ to rescan drive child operator of SPF
LOG_WARN("failed to do rescan", K(ret));
} else {
startup_passed_ = spec_.startup_filters_.empty();
}
}
need_init_before_get_row_ = false;
#ifndef NDEBUG
OX(OB_ASSERT(false == brs_.end_));
#endif
return ret;
}
// Switches this operator and its first child to the next iterator.
// OB_ITER_END from the child is passed through without a warning log.
int ObSubPlanFilterVecOp::switch_iterator()
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(ObOperator::inner_switch_iterator())) {
    LOG_WARN("failed to inner switch iterator", K(ret));
  } else {
    // TODO: currently only multi-group iterator switching for uncorrelated
    // subqueries is supported; only the main table is switched.
    ret = child_->switch_iterator();
    if (OB_SUCCESS != ret && OB_ITER_END != ret) {
      LOG_WARN("switch child operator iterator failed", K(ret));
    }
  }
#ifndef NDEBUG
  OX(OB_ASSERT(false == brs_.end_));
#endif
  return ret;
}
// Creates one Iterator per subplan child (children 1..child_cnt_-1) and stores it in
// subplan_iters_ at index (child index - 1).
// Each iterator is tagged as init plan or onetime plan according to the spec and, when
// exec param indexes are initialized, gets its memory entity and - for subplans with
// exec params - a hash map, a probe row and, for vectorized children, a batch rows holder.
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED, or the first initialization error.
int ObSubPlanFilterVecOp::init_subplan_iters()
{
int ret = OB_SUCCESS;
// At least the drive child plus one subplan child are required.
CK(child_cnt_ >= 2);
LOG_TRACE("init subplan iters in ObSubPlanFilterVecOp", K(child_cnt_));
if (OB_SUCC(ret)) {
OZ(subplan_iters_.prepare_allocate(child_cnt_ - 1));
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
void *ptr = ctx_.get_allocator().alloc(sizeof(Iterator));
Iterator *&iter = subplan_iters_.at(i - 1);
if (OB_ISNULL(ptr)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("alloc subplan iterator failed", K(ret), "size", sizeof(Iterator));
} else {
// Placement-new the iterator into the allocated memory.
iter = new(ptr) Iterator(*children_[i]);
iter->set_iter_id(i);
iter->set_parent(this);
if (MY_SPEC.init_plan_idxs_.has_member(i)) {
iter->set_init_plan();
// init plan preparation is moved to after get_next_row
} else if (MY_SPEC.one_time_idxs_.has_member(i)) {
iter->set_onetime_plan();
} else if (!MY_SPEC.enable_px_batch_rescans_.empty() &&
MY_SPEC.enable_px_batch_rescans_.at(i)) {
// Intentionally empty: the px batch rescan flag is not set in this operator.
// enable_left_px_batch_ = true;
}
if (!MY_SPEC.exec_param_idxs_inited_) {
//unittest or old version, do not init hashmap
} else if (OB_FAIL(iter->init_mem_entity())) {
LOG_WARN("failed to init mem_entity", K(ret));
} else if (MY_SPEC.exec_param_array_[i - 1].count() > 0) {
//min of buckets is 16,
//max will not exceed card of left_child and HASH_MAP_MEMORY_LIMIT/ObObj
if (OB_FAIL(iter->init_hashmap(max(
16/*hard code*/, min(get_child(0)->get_spec().get_rows(),
iter->HASH_MAP_MEMORY_LIMIT / static_cast<int64_t>(sizeof(ObDatum))))))) {
LOG_WARN("failed to init hash map for idx", K(i), K(ret));
} else if (OB_FAIL(iter->init_probe_row(MY_SPEC.exec_param_array_[i - 1].count()))) {
LOG_WARN("failed to init probe row", K(ret));
// NOTE(review): the batch rows holder is only initialized on this branch, i.e. for
// subplans that have exec params - confirm vectorized subplans without exec params
// do not need it.
} else if (children_[i]->is_vectorized() && OB_FAIL(iter->init_batch_rows_holder(children_[i]->get_spec().output_, children_[i]->get_eval_ctx()))) {
LOG_WARN("failed to init batch rows holder", K(ret));
}
}
}
}
}
return ret;
}
// Open hook: fixes the group size to OB_MAX_BULK_JOIN_ROWS, initializes the driver-row
// iterator with this operator's rescan params and then builds the subplan iterators.
// @return OB_SUCCESS or the error of the failing initialization step.
int ObSubPlanFilterVecOp::inner_open()
{
  int ret = OB_SUCCESS;
  max_group_size_ = OB_MAX_BULK_JOIN_ROWS;
  LOG_TRACE("max group size of SPF is", K(max_group_size_));
  if (OB_FAIL(drive_iter_.init(this, max_group_size_, &MY_SPEC.rescan_params_,
                               MY_SPEC.enable_das_group_rescan_, false))) {
    LOG_WARN("failed to init drive iterator for SPF", KR(ret));
  } else {
    if (OB_FAIL(init_subplan_iters())) {
      LOG_WARN("failed to init sub plan iters for SPF", K(ret));
    }
  }
  return ret;
}
// Close hook: destroys the subplan iterators and resets the driver-row iterator.
// Update-set memory is not touched here. Always reports OB_SUCCESS.
int ObSubPlanFilterVecOp::inner_close()
{
  int ret = OB_SUCCESS;
  destroy_subplan_iters();
  // destroy_update_set_mem();
  drive_iter_.reset();
  return ret;
}
int ObSubPlanFilterVecOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
int64_t op_max_batch_size = min(max_row_cnt, MY_SPEC.max_batch_size_);
int64_t params_size = 0;
if (iter_end_) {
brs_.size_ = 0;
brs_.end_ = true;
} else if (need_init_before_get_row_) {
OZ(prepare_onetime_exprs());
}
clear_evaluated_flag();
while (OB_SUCC(ret) && !iter_end_) {
const ObBatchRows *child_brs = NULL;
set_param_null();
if (OB_FAIL(drive_iter_.get_next_left_batch(op_max_batch_size, child_brs))) {
LOG_WARN("fail to get next batch", K(ret));
} else if (child_brs->end_) {
iter_end_ = true;
}
LOG_TRACE("child_brs size", K(child_brs->size_), K(spec_.id_));
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(child_brs->size_);
brs_.size_ = child_brs->size_;
bool all_filtered = true;
brs_.skip_->deep_copy(*child_brs->skip_, child_brs->size_);
clear_evaluated_flag();
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_ && !iter_end_; l_idx++) {
if (child_brs->skip_->exist(l_idx)) { continue; }
guard.set_batch_idx(l_idx);
if (OB_FAIL(drive_iter_.fill_cur_row_group_param())) {
LOG_WARN("prepare rescan params failed", K(ret));
} else {
if (need_init_before_get_row_) {
for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *&iter = subplan_iters_.at(i - 1);
if (MY_SPEC.init_plan_idxs_.has_member(i)) {
OZ(iter->prepare_init_plan());
}
}
need_init_before_get_row_ = false;
}
}
if (OB_SUCC(ret)) {
bool filtered = false;
if (OB_FAIL(filter_row_vector(eval_ctx_, MY_SPEC.filter_exprs_, *(child_brs->skip_), filtered))) {
LOG_WARN("fail to filter row", K(ret), K(l_idx), K(spec_.id_));
} else if (filtered) {
brs_.skip_->set(l_idx);
LOG_TRACE("left rows is filterd", K(l_idx), K(spec_.id_));
} else {
LOG_TRACE("left rows is not filterd", K(l_idx), K(spec_.id_));
all_filtered = false;
// ObDatum *datum = NULL;
EvalBound eval_bound(eval_ctx_.get_batch_size(), l_idx, l_idx + 1, false);
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
if (OB_FAIL((*e)->eval_vector(eval_ctx_, *(child_brs->skip_), eval_bound))) {
LOG_WARN("expr evaluate failed", K(ret), K(*e));
}
}
}
}
} // for end
if (OB_SUCC(ret) && all_filtered) {
reset_batchrows();
continue;
}
FOREACH_CNT_X(e, spec_.output_, OB_SUCC(ret)) {
(*e)->get_eval_info(eval_ctx_).projected_ = true;
}
break;
} // end while
if (OB_SUCC(ret) && iter_end_) {
set_param_null();
}
return ret;
}
int ObSubPlanFilterVecOp::prepare_onetime_exprs()
{
int ret = OB_SUCCESS;
if (is_vectorized()) {
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(MY_SPEC.max_batch_size_);
ret = prepare_onetime_exprs_inner();
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("SPF operator is not vectorized", K(ret));
}
for (int64_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) {
Iterator *iter = subplan_iters_.at(i - 1);
if (OB_ISNULL(iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("subplan_iter is null", K(ret));
} else if (MY_SPEC.one_time_idxs_.has_member(i)) {
iter->drain_exch();
}
}
return ret;
}
int ObSubPlanFilterVecOp::prepare_onetime_exprs_inner()
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.onetime_exprs_.count(); ++i) {
char mock_skip_data[1] = {0};
sql::ObBitVector &skip = *sql::to_bit_vector(mock_skip_data);
const ObDynamicParamSetter &setter = MY_SPEC.onetime_exprs_.at(i);
if (OB_FAIL(setter.set_dynamic_param_vec2(eval_ctx_, skip))) {
LOG_WARN("failed to prepare onetime expr", K(ret), K(i));
}
}
return ret;
}
} // end sql
} // oceanbase

View File

@ -0,0 +1,92 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SUBQUERY_OB_SUBPLAN_FILTER_VEC_OP_H_
#define OCEANBASE_SUBQUERY_OB_SUBPLAN_FILTER_VEC_OP_H_
#include "sql/engine/ob_operator.h"
#include "sql/ob_sql_define.h"
#include "sql/engine/basic/ob_group_join_buffer_v2.h"
#include "sql/engine/basic/ob_vector_result_holder.h"
#include "sql/engine/subquery/ob_subplan_filter_op.h"
namespace oceanbase
{
namespace sql
{
class ObSubQueryIterator;
// Operator spec for the vectorized subplan filter. It derives from
// ObSubPlanFilterSpec and declares no data members of its own here.
class ObSubPlanFilterVecSpec : public ObSubPlanFilterSpec
{
// NOTE(review): presumably declares the versioned serialization interface - confirm against the macro definition.
OB_UNIS_VERSION_V(1);
public:
DECLARE_VIRTUAL_TO_STRING;
// Constructs the spec from an allocator and the physical operator type.
ObSubPlanFilterVecSpec(ObIAllocator &alloc, const ObPhyOperatorType type);
};
// Vectorized subplan filter operator. It derives from ObSubPlanFilterOp and
// pulls its driving (left) batches through drive_iter_.
class ObSubPlanFilterVecOp: public ObSubPlanFilterOp
{
public:
ObSubPlanFilterVecOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input);
virtual ~ObSubPlanFilterVecOp();
virtual int inner_open() override;
virtual int rescan() override;
virtual int switch_iterator() override;
// The row-at-a-time interface is not provided by this operator; it always returns OB_NOT_IMPLEMENT.
virtual int inner_get_next_row() override { return common::OB_NOT_IMPLEMENT; }
virtual int inner_close() override;
virtual void destroy() override;
// const common::ObIArray<Iterator *> &get_subplan_iters() const { return subplan_iters_; }
// Batch interface; max_row_cnt bounds the number of rows requested.
// NOTE(review): declared without `override`, unlike its siblings - confirm whether the base declares it virtual.
virtual int inner_get_next_batch(const int64_t max_row_cnt);
// Writes the driver iterator's current group id into current_group.
virtual void get_current_group(uint64_t& current_group) const
{
current_group = drive_iter_.get_cur_group_id();
}
// Writes the driver iterator's group rescan count into current_batch_cnt.
virtual void get_current_batch_cnt(int64_t& current_batch_cnt) const
{
current_batch_cnt = drive_iter_.get_group_rescan_cnt();
}
// Mirrors the spec flag enable_das_group_rescan_.
bool enable_left_das_batch() const {return MY_SPEC.enable_das_group_rescan_;}
// Returns the driver iterator's rescan params info when enable_das_group_rescan_ is set, nullptr otherwise.
virtual const GroupParamArray *get_rescan_params_info() const { return MY_SPEC.enable_das_group_rescan_? drive_iter_.get_rescan_params_info() : nullptr; }
// Returns spec_ cast to the vectorized spec type.
const ObSubPlanFilterVecSpec &get_spec() const
{ return static_cast<const ObSubPlanFilterVecSpec &>(spec_); }
private:
// Forwards MY_SPEC.rescan_params_ to set_pushdown_param_null().
void set_param_null() { set_pushdown_param_null(MY_SPEC.rescan_params_); };
void destroy_subplan_iters();
int prepare_onetime_exprs();
int prepare_onetime_exprs_inner();
int init_subplan_iters();
private:
// NOTE(review): the commented-out members below are presumably inherited from ObSubPlanFilterOp now - confirm before deleting.
// common::ObSEArray<Iterator *, 16> subplan_iters_;
// lib::MemoryContext update_set_mem_;
// bool iter_end_;
// uint64_t max_group_size_; //Das batch rescan size;
// // Combined in join buffer
// // common::ObArrayWrap<ObSqlArrayObj> das_batch_params_;
// common::ObSEArray<Iterator*, 8> subplan_iters_to_check_;
// Iterator that supplies the left batches and the group rescan bookkeeping used above.
ObDriverRowIterator drive_iter_;
public:
};
} // end namespace sql
} // end namespace oceanbase
#endif // OCEANBASE_SUBQUERY_OB_SUBPLAN_FILTER_VEC_OP_H_

View File

@ -1461,7 +1461,7 @@ int ObTableScanOp::prepare_single_scan_range(int64_t group_idx)
if (OB_SUCC(ret) && MY_SPEC.is_vt_mapping_) {
OZ(vt_result_converter_->convert_key_ranges(MY_INPUT.key_ranges_));
}
LOG_DEBUG("prepare single scan range", K(ret), K(key_ranges), K(MY_INPUT.key_ranges_),
LOG_TRACE("prepare single scan range", K(ret), K(key_ranges), K(MY_INPUT.key_ranges_),
K(MY_INPUT.ss_key_ranges_), K(spec_.id_));
return ret;
}

View File

@ -1015,3 +1015,14 @@ int ObLogSubPlanFilter::close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECL
}
return ret;
}
bool ObLogSubPlanFilter::is_px_batch_rescan_enabled()
{
bool enable_px_batch_rescan = false;
if (!enable_px_batch_rescans_.empty()) {
for (int i = 0; i < enable_px_batch_rescans_.count() && !enable_px_batch_rescan; i++) {
enable_px_batch_rescan = enable_px_batch_rescans_.at(i);
}
}
return enable_px_batch_rescan;
}

View File

@ -139,6 +139,7 @@ public:
int compute_spf_batch_rescan_compat(bool &can_batch);
int check_right_is_local_scan(bool &is_local_scan) const;
int pre_check_spf_can_px_batch_rescan(bool &can_px_batch_rescan, bool &rescan_contain_match_all) const;
bool is_px_batch_rescan_enabled();
private:
int extract_exist_style_subquery_exprs(ObRawExpr *expr,
ObIArray<ObRawExpr*> &exist_style_exprs);

View File

@ -545,6 +545,7 @@ int ObConfigInfoInPC::load_influence_plan_config()
(0 == ObString::make_string("Hyperscan").case_compare(tenant_config->_regex_engine.str()));
direct_load_allow_fallback_ = tenant_config->direct_load_allow_fallback;
default_load_mode_ = ObDefaultLoadMode::get_type_value(tenant_config->default_load_mode.get_value_string());
enable_nlj_spf_use_rich_format_ = tenant_config->_enable_nlj_spf_use_rich_format;
}
return ret;
@ -615,6 +616,9 @@ int ObConfigInfoInPC::serialize_configs(char *buf, int buf_len, int64_t &pos)
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos,
"%d", default_load_mode_))) {
SQL_PC_LOG(WARN, "failed to databuff_printf", K(ret), K(default_load_mode_));
} else if (OB_FAIL(databuff_printf(buf, buf_len, pos,
"%d,", enable_nlj_spf_use_rich_format_))) {
SQL_PC_LOG(WARN, "failed to databuff_printf", K(ret), K(enable_nlj_spf_use_rich_format_));
} else {
// do nothing
}

View File

@ -1043,6 +1043,7 @@ public:
enable_spf_batch_rescan_(false),
enable_var_assign_use_das_(false),
enable_das_keep_order_(false),
enable_nlj_spf_use_rich_format_(false),
bloom_filter_ratio_(0),
enable_hyperscan_regexp_engine_(false),
realistic_runtime_bloom_filter_size_(false),
@ -1092,6 +1093,7 @@ public:
bool enable_spf_batch_rescan_;
bool enable_var_assign_use_das_;
bool enable_das_keep_order_;
bool enable_nlj_spf_use_rich_format_;
int bloom_filter_ratio_;
bool enable_hyperscan_regexp_engine_;
bool realistic_runtime_bloom_filter_size_;

View File

@ -608,6 +608,17 @@ bool ObSQLSessionInfo::is_var_assign_use_das_enabled() const
return bret;
}
/**
 * Read the tenant-level switch _enable_nlj_spf_use_rich_format for the
 * session's effective tenant. Returns false when the tenant config guard is
 * not valid.
 */
bool ObSQLSessionInfo::is_nlj_spf_use_rich_format_enabled() const
{
  int64_t effective_tenant_id = get_effective_tenant_id();
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(effective_tenant_id));
  bool enabled = false;
  if (!tenant_config.is_valid()) {
    // tenant config unavailable, fall back to disabled
  } else {
    enabled = tenant_config->_enable_nlj_spf_use_rich_format;
  }
  return enabled;
}
int ObSQLSessionInfo::is_adj_index_cost_enabled(bool &enabled, int64_t &stats_cost_percent) const
{
int ret = OB_SUCCESS;

View File

@ -1310,6 +1310,7 @@ public:
bool is_qualify_filter_enabled() const;
int is_enable_range_extraction_for_not_in(bool &enabled) const;
bool is_var_assign_use_das_enabled() const;
bool is_nlj_spf_use_rich_format_enabled() const;
int is_adj_index_cost_enabled(bool &enabled, int64_t &stats_cost_percent) const;
bool is_spf_mlj_group_rescan_enabled() const;
int is_preserve_order_for_pagination_enabled(bool &enabled) const;

View File

@ -351,6 +351,7 @@ _enable_log_cache
_enable_memleak_light_backtrace
_enable_newsort
_enable_new_sql_nio
_enable_nlj_spf_use_rich_format
_enable_optimizer_qualify_filter
_enable_oracle_priv_check
_enable_parallel_minor_merge