diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 84615c1f67..1c462831ab 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -1142,7 +1142,7 @@ ob_set_subtarget(ob_sql optimizer optimizer/ob_log_optimizer_stats_gathering.cpp optimizer/ob_dynamic_sampling.cpp optimizer/ob_log_values_table_access.cpp - optimizer/ob_direct_load_optimizer.cpp + optimizer/ob_direct_load_optimizer_ctx.cpp optimizer/ob_log_expand.cpp ) diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 1a4a6bacd8..3552bee22d 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -53,7 +53,7 @@ #include "sql/optimizer/ob_insert_log_plan.h" #include "sql/optimizer/ob_log_stat_collector.h" #include "sql/optimizer/ob_log_optimizer_stats_gathering.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "sql/optimizer/ob_log_expand.h" #include "share/datum/ob_datum_funcs.h" #include "share/schema/ob_schema_mgr.h" diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 557d01aa96..091bc05c08 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -12,7 +12,7 @@ #define USING_LOG_PREFIX SQL_ENG #include "sql/engine/cmd/ob_load_data_direct_impl.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "observer/omt/ob_tenant.h" #include "observer/table_load/ob_table_load_coordinator.h" #include "observer/table_load/ob_table_load_coordinator_ctx.h" diff --git a/src/sql/engine/cmd/ob_load_data_executor.cpp b/src/sql/engine/cmd/ob_load_data_executor.cpp index 7cae557e67..a7ca10248a 100644 --- a/src/sql/engine/cmd/ob_load_data_executor.cpp +++ b/src/sql/engine/cmd/ob_load_data_executor.cpp @@ -18,7 +18,7 @@ #include "sql/engine/cmd/ob_load_data_impl.h" #include "sql/engine/cmd/ob_load_data_direct_impl.h" #include "sql/engine/ob_exec_context.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "sql/optimizer/ob_optimizer.h" namespace oceanbase @@ -32,15 +32,17 @@ int ObLoadDataExecutor::execute(ObExecContext &ctx, ObLoadDataStmt &stmt) ObTableDirectInsertCtx &table_direct_insert_ctx = ctx.get_table_direct_insert_ctx(); ObLoadDataBase *load_impl = NULL; ObDirectLoadOptimizerCtx optimizer_ctx; - ObDirectLoadOptimizer optimizer(optimizer_ctx); stmt.set_optimizer_ctx(&optimizer_ctx); if (!stmt.get_load_arguments().is_csv_format_) { ret = OB_NOT_SUPPORTED; LOG_WARN("invalid resolver results", K(ret)); - } else if (OB_FAIL(optimizer.optimize(&ctx, stmt))) { - LOG_WARN("fail to optimize", K(ret), K(stmt)); - } else if (FALSE_IT(table_direct_insert_ctx.set_is_direct(optimizer_ctx.use_direct_load()))) { + } else if (OB_FAIL(optimizer_ctx.init_direct_load_ctx(&ctx, stmt))) { + LOG_WARN("fail to init direct load ctx", K(ret), K(stmt)); } else { + if (optimizer_ctx.can_use_direct_load()) { + optimizer_ctx.set_use_direct_load(); + } + table_direct_insert_ctx.set_is_direct(optimizer_ctx.use_direct_load()); if (!table_direct_insert_ctx.get_is_direct()) { if (OB_ISNULL(load_impl = OB_NEWx(ObLoadDataSPImpl, (&ctx.get_allocator())))) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/sql/optimizer/ob_del_upd_log_plan.cpp b/src/sql/optimizer/ob_del_upd_log_plan.cpp index 05217758c7..850ca7d452 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.cpp +++ b/src/sql/optimizer/ob_del_upd_log_plan.cpp @@ -19,7 +19,7 @@ #include "sql/optimizer/ob_log_update.h" #include "sql/optimizer/ob_log_exchange.h" #include "sql/optimizer/ob_log_link_dml.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "sql/resolver/dml/ob_merge_stmt.h" #include "sql/rewrite/ob_transform_utils.h" #include "sql/dblink/ob_dblink_utils.h" @@ -66,52 +66,28 @@ int ObDelUpdLogPlan::compute_dml_parallel() LOG_USER_ERROR(OB_NOT_SUPPORTED, "online ddl without pdml"); } } else if (OB_FAIL(get_parallel_info_from_candidate_plans(dml_parallel))) { - LOG_WARN("failed to get parallel info from candidate plans", K(ret)); + LOG_WARN("failed to get parallel info", K(ret)); + } else if (OB_FAIL(get_parallel_info_from_direct_load(del_upd_stmt, session_info, dml_parallel))) { + LOG_WARN("failed to get parallel info from direct load", K(ret)); } else if (OB_UNLIKELY(ObGlobalHint::DEFAULT_PARALLEL > dml_parallel)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected parallel", K(ret), K(dml_parallel), K(opt_ctx.get_parallel_rule())); } else if (opt_ctx.can_use_pdml()) { - if (del_upd_stmt->is_insert_stmt()) { - const ObInsertStmt *insert_stmt = static_cast(del_upd_stmt); - if (insert_stmt->is_normal_table_overwrite()) { - const int64_t default_insert_overwrite_parallel = 2; - if (dml_parallel <= ObGlobalHint::DEFAULT_PARALLEL) { - dml_parallel = default_insert_overwrite_parallel; - } - } - if (dml_parallel <= ObGlobalHint::DEFAULT_PARALLEL && opt_ctx.get_parallel_rule() == PXParallelRule::MANUAL_HINT) { - // do nothing - } else if (OB_FAIL(check_is_direct_load(*insert_stmt, dml_parallel))) { - LOG_WARN("failed to check is direct load", K(ret)); - } else if (opt_ctx.get_direct_load_optimizer_ctx().can_use_direct_load()) { - const int64_t default_direct_insert_parallel = 2; - if (dml_parallel <= ObGlobalHint::DEFAULT_PARALLEL) { - dml_parallel = default_direct_insert_parallel; - } - } - } - if (OB_SUCC(ret)) { - max_dml_parallel_ = dml_parallel; - use_pdml_ = (opt_ctx.is_online_ddl() || - (ObGlobalHint::DEFAULT_PARALLEL < dml_parallel && - is_strict_mode(session_info->get_sql_mode()))); - if (opt_ctx.get_direct_load_optimizer_ctx().can_use_direct_load() && use_pdml_) { - get_optimizer_context().get_direct_load_optimizer_ctx().set_use_direct_load(); - ObExecContext *exec_ctx = get_optimizer_context().get_exec_ctx(); - if (OB_ISNULL(exec_ctx)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("exec_ctx is null", K(ret)); - } else { - exec_ctx->get_table_direct_insert_ctx().set_is_direct(true); - } - } - } + max_dml_parallel_ = dml_parallel; + use_pdml_ = (opt_ctx.is_online_ddl() || + (ObGlobalHint::DEFAULT_PARALLEL < dml_parallel && + is_strict_mode(session_info->get_sql_mode()))); } else if (opt_ctx.get_can_use_parallel_das_dml()) { max_dml_parallel_ = dml_parallel; use_parallel_das_dml_ = (!opt_ctx.is_online_ddl() && (ObGlobalHint::DEFAULT_PARALLEL < dml_parallel && is_strict_mode(session_info->get_sql_mode()))); } + if (OB_SUCC(ret)) { + if (OB_FAIL(check_use_direct_load())) { + LOG_WARN("failed to check use direct load", K(ret)); + } + } LOG_TRACE("finish compute dml parallel", K(use_pdml_), K(max_dml_parallel_), K(use_parallel_das_dml_), K(opt_ctx.can_use_pdml()), K(opt_ctx.is_online_ddl()), K(opt_ctx.get_parallel_rule()), K(opt_ctx.get_parallel())); @@ -141,6 +117,63 @@ int ObDelUpdLogPlan::get_parallel_info_from_candidate_plans(int64_t &dop) const return ret; } +int ObDelUpdLogPlan::get_parallel_info_from_direct_load(const ObDelUpdStmt *del_upd_stmt, + const ObSQLSessionInfo *session_info, + int64_t &dml_parallel) const +{ + int ret = OB_SUCCESS; + ObOptimizerContext &opt_ctx = get_optimizer_context(); + ObDirectLoadOptimizerCtx &direct_load_opt_ctx = opt_ctx.get_direct_load_optimizer_ctx(); + if (direct_load_opt_ctx.is_insert_overwrite()) { + const int64_t default_insert_overwrite_parallel = 2; + dml_parallel = MAX(dml_parallel, default_insert_overwrite_parallel); + } else if (opt_ctx.get_parallel_rule() == PXParallelRule::MANUAL_HINT) { + // do nothing + } else if (direct_load_opt_ctx.can_use_direct_load() && direct_load_opt_ctx.is_optimized_by_default_load_mode()) { + const int64_t default_direct_insert_parallel = 2; + bool can_use_pdml = opt_ctx.can_use_pdml() && (opt_ctx.is_online_ddl() || is_strict_mode(session_info->get_sql_mode())); + if (can_use_pdml) { + dml_parallel = MAX(dml_parallel, default_direct_insert_parallel); + } + } + return ret; +} + +int ObDelUpdLogPlan::check_use_direct_load() +{ + int ret = OB_SUCCESS; + ObOptimizerContext &opt_ctx = get_optimizer_context(); + ObDirectLoadOptimizerCtx &direct_load_opt_ctx = opt_ctx.get_direct_load_optimizer_ctx(); + ObExecContext *exec_ctx = nullptr; + if (OB_ISNULL(exec_ctx = opt_ctx.get_exec_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret), KP(opt_ctx.get_exec_ctx())); + } else { + bool use_direct_load = false; + if (direct_load_opt_ctx.is_insert_overwrite()) { + if (OB_UNLIKELY(!use_pdml_)) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "PDML is disabled, insert overwrite is"); + } else { + use_direct_load = true; + } + } else if (direct_load_opt_ctx.can_use_direct_load()) { + if (OB_UNLIKELY(!use_pdml_)) { + LOG_USER_WARN(OB_NOT_SUPPORTED, "PDML is disabled, direct load is"); + } else { + use_direct_load = true; + } + } + if (OB_SUCC(ret)) { + if (use_direct_load) { + direct_load_opt_ctx.set_use_direct_load(); + exec_ctx->get_table_direct_insert_ctx().set_is_direct(true); + } + } + } + return ret; +} + int ObDelUpdLogPlan::get_pdml_parallel_degree(const int64_t target_part_cnt, int64_t &dop) const { @@ -2395,26 +2428,6 @@ int ObDelUpdLogPlan::allocate_link_dml_as_top(ObLogicalOperator *&old_top) return ret; } -// Direct-insert is enabled only: -// 1. pdml insert -// 2. insert into select, insert overwrite -// 3. _ob_enable_direct_load -// 4. append hint or direct_load hint or default load mode -// 5. full_direct_load(auto_commit, not in a transaction) or inc_direct_load -int ObDelUpdLogPlan::check_is_direct_load(const ObInsertStmt &insert_stmt, const int64_t dml_parallel) -{ - int ret = OB_SUCCESS; - if (insert_stmt.value_from_select() && !insert_stmt.is_external_table_overwrite()) { - ObOptimizerContext &optimize_ctx = get_optimizer_context(); - ObDirectLoadOptimizerCtx &direct_load_optimize_ctx = get_optimizer_context().get_direct_load_optimizer_ctx(); - ObDirectLoadOptimizer optimizer(direct_load_optimize_ctx); - if (OB_FAIL(optimizer.optimize(insert_stmt, optimize_ctx, dml_parallel))) { - LOG_WARN("fail to optimize", K(ret)); - } - } - return ret; -} - int ObDelUpdLogPlan::perform_vector_assign_expr_replacement(ObDelUpdStmt *stmt) { UNUSED(stmt); diff --git a/src/sql/optimizer/ob_del_upd_log_plan.h b/src/sql/optimizer/ob_del_upd_log_plan.h index 6d3b50f7ba..f680b64e7e 100644 --- a/src/sql/optimizer/ob_del_upd_log_plan.h +++ b/src/sql/optimizer/ob_del_upd_log_plan.h @@ -241,7 +241,6 @@ public: int allocate_link_dml_as_top(ObLogicalOperator *&old_top); bool use_pdml() const { return use_pdml_; } int compute_dml_parallel(); - int check_is_direct_load(const ObInsertStmt &insert_stmt, const int64_t dml_parallel); int get_parallel_info_from_candidate_plans(int64_t &dop) const; int get_pdml_parallel_degree(const int64_t target_part_cnt, int64_t &dop) const; bool get_can_use_parallel_das_dml() const { return use_parallel_das_dml_; } @@ -260,6 +259,10 @@ protected: ObIArray &normal_query_refs, ObIArray &alias_query_refs); private: + int get_parallel_info_from_direct_load(const ObDelUpdStmt *del_upd_stmt, + const ObSQLSessionInfo *session_info, + int64_t &dml_parallel) const; + int check_use_direct_load(); DISALLOW_COPY_AND_ASSIGN(ObDelUpdLogPlan); protected: diff --git a/src/sql/optimizer/ob_direct_load_optimizer.cpp b/src/sql/optimizer/ob_direct_load_optimizer_ctx.cpp similarity index 69% rename from src/sql/optimizer/ob_direct_load_optimizer.cpp rename to src/sql/optimizer/ob_direct_load_optimizer_ctx.cpp index ca09264501..77381b259b 100644 --- a/src/sql/optimizer/ob_direct_load_optimizer.cpp +++ b/src/sql/optimizer/ob_direct_load_optimizer_ctx.cpp @@ -12,7 +12,7 @@ #define USING_LOG_PREFIX SQL_ENG -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "sql/optimizer/ob_optimizer.h" #include "sql/optimizer/ob_optimizer_context.h" #include "sql/session/ob_sql_session_info.h" @@ -42,7 +42,7 @@ ObDirectLoadOptimizerCtx::ObDirectLoadOptimizerCtx() need_sort_(false), can_use_direct_load_(false), use_direct_load_(false), - is_optimized_(false) + is_optimized_by_default_load_mode_(false) { } @@ -58,17 +58,17 @@ void ObDirectLoadOptimizerCtx::reset() need_sort_ = false; can_use_direct_load_ = false; use_direct_load_ = false; - is_optimized_ = false; + is_optimized_by_default_load_mode_ = false; } -int ObDirectLoadOptimizer::optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stmt) +int ObDirectLoadOptimizerCtx::init_direct_load_ctx(ObExecContext *exec_ctx, ObLoadDataStmt &stmt) { int ret = OB_SUCCESS; if (OB_ISNULL(exec_ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("exec_ctx cannot be null", K(ret)); } else if (GCONF._ob_enable_direct_load) { - direct_load_optimizer_ctx_.reset(); + reset(); omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); ObLoadDataHint &load_data_hint = stmt.get_hints(); ObDirectLoadHint &direct_load_hint = load_data_hint.get_direct_load_hint(); @@ -89,20 +89,20 @@ int ObDirectLoadOptimizer::optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stm enable_by_config(); } if (OB_SUCC(ret)) { - if (direct_load_optimizer_ctx_.load_method_ != ObDirectLoadMethod::INVALID_METHOD) { - direct_load_optimizer_ctx_.table_id_ = stmt.get_load_arguments().table_id_; - direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::LOAD_DATA; - direct_load_optimizer_ctx_.dup_action_ = stmt.get_load_arguments().dupl_action_; - direct_load_optimizer_ctx_.load_level_ = stmt.get_part_ids().empty() ? ObDirectLoadLevel::TABLE + if (load_method_ != ObDirectLoadMethod::INVALID_METHOD) { + table_id_ = stmt.get_load_arguments().table_id_; + load_mode_ = ObDirectLoadMode::LOAD_DATA; + dup_action_ = stmt.get_load_arguments().dupl_action_; + load_level_ = stmt.get_part_ids().empty() ? ObDirectLoadLevel::TABLE : ObDirectLoadLevel::PARTITION; if (OB_FAIL(check_semantics())) { LOG_WARN("fail to check semantics", K(ret)); } else if (OB_FAIL(check_support_direct_load(exec_ctx))) { LOG_WARN("fail to check support direct load", K(ret)); } else { - direct_load_optimizer_ctx_.dup_action_ = direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ? + dup_action_ = insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ? ObLoadDupActionType::LOAD_REPLACE : stmt.get_load_arguments().dupl_action_; //改写成replace语义 - direct_load_optimizer_ctx_.set_use_direct_load(); + can_use_direct_load_ = true; } if (ret == OB_NOT_SUPPORTED) { bool allow_fallback = false; @@ -120,15 +120,15 @@ int ObDirectLoadOptimizer::optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stm } } } - LOG_INFO("direct load optimize result", K(ret), K(direct_load_hint), K(direct_load_optimizer_ctx_)); + LOG_INFO("init direct load ctx result", K(ret), K(direct_load_hint), K(table_id_), K(load_method_), K(insert_mode_), K(load_mode_), K(load_level_), K(dup_action_), + K(max_error_row_count_), K(need_sort_), K(can_use_direct_load_), K(use_direct_load_), K(is_optimized_by_default_load_mode_)); } return ret; } -int ObDirectLoadOptimizer::optimize( +int ObDirectLoadOptimizerCtx::init_direct_load_ctx( const ObInsertStmt &stmt, - ObOptimizerContext &optimizer_ctx, - const int64_t dml_parallel) + ObOptimizerContext &optimizer_ctx) { int ret = OB_SUCCESS; ObExecContext *exec_ctx = nullptr; @@ -136,7 +136,7 @@ int ObDirectLoadOptimizer::optimize( ret = ret = OB_ERR_UNEXPECTED; LOG_WARN("exec_ctx cannot be null", K(ret)); } else { - direct_load_optimizer_ctx_.reset(); + reset(); uint64_t table_id = stmt.get_table_item(0) != nullptr ? stmt.get_table_item(0)->ref_id_ : 0; const ObGlobalHint &global_hint = optimizer_ctx.get_global_hint(); const ObDirectLoadHint &direct_load_hint = global_hint.direct_load_hint_; @@ -157,32 +157,29 @@ int ObDirectLoadOptimizer::optimize( // do nothing } else if (direct_load_hint.has_direct()) { enable_by_direct_load_hint(direct_load_hint); - direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::INSERT_INTO; + load_mode_ = ObDirectLoadMode::INSERT_INTO; } else if (global_hint.has_append()) { enable_by_append_hint(); - direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::INSERT_INTO; + load_mode_ = ObDirectLoadMode::INSERT_INTO; } else if (!session_info->is_inner()) { if (stmt.get_query_ctx()->optimizer_features_enable_version_ >= COMPAT_VERSION_4_3_4) { enable_by_config(); - direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::INSERT_INTO; + load_mode_ = ObDirectLoadMode::INSERT_INTO; } } if (OB_FAIL(ret)) { - } else if (dml_parallel <= ObGlobalHint::DEFAULT_PARALLEL && !direct_load_optimizer_ctx_.is_optimized()) { - // 通过hint而不是默认配置项的方式,不会修改并行度,当并行度小于2时不满足pdml条件,无需走旁路导入检查 - // do nothing - } else if (direct_load_optimizer_ctx_.load_method_ != ObDirectLoadMethod::INVALID_METHOD) { + } else if (load_method_ != ObDirectLoadMethod::INVALID_METHOD) { if (session_info->get_ddl_info().is_mview_complete_refresh()) { - if (direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE) { + if (insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected mview complete refresh enable inc replace", K(ret)); } else { - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::OVERWRITE; + insert_mode_ = ObDirectLoadInsertMode::OVERWRITE; } } if (OB_SUCC(ret)) { - direct_load_optimizer_ctx_.table_id_ = table_id; - direct_load_optimizer_ctx_.dup_action_ = direct_load_optimizer_ctx_.insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ? + table_id_ = table_id; + dup_action_ = insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE ? ObLoadDupActionType::LOAD_REPLACE : ObLoadDupActionType::LOAD_STOP_ON_DUP; ObIArray & table_partition_infos = optimizer_ctx.get_table_partition_info(); for (int64_t i = 0; OB_SUCC(ret) && i < table_partition_infos.count(); ++i) { @@ -191,7 +188,7 @@ int ObDirectLoadOptimizer::optimize( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpect null table partition info", K(ret)); } else if (info->get_ref_table_id() == table_id) { - direct_load_optimizer_ctx_.load_level_ = info->get_table_location().get_part_hint_ids().empty() + load_level_ = info->get_table_location().get_part_hint_ids().empty() ? ObDirectLoadLevel::TABLE : ObDirectLoadLevel::PARTITION; break; @@ -216,12 +213,12 @@ int ObDirectLoadOptimizer::optimize( } } } else { - direct_load_optimizer_ctx_.set_can_use_direct_load(); + can_use_direct_load_ = true; } } } if (OB_SUCC(ret)) { - if (!direct_load_optimizer_ctx_.can_use_direct_load()) { + if (!can_use_direct_load()) { if (session_info->get_ddl_info().is_mview_complete_refresh()) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "mview complete refresh using non-direct insert is"); @@ -234,73 +231,74 @@ int ObDirectLoadOptimizer::optimize( } } } - LOG_INFO("direct load optimize result", K(ret), K(direct_load_hint), K(direct_load_optimizer_ctx_)); + LOG_INFO("init direct load ctx result", K(ret), K(direct_load_hint), K(table_id_), K(load_method_), K(insert_mode_), K(load_mode_), K(load_level_), K(dup_action_), + K(max_error_row_count_), K(need_sort_), K(can_use_direct_load_), K(use_direct_load_), K(is_optimized_by_default_load_mode_)); } return ret; } -void ObDirectLoadOptimizer::enable_by_direct_load_hint(const ObDirectLoadHint &hint) +void ObDirectLoadOptimizerCtx::enable_by_direct_load_hint(const ObDirectLoadHint &hint) { - direct_load_optimizer_ctx_.need_sort_ = hint.need_sort_; - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::NORMAL; - direct_load_optimizer_ctx_.max_error_row_count_ = hint.get_max_error_row_count(); + need_sort_ = hint.need_sort_; + insert_mode_ = ObDirectLoadInsertMode::NORMAL; + max_error_row_count_ = hint.get_max_error_row_count(); if (hint.is_full_direct_load()) { - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::FULL; + load_method_ = ObDirectLoadMethod::FULL; } else if (hint.is_inc_direct_load()) { - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::INCREMENTAL; + load_method_ = ObDirectLoadMethod::INCREMENTAL; if (hint.is_inc_replace_load_method()) { - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE; + insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE; } } } -void ObDirectLoadOptimizer::enable_by_append_hint() +void ObDirectLoadOptimizerCtx::enable_by_append_hint() { - direct_load_optimizer_ctx_.need_sort_ = true; - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::NORMAL; - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::FULL; + need_sort_ = true; + insert_mode_ = ObDirectLoadInsertMode::NORMAL; + load_method_ = ObDirectLoadMethod::FULL; } -void ObDirectLoadOptimizer::enable_by_config() +void ObDirectLoadOptimizerCtx::enable_by_config() { omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); const ObString &config_str = tenant_config->default_load_mode.get_value_string(); - direct_load_optimizer_ctx_.need_sort_ = true; - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::NORMAL; + need_sort_ = true; + insert_mode_ = ObDirectLoadInsertMode::NORMAL; if (tenant_config.is_valid()) { if (0 == config_str.case_compare("FULL_DIRECT_WRITE")) { - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::FULL; + load_method_ = ObDirectLoadMethod::FULL; } else if (0 == config_str.case_compare("INC_DIRECT_WRITE")) { - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::INCREMENTAL; + load_method_ = ObDirectLoadMethod::INCREMENTAL; } else if (0 == config_str.case_compare("INC_REPLACE_DIRECT_WRITE")) { - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::INCREMENTAL; - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE; + load_method_ = ObDirectLoadMethod::INCREMENTAL; + insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE; } - if (direct_load_optimizer_ctx_.load_method_ != ObDirectLoadMethod::INVALID_METHOD) { - direct_load_optimizer_ctx_.is_optimized_ = true; + if (load_method_ != ObDirectLoadMethod::INVALID_METHOD) { + is_optimized_by_default_load_mode_ = true; } } } -void ObDirectLoadOptimizer::enable_by_overwrite() +void ObDirectLoadOptimizerCtx::enable_by_overwrite() { - direct_load_optimizer_ctx_.need_sort_ = true; - direct_load_optimizer_ctx_.load_method_ = ObDirectLoadMethod::FULL; - direct_load_optimizer_ctx_.insert_mode_ = ObDirectLoadInsertMode::OVERWRITE; - direct_load_optimizer_ctx_.load_mode_ = ObDirectLoadMode::INSERT_OVERWRITE; - direct_load_optimizer_ctx_.dup_action_ = ObLoadDupActionType::LOAD_STOP_ON_DUP; + need_sort_ = true; + load_method_ = ObDirectLoadMethod::FULL; + insert_mode_ = ObDirectLoadInsertMode::OVERWRITE; + load_mode_ = ObDirectLoadMode::INSERT_OVERWRITE; + dup_action_ = ObLoadDupActionType::LOAD_STOP_ON_DUP; } -int ObDirectLoadOptimizer::check_semantics() +int ObDirectLoadOptimizerCtx::check_semantics() { int ret = OB_SUCCESS; - if (direct_load_optimizer_ctx_.is_inc_direct_load()) { - if (direct_load_optimizer_ctx_.dup_action_ == ObLoadDupActionType::LOAD_REPLACE) { + if (is_inc_direct_load()) { + if (dup_action_ == ObLoadDupActionType::LOAD_REPLACE) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "replace for inc load method in direct load is"); } - } else if (direct_load_optimizer_ctx_.is_inc_replace_direct_load()) { - if (direct_load_optimizer_ctx_.dup_action_ != ObLoadDupActionType::LOAD_STOP_ON_DUP) { + } else if (is_inc_replace_direct_load()) { + if (dup_action_ != ObLoadDupActionType::LOAD_STOP_ON_DUP) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "replace or ignore for inc_replace load method in direct load is"); } @@ -312,7 +310,7 @@ int ObDirectLoadOptimizer::check_semantics() // 1. not allow add no_direct hint // 2. not allow add direct load hint // 3. not allow add append hint -int ObDirectLoadOptimizer::check_support_insert_overwrite(const ObGlobalHint &global_hint) +int ObDirectLoadOptimizerCtx::check_support_insert_overwrite(const ObGlobalHint &global_hint) { int ret = OB_SUCCESS; const ObDirectLoadHint &direct_load_hint = global_hint.direct_load_hint_; @@ -329,7 +327,7 @@ int ObDirectLoadOptimizer::check_support_insert_overwrite(const ObGlobalHint &gl return ret; } -int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx) +int ObDirectLoadOptimizerCtx::check_support_direct_load(ObExecContext *exec_ctx) { int ret = OB_SUCCESS; ObSqlCtx *sql_ctx = nullptr; @@ -344,7 +342,7 @@ int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx) LOG_WARN("fail to get schema guard", K(ret), KP(sql_ctx)); } else { // insert overwrite和insert into select全量不支持autocommit=false和session在事务内 - if (direct_load_optimizer_ctx_.is_insert_overwrite() || (direct_load_optimizer_ctx_.is_insert_into() && direct_load_optimizer_ctx_.is_full_direct_load())) { + if (is_insert_overwrite() || (is_insert_into() && is_full_direct_load())) { bool auto_commit = false; ObSQLSessionInfo *session_info = nullptr; if (OB_ISNULL(session_info = exec_ctx->get_my_session())) { @@ -353,7 +351,7 @@ int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx) } else if (OB_FAIL(session_info->get_autocommit(auto_commit))) { LOG_WARN("failed to get auto commit", K(ret)); } else if (!auto_commit || session_info->is_in_transaction()) { - if (direct_load_optimizer_ctx_.is_insert_overwrite()) { + if (is_insert_overwrite()) { ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "using insert overwrite within a transaction is"); LOG_WARN("insert overwrite within a transaction is not support", KR(ret), K(auto_commit), K(session_info->is_in_transaction())); @@ -366,25 +364,25 @@ int ObDirectLoadOptimizer::check_support_direct_load(ObExecContext *exec_ctx) } if (OB_FAIL(ret)) { } else if (OB_FAIL(ObTableLoadSchema::get_table_schema( - *schema_guard, MTL_ID(), direct_load_optimizer_ctx_.table_id_, table_schema))) { + *schema_guard, MTL_ID(), table_id_, table_schema))) { LOG_WARN("fail to get table schema", KR(ret)); } else if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, column_ids))) { LOG_WARN("fail to get column ids", KR(ret)); } else if (OB_FAIL(ObTableLoadService::check_support_direct_load( *schema_guard, - direct_load_optimizer_ctx_.table_id_, - direct_load_optimizer_ctx_.load_method_, - direct_load_optimizer_ctx_.insert_mode_, - direct_load_optimizer_ctx_.load_mode_, - direct_load_optimizer_ctx_.load_level_, + table_id_, + load_method_, + insert_mode_, + load_mode_, + load_level_, column_ids))) { - LOG_WARN("fail to check support direct load", K(ret), K(direct_load_optimizer_ctx_)); + LOG_WARN("fail to check support direct load", K(ret)); } } return ret; } -int ObDirectLoadOptimizer::check_direct_load_allow_fallback( +int ObDirectLoadOptimizerCtx::check_direct_load_allow_fallback( ObExecContext *exec_ctx, bool &allow_fallback) { @@ -397,7 +395,7 @@ int ObDirectLoadOptimizer::check_direct_load_allow_fallback( LOG_WARN("unexpected session info is null", K(ret)); } else if (session_info->get_ddl_info().is_mview_complete_refresh()) { allow_fallback = false; - } else if (direct_load_optimizer_ctx_.is_insert_overwrite()) { + } else if (is_insert_overwrite()) { allow_fallback = false; } else if (tenant_config.is_valid()) { allow_fallback = tenant_config->direct_load_allow_fallback; diff --git a/src/sql/optimizer/ob_direct_load_optimizer.h b/src/sql/optimizer/ob_direct_load_optimizer_ctx.h similarity index 79% rename from src/sql/optimizer/ob_direct_load_optimizer.h rename to src/sql/optimizer/ob_direct_load_optimizer_ctx.h index 5019e57b2c..737578e11b 100644 --- a/src/sql/optimizer/ob_direct_load_optimizer.h +++ b/src/sql/optimizer/ob_direct_load_optimizer_ctx.h @@ -36,19 +36,29 @@ class ObDirectLoadOptimizerCtx public: ObDirectLoadOptimizerCtx(); ~ObDirectLoadOptimizerCtx() = default; + void reset(); + int init_direct_load_ctx(ObExecContext *exec_ctx, ObLoadDataStmt &stmt); + int init_direct_load_ctx(const ObInsertStmt &stmt, ObOptimizerContext &optimizer_ctx); bool can_use_direct_load() const { return can_use_direct_load_; } - void set_can_use_direct_load() { can_use_direct_load_ = true; } bool use_direct_load() const { return use_direct_load_; } void set_use_direct_load() { use_direct_load_ = true; } - bool is_optimized() { return is_optimized_; } + bool is_optimized_by_default_load_mode() { return is_optimized_by_default_load_mode_; } bool is_full_direct_load() const { return load_method_ == ObDirectLoadMethod::FULL; } bool is_inc_direct_load() const { return load_method_ == ObDirectLoadMethod::INCREMENTAL && insert_mode_ == ObDirectLoadInsertMode::NORMAL; } bool is_inc_replace_direct_load() const { return load_method_ == ObDirectLoadMethod::INCREMENTAL && insert_mode_ == ObDirectLoadInsertMode::INC_REPLACE; } bool is_insert_overwrite() const { return ObDirectLoadMode::is_insert_overwrite(load_mode_); } bool is_insert_into() const { return load_mode_ == ObDirectLoadMode::INSERT_INTO; } - void reset(); TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(load_level), K_(dup_action), - K_(max_error_row_count), K_(need_sort), K_(can_use_direct_load), K_(use_direct_load), K_(is_optimized)); + K_(max_error_row_count), K_(need_sort), K_(can_use_direct_load), K_(use_direct_load), K_(is_optimized_by_default_load_mode)); +private: + void enable_by_direct_load_hint(const ObDirectLoadHint &hint); + void enable_by_append_hint(); + void enable_by_config(); + void enable_by_overwrite(); + int check_semantics(); + int check_support_insert_overwrite(const ObGlobalHint &global_hint); + int check_support_direct_load(ObExecContext *exec_ctx); + int check_direct_load_allow_fallback(ObExecContext *exec_ctx, bool &allow_fallback); public: uint64_t table_id_; storage::ObDirectLoadMethod::Type load_method_; @@ -60,30 +70,7 @@ public: bool need_sort_; bool can_use_direct_load_; bool use_direct_load_; - bool is_optimized_; // optimized by default load mode -}; - -class ObDirectLoadOptimizer -{ -public: - ObDirectLoadOptimizer(ObDirectLoadOptimizerCtx &direct_load_optimizer_ctx) - : direct_load_optimizer_ctx_(direct_load_optimizer_ctx) - {} - ~ObDirectLoadOptimizer() = default; - int optimize(ObExecContext *exec_ctx, ObLoadDataStmt &stmt); - int optimize(const ObInsertStmt &stmt, ObOptimizerContext &optimizer_ctx, int64_t dml_parallel); -private: - void enable_by_direct_load_hint(const ObDirectLoadHint &hint); - void enable_by_append_hint(); - void enable_by_config(); - void enable_by_overwrite(); - int check_semantics(); - int check_support_insert_overwrite(const ObGlobalHint &global_hint); - int check_support_direct_load(ObExecContext *exec_ctx); - int check_direct_load_allow_fallback(ObExecContext *exec_ctx, bool &allow_fallback); -private: - ObDirectLoadOptimizerCtx &direct_load_optimizer_ctx_; - DISALLOW_COPY_AND_ASSIGN(ObDirectLoadOptimizer); + bool is_optimized_by_default_load_mode_; // optimized by default load mode }; } // namespace sql diff --git a/src/sql/optimizer/ob_insert_log_plan.cpp b/src/sql/optimizer/ob_insert_log_plan.cpp index 41d8042788..4507ce0e00 100644 --- a/src/sql/optimizer/ob_insert_log_plan.cpp +++ b/src/sql/optimizer/ob_insert_log_plan.cpp @@ -25,7 +25,7 @@ #include "sql/optimizer/ob_log_subplan_filter.h" #include "sql/optimizer/ob_log_insert_all.h" #include "sql/optimizer/ob_log_link_dml.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "sql/ob_optimizer_trace_impl.h" #include "common/ob_smart_call.h" #include "sql/resolver/dml/ob_del_upd_resolver.h" diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index c0e8b25176..a8e447b6ee 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -55,7 +55,7 @@ #include "sql/optimizer/ob_log_temp_table_access.h" #include "sql/optimizer/ob_log_temp_table_transformation.h" #include "sql/optimizer/ob_px_resource_analyzer.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "common/ob_smart_call.h" #include "observer/omt/ob_tenant_config_mgr.h" #include "sql/optimizer/ob_log_err_log.h" diff --git a/src/sql/optimizer/ob_optimizer.cpp b/src/sql/optimizer/ob_optimizer.cpp index bf090ede17..aa983dbd7b 100644 --- a/src/sql/optimizer/ob_optimizer.cpp +++ b/src/sql/optimizer/ob_optimizer.cpp @@ -26,6 +26,7 @@ #include "sql/dblink/ob_dblink_utils.h" #include "sql/resolver/dml/ob_merge_stmt.h" #include "sql/optimizer/ob_log_temp_table_insert.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" #include "share/stat/ob_opt_system_stat.h" #include "sql/optimizer/ob_opt_cost_model_parameter.h" #include "src/share/stat/ob_opt_stat_manager.h" @@ -511,6 +512,21 @@ int ObOptimizer::check_pdml_enabled(const ObDMLStmt &stmt, return ret; } +int ObOptimizer::check_direct_load_enabled(const ObDMLStmt &stmt, const ObSQLSessionInfo &session) +{ + int ret = OB_SUCCESS; + if (stmt::T_INSERT == stmt.get_stmt_type()) { + const ObInsertStmt &insert_stmt = static_cast(stmt); + if (insert_stmt.value_from_select() && !insert_stmt.is_external_table_overwrite()) { + ObDirectLoadOptimizerCtx &direct_load_optimize_ctx = ctx_.get_direct_load_optimizer_ctx(); + if (OB_FAIL(direct_load_optimize_ctx.init_direct_load_ctx(insert_stmt, ctx_))) { + LOG_WARN("fail to init direct load ctx", K(ret)); + } + } + } + return ret; +} + // check pdml enable sql case int ObOptimizer::check_pdml_supported_feature(const ObDelUpdStmt &pdml_stmt, const ObSQLSessionInfo &session, bool &is_use_pdml) @@ -631,6 +647,8 @@ int ObOptimizer::init_env_info(ObDMLStmt &stmt) LOG_WARN("fail to extract opt ctx basic flags", K(ret)); } else if (OB_FAIL(check_pdml_enabled(stmt, *session_info))) { LOG_WARN("fail to check enable pdml", K(ret)); + } else if (OB_FAIL(check_direct_load_enabled(stmt, *session_info))) { + LOG_WARN("fail to check enable direct load", K(ret)); } else if (OB_FAIL(check_parallel_das_dml_enabled(stmt, *session_info))) { LOG_WARN("fail to check enable parallel das dml", K(ret)); } else if (OB_FAIL(check_dml_parallel_mode())) { diff --git a/src/sql/optimizer/ob_optimizer.h b/src/sql/optimizer/ob_optimizer.h index 70c40bd533..a42d5202b5 100644 --- a/src/sql/optimizer/ob_optimizer.h +++ b/src/sql/optimizer/ob_optimizer.h @@ -208,6 +208,8 @@ namespace sql int set_auto_dop_params(const ObSQLSessionInfo &session); int check_pdml_enabled(const ObDMLStmt &stmt, const ObSQLSessionInfo &session); + int check_direct_load_enabled(const ObDMLStmt &stmt, + const ObSQLSessionInfo &session); int check_pdml_supported_feature(const ObDelUpdStmt &pdml_stmt, const ObSQLSessionInfo &session, bool &is_use_pdml); diff --git a/src/sql/optimizer/ob_optimizer_context.h b/src/sql/optimizer/ob_optimizer_context.h index b5470bc805..67d63fafb8 100644 --- a/src/sql/optimizer/ob_optimizer_context.h +++ b/src/sql/optimizer/ob_optimizer_context.h @@ -27,7 +27,7 @@ #include "sql/engine/aggregate/ob_adaptive_bypass_ctrl.h" #include "sql/optimizer/ob_dynamic_sampling.h" #include "share/config/ob_config_helper.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" namespace oceanbase diff --git a/src/sql/plan_cache/ob_plan_cache_util.cpp b/src/sql/plan_cache/ob_plan_cache_util.cpp index 5b898aca36..33d11aaa35 100644 --- a/src/sql/plan_cache/ob_plan_cache_util.cpp +++ b/src/sql/plan_cache/ob_plan_cache_util.cpp @@ -22,7 +22,7 @@ #include "sql/ob_phy_table_location.h" #include "sql/optimizer/ob_phy_table_location_info.h" #include "sql/optimizer/ob_log_plan.h" -#include "sql/optimizer/ob_direct_load_optimizer.h" +#include "sql/optimizer/ob_direct_load_optimizer_ctx.h" using namespace oceanbase::share; using namespace oceanbase::share::schema; using namespace oceanbase::omt;