diff --git a/src/pl/sys_package/ob_dbms_mview_mysql.cpp b/src/pl/sys_package/ob_dbms_mview_mysql.cpp index 82dbe7ce37..045cf37827 100644 --- a/src/pl/sys_package/ob_dbms_mview_mysql.cpp +++ b/src/pl/sys_package/ob_dbms_mview_mysql.cpp @@ -26,19 +26,22 @@ using namespace storage; /* PROCEDURE purge_log( - IN master_name VARCHAR(65535)); + IN master_name VARCHAR(65535), + IN purge_log_parallel INT DEFAULT 1); */ int ObDBMSMViewMysql::purge_log(ObExecContext &ctx, ParamStore ¶ms, ObObj &result) { UNUSED(result); int ret = OB_SUCCESS; - CK(OB_LIKELY(1 == params.count())); + CK(OB_LIKELY(2 == params.count())); CK(OB_LIKELY(params.at(0).is_varchar()) /*master_name*/); + CK(OB_LIKELY(params.at(1).is_int32()) /*purge_log_parallel*/); if (OB_SUCC(ret)) { ObMViewPurgeLogArg purge_params; ObMViewPurgeLogExecutor purge_executor; // fill params purge_params.master_ = params.at(0).get_varchar(); + purge_params.purge_log_parallel_ = params.at(1).get_int() >= 0 ? params.at(1).get_int() : 1; if (OB_FAIL(purge_executor.execute(ctx, purge_params))) { LOG_WARN("fail to execute mlog purge", KR(ret), K(purge_params)); } diff --git a/src/share/inner_table/sys_package/dbms_mview.sql b/src/share/inner_table/sys_package/dbms_mview.sql index f07e1555b9..95ab5208f1 100644 --- a/src/share/inner_table/sys_package/dbms_mview.sql +++ b/src/share/inner_table/sys_package/dbms_mview.sql @@ -49,9 +49,10 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER IS -- PROCEDURE purge_log( - master IN VARCHAR2, - num IN BINARY_INTEGER := 1, - flag IN VARCHAR2 := 'NOP'); + master IN VARCHAR2, + num IN BINARY_INTEGER := 1, + flag IN VARCHAR2 := 'NOP', + purge_log_parallel IN BINARY_INTEGER := 1); -- ----------------------------------------------------------------------- -- Transaction consistent refresh of an array of materialized views. diff --git a/src/share/inner_table/sys_package/dbms_mview_body.sql b/src/share/inner_table/sys_package/dbms_mview_body.sql index 1b93f95e89..c531140b2b 100644 --- a/src/share/inner_table/sys_package/dbms_mview_body.sql +++ b/src/share/inner_table/sys_package/dbms_mview_body.sql @@ -30,17 +30,19 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS PROCEDURE do_purge_log( master IN VARCHAR2, num IN BINARY_INTEGER := 1, - flag IN VARCHAR2 := 'NOP'); + flag IN VARCHAR2 := 'NOP', + purge_log_parallel IN BINARY_INTEGER := 1); PRAGMA INTERFACE(C, DBMS_MVIEW_PURGE_LOG); PROCEDURE purge_log( master IN VARCHAR2, num IN BINARY_INTEGER := 1, - flag IN VARCHAR2 := 'NOP') + flag IN VARCHAR2 := 'NOP', + purge_log_parallel IN BINARY_INTEGER := 1) IS BEGIN COMMIT; - do_purge_log(master, num, flag); + do_purge_log(master, num, flag, purge_log_parallel); END; -- ------------------------------------------------------------------------ diff --git a/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql b/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql index b3babf99ac..918b183053 100644 --- a/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql +++ b/src/share/inner_table/sys_package/dbms_mview_body_mysql.sql @@ -4,14 +4,16 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview -- purge_log PROCEDURE do_purge_log( - IN master_name VARCHAR(65535)); + IN master_name VARCHAR(65535), + IN purge_log_parallel INT DEFAULT 1); PRAGMA INTERFACE(C, DBMS_MVIEW_MYSQL_PURGE_LOG); PROCEDURE purge_log( - IN master_name VARCHAR(65535)) + IN master_name VARCHAR(65535), + IN purge_log_parallel INT DEFAULT 1) BEGIN COMMIT; - CALL do_purge_log(master_name); + CALL do_purge_log(master_name, purge_log_parallel); END; -- ------------------------------------------------------------------------ diff --git a/src/share/inner_table/sys_package/dbms_mview_mysql.sql b/src/share/inner_table/sys_package/dbms_mview_mysql.sql index 79d7a4740c..3410eda245 100644 --- a/src/share/inner_table/sys_package/dbms_mview_mysql.sql +++ b/src/share/inner_table/sys_package/dbms_mview_mysql.sql @@ -26,7 +26,8 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER -- PROCEDURE purge_log( - IN master_name VARCHAR(65535)); + IN master_name VARCHAR(65535), + IN purge_log_parallel INT DEFAULT 1); -- ----------------------------------------------------------------------- -- Refresh the given materialized view. diff --git a/src/storage/mview/cmd/ob_mview_purge_log_executor.cpp b/src/storage/mview/cmd/ob_mview_purge_log_executor.cpp index 90d24ed354..2cab5b4a5f 100644 --- a/src/storage/mview/cmd/ob_mview_purge_log_executor.cpp +++ b/src/storage/mview/cmd/ob_mview_purge_log_executor.cpp @@ -53,6 +53,7 @@ int ObMViewPurgeLogExecutor::execute(ObExecContext &ctx, const ObMViewPurgeLogAr ObMLogPurger purger; purge_param.tenant_id_ = tenant_id_; purge_param.master_table_id_ = master_table_id_; + purge_param.purge_log_parallel_ = arg.purge_log_parallel_; if (OB_FAIL(purger.init(ctx, purge_param))) { LOG_WARN("fail to init mlog purger", KR(ret), K(purge_param)); } else if (OB_FAIL(purger.purge())) { diff --git a/src/storage/mview/cmd/ob_mview_purge_log_executor.h b/src/storage/mview/cmd/ob_mview_purge_log_executor.h index 68164a8dcd..b32d35df6e 100644 --- a/src/storage/mview/cmd/ob_mview_purge_log_executor.h +++ b/src/storage/mview/cmd/ob_mview_purge_log_executor.h @@ -26,14 +26,15 @@ namespace storage struct ObMViewPurgeLogArg { public: - ObMViewPurgeLogArg() : num_(-1) {} + ObMViewPurgeLogArg() : num_(-1), purge_log_parallel_(0) {} bool is_valid() const { return !master_.empty(); } - TO_STRING_KV(K_(master), K_(num), K_(flag)); + TO_STRING_KV(K_(master), K_(num), K_(flag), K_(purge_log_parallel)); public: ObString master_; int64_t num_; ObString flag_; + int64_t purge_log_parallel_; }; class ObMViewPurgeLogExecutor diff --git a/src/storage/mview/cmd/ob_mview_refresh_executor.cpp b/src/storage/mview/cmd/ob_mview_refresh_executor.cpp index 6edb0d8e44..e05abc4940 100644 --- a/src/storage/mview/cmd/ob_mview_refresh_executor.cpp +++ b/src/storage/mview/cmd/ob_mview_refresh_executor.cpp @@ -221,6 +221,7 @@ int ObMViewRefreshExecutor::do_refresh() ObMLogPurger purger; purge_param.tenant_id_ = tenant_id_; purge_param.master_table_id_ = dep.get_ref_obj_id(); + purge_param.purge_log_parallel_ = arg_->refresh_parallel_; // reuse refresh_parallel_ in purge_log if (OB_TMP_FAIL(purger.init(*ctx_, purge_param))) { LOG_WARN("fail to init mlog purger", KR(tmp_ret), K(purge_param)); } else if (OB_TMP_FAIL(purger.purge())) { // mlog may dropped, ignore diff --git a/src/storage/mview/ob_mlog_purge.cpp b/src/storage/mview/ob_mlog_purge.cpp index acd457a405..dd2f2a41bd 100644 --- a/src/storage/mview/ob_mlog_purge.cpp +++ b/src/storage/mview/ob_mlog_purge.cpp @@ -192,7 +192,7 @@ int ObMLogPurger::prepare_for_purge() } if (OB_SUCC(ret) && need_purge_) { if (OB_FAIL(ObMViewRefreshHelper::generate_purge_mlog_sql( - schema_guard, tenant_id, mlog_table_id, purge_scn_, purge_sql_))) { + schema_guard, tenant_id, mlog_table_id, purge_scn_, purge_param_.purge_log_parallel_, purge_sql_))) { LOG_WARN("fail to generate purge mlog sql", KR(ret), K(mlog_table_id), K(purge_scn_)); } } diff --git a/src/storage/mview/ob_mlog_purge.h b/src/storage/mview/ob_mlog_purge.h index cf5f21cd71..c1f2aec881 100644 --- a/src/storage/mview/ob_mlog_purge.h +++ b/src/storage/mview/ob_mlog_purge.h @@ -29,16 +29,22 @@ namespace storage struct ObMLogPurgeParam { public: - ObMLogPurgeParam() : tenant_id_(OB_INVALID_TENANT_ID), master_table_id_(OB_INVALID_ID) {} + ObMLogPurgeParam() + : tenant_id_(OB_INVALID_TENANT_ID), + master_table_id_(OB_INVALID_ID), + purge_log_parallel_(0) + { + } bool is_valid() const { return tenant_id_ != OB_INVALID_TENANT_ID && master_table_id_ != OB_INVALID_ID; } - TO_STRING_KV(K_(tenant_id), K_(master_table_id)); + TO_STRING_KV(K_(tenant_id), K_(master_table_id), K_(purge_log_parallel)); public: uint64_t tenant_id_; uint64_t master_table_id_; + int64_t purge_log_parallel_; }; class ObMLogPurger diff --git a/src/storage/mview/ob_mview_refresh_helper.cpp b/src/storage/mview/ob_mview_refresh_helper.cpp index a4d3cf6035..be2bd773ba 100644 --- a/src/storage/mview/ob_mview_refresh_helper.cpp +++ b/src/storage/mview/ob_mview_refresh_helper.cpp @@ -99,7 +99,8 @@ int ObMViewRefreshHelper::lock_mview(ObMViewTransaction &trans, const uint64_t t int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_guard, const uint64_t tenant_id, const uint64_t mlog_id, - const SCN &purge_scn, ObSqlString &sql_string) + const SCN &purge_scn, const int64_t purge_log_parallel, + ObSqlString &sql_string) { int ret = OB_SUCCESS; sql_string.reuse(); @@ -141,7 +142,8 @@ int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_gu K(table_schema->get_table_name_str()), K(is_oracle_mode)); } else { if (is_oracle_mode) { - if (OB_FAIL(sql_string.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE ora_rowscn <= %lu;", + if (OB_FAIL(sql_string.assign_fmt("DELETE /*+ ENABLE_PARALLEL_DML PARALLEL(%d)*/ FROM \"%.*s\".\"%.*s\" WHERE ora_rowscn <= %lu;", + static_cast(purge_log_parallel), static_cast(database_name.length()), database_name.ptr(), static_cast(table_name.length()), table_name.ptr(), @@ -149,7 +151,8 @@ int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_gu LOG_WARN("fail to assign sql", KR(ret)); } } else { - if (OB_FAIL(sql_string.assign_fmt("DELETE FROM `%.*s`.`%.*s` WHERE ora_rowscn <= %lu;", + if (OB_FAIL(sql_string.assign_fmt("DELETE /*+ ENABLE_PARALLEL_DML PARALLEL(%d)*/ FROM `%.*s`.`%.*s` WHERE ora_rowscn <= %lu;", + static_cast(purge_log_parallel), static_cast(database_name.length()), database_name.ptr(), static_cast(table_name.length()), table_name.ptr(), diff --git a/src/storage/mview/ob_mview_refresh_helper.h b/src/storage/mview/ob_mview_refresh_helper.h index 879233669b..8c5d16efd7 100644 --- a/src/storage/mview/ob_mview_refresh_helper.h +++ b/src/storage/mview/ob_mview_refresh_helper.h @@ -49,7 +49,8 @@ public: static int generate_purge_mlog_sql(share::schema::ObSchemaGetterGuard &schema_guard, const uint64_t tenant_id, const uint64_t mlog_id, - const share::SCN &purge_scn, ObSqlString &sql_string); + const share::SCN &purge_scn, const int64_t purge_log_parallel, + ObSqlString &sql_string); static int get_table_row_num(ObMViewTransaction &trans, const uint64_t tenant_id, const uint64_t table_id, const share::SCN &scn, int64_t &num_rows);