diff --git a/src/pl/sys_package/ob_dbms_mview_mysql.cpp b/src/pl/sys_package/ob_dbms_mview_mysql.cpp index 925a83e4ac..82dbe7ce37 100644 --- a/src/pl/sys_package/ob_dbms_mview_mysql.cpp +++ b/src/pl/sys_package/ob_dbms_mview_mysql.cpp @@ -50,7 +50,7 @@ int ObDBMSMViewMysql::purge_log(ObExecContext &ctx, ParamStore ¶ms, ObObj &r PROCEDURE refresh( IN mv_name VARCHAR(65535), IN method VARCHAR(65535) DEFAULT NULL, - IN parallelism INT DEFAULT 1); + IN refresh_parallel INT DEFAULT 1); */ int ObDBMSMViewMysql::refresh(ObExecContext &ctx, ParamStore ¶ms, ObObj &result) { @@ -59,13 +59,13 @@ int ObDBMSMViewMysql::refresh(ObExecContext &ctx, ParamStore ¶ms, ObObj &res CK(OB_LIKELY(3 == params.count())); CK(OB_LIKELY(params.at(0).is_varchar()) /*mv_name*/, OB_LIKELY(params.at(1).is_null() || params.at(1).is_varchar()) /*method*/, - OB_LIKELY(params.at(2).is_int32()) /*parallelism*/); + OB_LIKELY(params.at(2).is_int32()) /*refresh_parallel*/); if (OB_SUCC(ret)) { ObMViewRefreshArg refresh_params; ObMViewRefreshExecutor refresh_executor; refresh_params.list_ = params.at(0).get_varchar(); params.at(1).is_varchar() ? refresh_params.method_ = params.at(1).get_varchar() : NULL; - refresh_params.parallelism_ = params.at(2).get_int(); + refresh_params.refresh_parallel_ = params.at(2).get_int(); if (OB_FAIL(refresh_executor.execute(ctx, refresh_params))) { LOG_WARN("fail to execute mview refresh", KR(ret), K(refresh_params)); } diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 2042ecb56a..2a05d6ea77 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -2678,6 +2678,7 @@ int ObDDLService::start_mview_complete_refresh_task( int ret = OB_SUCCESS; int64_t max_dependency_version = 0; uint64_t tenant_id = mview_schema.get_tenant_id(); + const ObMVRefreshInfo *mv_refresh_info = mview_schema.get_view_schema().get_mv_refresh_info(); ObFixedLengthString time_zone; const ObSysVarSchema *data_format_schema = nullptr; const ObSysVarSchema *nls_timestamp_format = nullptr; @@ -2687,11 +2688,10 @@ int ObDDLService::start_mview_complete_refresh_task( arg.table_id_ = mview_schema.get_table_id(); arg.consumer_group_id_ = THIS_WORKER.get_group_id(); arg.session_id_ = 100;// FIXME - arg.parallelism_ = container_table_schema.get_dop(); arg.exec_tenant_id_ = tenant_id; - if (dep_infos == nullptr) { + if (OB_UNLIKELY(nullptr == dep_infos || nullptr == mv_refresh_info)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("dep_infos is nullptr", KR(ret) , KP(dep_infos)); + LOG_WARN("dep_infos is nullptr", KR(ret) , KP(dep_infos), KP(mv_refresh_info)); } else if (OB_FAIL(share::ObBackupUtils::get_tenant_sys_time_zone_wrap(tenant_id, time_zone, arg.tz_info_wrap_))) { @@ -2712,6 +2712,7 @@ int ObDDLService::start_mview_complete_refresh_task( ret = OB_ERR_UNEXPECTED; LOG_WARN("var schema must not be null", K(ret)); } else { + arg.parallelism_ = mv_refresh_info->parallel_; arg.tz_info_ = arg.tz_info_wrap_.get_tz_info_offset(); arg.nls_formats_[ObNLSFormatEnum::NLS_DATE] = data_format_schema->get_value(); arg.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = nls_timestamp_format->get_value(); diff --git a/src/share/inner_table/sys_package/dbms_mview.sql b/src/share/inner_table/sys_package/dbms_mview.sql index 03a9c8cffa..f07e1555b9 100644 --- a/src/share/inner_table/sys_package/dbms_mview.sql +++ b/src/share/inner_table/sys_package/dbms_mview.sql @@ -157,6 +157,11 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER IS -- SKIP_EXT_DATA -- Provides you an option to skip the MV data refresh corresponding to -- the external partitions. + -- REFRESH_PARALLEL + -- Max degree of parallelism for executing refresh. Now only works on + -- complete refresh. + -- n <= 1 specifies serial executing. + -- n > 1 specifies parallel executing with n parallel processes. -- -- EXCEPTIONS -- @@ -173,7 +178,8 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER IS atomic_refresh IN BOOLEAN := true, nested IN BOOLEAN := false, out_of_place IN BOOLEAN := false, - skip_ext_data IN BOOLEAN := false); + skip_ext_data IN BOOLEAN := false, + refresh_parallel IN BINARY_INTEGER := 1); PROCEDURE refresh( tab IN DBMS_UTILITY.UNCL_ARRAY, @@ -187,7 +193,8 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER IS atomic_refresh IN BOOLEAN := true, nested IN BOOLEAN := false, out_of_place IN BOOLEAN := false, - skip_ext_data IN BOOLEAN := false); + skip_ext_data IN BOOLEAN := false, + refresh_parallel IN BINARY_INTEGER := 1); END dbms_mview; // diff --git a/src/share/inner_table/sys_package/dbms_mview_body.sql b/src/share/inner_table/sys_package/dbms_mview_body.sql index 760d186c2a..1b93f95e89 100644 --- a/src/share/inner_table/sys_package/dbms_mview_body.sql +++ b/src/share/inner_table/sys_package/dbms_mview_body.sql @@ -58,7 +58,8 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS atomic_refresh IN BOOLEAN := true, nested IN BOOLEAN := false, out_of_place IN BOOLEAN := false, - skip_ext_data IN BOOLEAN := false); + skip_ext_data IN BOOLEAN := false, + refresh_parallel IN BINARY_INTEGER := 1); PRAGMA INTERFACE(C, DBMS_MVIEW_REFRESH); PROCEDURE refresh( @@ -73,7 +74,8 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS atomic_refresh IN BOOLEAN := true, nested IN BOOLEAN := false, out_of_place IN BOOLEAN := false, - skip_ext_data IN BOOLEAN := false) + skip_ext_data IN BOOLEAN := false, + refresh_parallel IN BINARY_INTEGER := 1) IS BEGIN COMMIT; @@ -88,7 +90,8 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS atomic_refresh, nested, out_of_place, - skip_ext_data); + skip_ext_data, + refresh_parallel); END; PROCEDURE refresh( @@ -103,7 +106,8 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS atomic_refresh IN BOOLEAN := true, nested IN BOOLEAN := false, out_of_place IN BOOLEAN := false, - skip_ext_data IN BOOLEAN := false) + skip_ext_data IN BOOLEAN := false, + refresh_parallel IN BINARY_INTEGER := 1) IS list VARCHAR2(4000); BEGIN @@ -120,7 +124,8 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS atomic_refresh, nested, out_of_place, - skip_ext_data); + skip_ext_data, + refresh_parallel); END; END dbms_mview; diff --git a/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql b/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql index c4b2c40529..b3babf99ac 100644 --- a/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql +++ b/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql @@ -20,16 +20,16 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview PROCEDURE do_refresh( IN mv_name VARCHAR(65535), IN method VARCHAR(65535) DEFAULT NULL, - IN parallelism INT DEFAULT 1); + IN refresh_parallel INT DEFAULT 1); PRAGMA INTERFACE(C, DBMS_MVIEW_MYSQL_REFRESH); PROCEDURE refresh( IN mv_name VARCHAR(65535), IN method VARCHAR(65535) DEFAULT NULL, - IN parallelism INT DEFAULT 1) + IN refresh_parallel INT DEFAULT 1) BEGIN COMMIT; - CALL do_refresh(mv_name, method, parallelism); + CALL do_refresh(mv_name, method, refresh_parallel); END; END dbms_mview; diff --git a/src/share/inner_table/sys_package/dbms_mview_mysql.sql b/src/share/inner_table/sys_package/dbms_mview_mysql.sql index efbebc2dba..79d7a4740c 100644 --- a/src/share/inner_table/sys_package/dbms_mview_mysql.sql +++ b/src/share/inner_table/sys_package/dbms_mview_mysql.sql @@ -42,8 +42,9 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER -- (that is, if more materialized views are specified than refresh -- methods), then that materialized view is refreshed according to its -- default refresh method. - -- PARALLELISM - -- Max degree of parallelism for executing refresh. + -- REFRESH_PARALLEL + -- Max degree of parallelism for executing refresh. Now only works on + -- complete refresh. -- n <= 1 specifies serial executing. -- n > 1 specifies parallel executing with n parallel processes. -- @@ -53,6 +54,6 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER PROCEDURE refresh( IN mv_name VARCHAR(65535), IN method VARCHAR(65535) DEFAULT NULL, - IN parallelism INT DEFAULT 1); + IN refresh_parallel INT DEFAULT 1); END dbms_mview; diff --git a/src/share/schema/ob_schema_struct.cpp b/src/share/schema/ob_schema_struct.cpp index c8c0860e1a..2bb664b321 100644 --- a/src/share/schema/ob_schema_struct.cpp +++ b/src/share/schema/ob_schema_struct.cpp @@ -8026,7 +8026,8 @@ OB_SERIALIZE_MEMBER(ObMVRefreshInfo, refresh_mode_, start_time_, next_time_expr_, - exec_env_); + exec_env_, + parallel_); /*------------------------------------------------------------------------------------------------- * ------------------------------ObViewSchema------------------------------------------- diff --git a/src/share/schema/ob_schema_struct.h b/src/share/schema/ob_schema_struct.h index 90566c7222..17d2f82eea 100755 --- a/src/share/schema/ob_schema_struct.h +++ b/src/share/schema/ob_schema_struct.h @@ -3324,13 +3324,15 @@ public: common::ObObj start_time_; ObString next_time_expr_; ObString exec_env_; + int64_t parallel_; ObMVRefreshInfo() : refresh_method_(ObMVRefreshMethod::NEVER), refresh_mode_(ObMVRefreshMode::DEMAND), start_time_(), next_time_expr_(), - exec_env_() {} + exec_env_(), + parallel_(OB_INVALID_COUNT) {} void reset() { refresh_method_ = ObMVRefreshMethod::NEVER; @@ -3338,6 +3340,7 @@ public: start_time_.reset(); next_time_expr_.reset(); exec_env_.reset(); + parallel_ = OB_INVALID_COUNT; } bool operator == (const ObMVRefreshInfo &other) const { @@ -3345,7 +3348,8 @@ public: && refresh_mode_ == other.refresh_mode_ && start_time_ == other.start_time_ && next_time_expr_ == other.next_time_expr_ - && exec_env_ == other.exec_env_; + && exec_env_ == other.exec_env_ + && parallel_ == other.parallel_; } @@ -3353,7 +3357,8 @@ public: K_(refresh_method), K_(start_time), K_(next_time_expr), - K_(exec_env)); + K_(exec_env), + K_(parallel)); }; class ObViewSchema : public ObSchema diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index c3f63a5940..ec1557a8b7 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -7870,7 +7870,7 @@ create_with_opt_hint opt_replace opt_algorithm opt_definer opt_sql_security VIEW UNUSED($3); UNUSED($4); UNUSED($5); - malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_VIEW, 11, + malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_VIEW, 12, NULL, /* opt_materialized, not support*/ $7, /* view name */ $8, /* column list */ @@ -7879,7 +7879,7 @@ create_with_opt_hint opt_replace opt_algorithm opt_definer opt_sql_security VIEW $2, $12, /* with option */ NULL, /* force view opt */ - NULL, NULL, NULL + NULL, NULL, NULL, NULL ); dup_expr_string($11, result, @11.first_column, @11.last_column); $$->reserved_ = 0; /* is create view */ @@ -7914,11 +7914,10 @@ create_with_opt_hint opt_replace opt_algorithm opt_definer opt_sql_security VIEW create_mview_stmt: create_with_opt_hint MATERIALIZED VIEW view_name opt_column_list opt_table_option_list opt_partition_option create_mview_refresh AS view_select_stmt opt_check_option { - (void)($1); ParseNode *table_options = NULL; merge_nodes(table_options, result, T_TABLE_OPTION_LIST, $6); - malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_VIEW, 11, + malloc_non_terminal_node($$, result->malloc_pool_, T_CREATE_VIEW, 12, NULL, /* opt_materialized, not support*/ $4, /* view name */ $5, /* column list */ @@ -7929,7 +7928,8 @@ create_with_opt_hint MATERIALIZED VIEW view_name opt_column_list opt_table_optio NULL, /* force view opt */ $8, $7, /* partition option */ - table_options /* table options */ + table_options, /* table options */ + $1 /* hint */ ); dup_expr_string($10, result, @10.first_column, @10.last_column); $$->reserved_ = 2; /* create materialized view */ diff --git a/src/sql/resolver/ddl/ob_create_view_resolver.cpp b/src/sql/resolver/ddl/ob_create_view_resolver.cpp index 39820e0e3a..418860c9e2 100644 --- a/src/sql/resolver/ddl/ob_create_view_resolver.cpp +++ b/src/sql/resolver/ddl/ob_create_view_resolver.cpp @@ -340,6 +340,13 @@ int ObCreateViewResolver::resolve(const ParseNode &parse_tree) container_table_schema.set_mv_container_table(IS_MV_CONTAINER_TABLE); } } + if (OB_SUCC(ret)) { + if (OB_FAIL(resolve_hints(parse_tree.children_[HINT_NODE], *stmt, container_table_schema))) { + LOG_WARN("resolve hints failed", K(ret)); + } else { + mv_ainfo.mv_refresh_info_.parallel_ = stmt->get_parallelism(); + } + } if (OB_SUCC(ret)) { ObViewSchema &view_schema = table_schema.get_view_schema(); if (OB_FAIL(resolve_mv_refresh_info(parse_tree.children_[MVIEW_NODE], mv_ainfo.mv_refresh_info_))) { diff --git a/src/sql/resolver/ddl/ob_create_view_resolver.h b/src/sql/resolver/ddl/ob_create_view_resolver.h index 3af2f5e56b..94fc73a05a 100644 --- a/src/sql/resolver/ddl/ob_create_view_resolver.h +++ b/src/sql/resolver/ddl/ob_create_view_resolver.h @@ -44,7 +44,8 @@ class ObCreateViewResolver : public ObCreateTableResolverBase static const int64_t MVIEW_NODE = 8; static const int64_t PARTITION_NODE = 9; static const int64_t TABLE_OPTION_NODE = 10; - static const int64_t ROOT_NUM_CHILD = 11; + static const int64_t HINT_NODE = 11; + static const int64_t ROOT_NUM_CHILD = 12; public: explicit ObCreateViewResolver(ObResolverParams ¶ms); diff --git a/src/storage/mview/cmd/ob_mview_refresh_executor.cpp b/src/storage/mview/cmd/ob_mview_refresh_executor.cpp index 314783fd34..6edb0d8e44 100644 --- a/src/storage/mview/cmd/ob_mview_refresh_executor.cpp +++ b/src/storage/mview/cmd/ob_mview_refresh_executor.cpp @@ -166,7 +166,7 @@ int ObMViewRefreshExecutor::do_refresh() refresh_param.tenant_id_ = tenant_id_; refresh_param.mview_id_ = mview_id; refresh_param.refresh_method_ = refresh_method; - refresh_param.parallelism_ = arg_->parallelism_; + refresh_param.parallelism_ = arg_->refresh_parallel_; while (OB_SUCC(ret) && OB_SUCC(ctx_->check_status())) { ObMViewTransaction trans; ObMViewRefresher refresher; diff --git a/src/storage/mview/cmd/ob_mview_refresh_executor.h b/src/storage/mview/cmd/ob_mview_refresh_executor.h index 055d3eb0f9..fd35ea41fd 100644 --- a/src/storage/mview/cmd/ob_mview_refresh_executor.h +++ b/src/storage/mview/cmd/ob_mview_refresh_executor.h @@ -37,7 +37,8 @@ public: atomic_refresh_(false), nested_(false), out_of_place_(false), - skip_ext_data_(false) + skip_ext_data_(false), + refresh_parallel_(0) { } bool is_valid() const { return !list_.empty(); } @@ -56,6 +57,7 @@ public: bool nested_; bool out_of_place_; bool skip_ext_data_; + int64_t refresh_parallel_; }; class ObMViewRefreshExecutor diff --git a/src/storage/mview/ob_mview_sched_job_utils.cpp b/src/storage/mview/ob_mview_sched_job_utils.cpp index edac687d9b..73beab52db 100644 --- a/src/storage/mview/ob_mview_sched_job_utils.cpp +++ b/src/storage/mview/ob_mview_sched_job_utils.cpp @@ -196,22 +196,19 @@ int ObMViewSchedJobUtils::add_mview_info_and_refresh_job(ObISQLClient &sql_clien refresh_job))) { LOG_WARN("failed to generate mview job name", KR(ret), K(tenant_id), K(job_id), K(job_prefix)); } else { - const ObString mview_refresh_func("DBMS_MVIEW.refresh"); - ObString job_action; - // job_action is generated as "mlog_purge_func('db_name.table_name')" - if (OB_FAIL(ObMViewSchedJobUtils::generate_job_action( - allocator, - mview_refresh_func, - db_name, - table_name, - job_action))) { - LOG_WARN("failed to generate mview job action", KR(ret)); + ObSqlString job_action; + if (OB_FAIL(job_action.assign_fmt("DBMS_MVIEW.refresh('%.*s.%.*s', refresh_parallel => %ld)", + static_cast(db_name.length()), db_name.ptr(), + static_cast(table_name.length()), table_name.ptr(), + refresh_info->parallel_))) { + LOG_WARN("fail to assign job action", KR(ret), K(db_name), K(table_name), + K(refresh_info->parallel_)); } else if (OB_FAIL(ObMViewSchedJobUtils::add_scheduler_job( sql_client, tenant_id, job_id, refresh_job, - job_action, + job_action.string(), refresh_info->start_time_, refresh_info->next_time_expr_, refresh_info->exec_env_))) {