add parameter purge_log_parallel to dbms_mview.purge_log()

This commit is contained in:
obdev
2024-05-24 12:08:07 +00:00
committed by ob-robot
parent c7a7805e27
commit e3510a5f8f
12 changed files with 43 additions and 21 deletions

View File

@ -26,19 +26,22 @@ using namespace storage;
/*
PROCEDURE purge_log(
IN master_name VARCHAR(65535));
IN master_name VARCHAR(65535),
IN purge_log_parallel INT DEFAULT 1);
*/
int ObDBMSMViewMysql::purge_log(ObExecContext &ctx, ParamStore &params, ObObj &result)
{
UNUSED(result);
int ret = OB_SUCCESS;
CK(OB_LIKELY(1 == params.count()));
CK(OB_LIKELY(2 == params.count()));
CK(OB_LIKELY(params.at(0).is_varchar()) /*master_name*/);
CK(OB_LIKELY(params.at(1).is_int32()) /*purge_log_parallel*/);
if (OB_SUCC(ret)) {
ObMViewPurgeLogArg purge_params;
ObMViewPurgeLogExecutor purge_executor;
// fill params
purge_params.master_ = params.at(0).get_varchar();
purge_params.purge_log_parallel_ = params.at(1).get_int() >= 0 ? params.at(1).get_int() : 1;
if (OB_FAIL(purge_executor.execute(ctx, purge_params))) {
LOG_WARN("fail to execute mlog purge", KR(ret), K(purge_params));
}

View File

@ -51,7 +51,8 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER IS
PROCEDURE purge_log(
master IN VARCHAR2,
num IN BINARY_INTEGER := 1,
flag IN VARCHAR2 := 'NOP');
flag IN VARCHAR2 := 'NOP',
purge_log_parallel IN BINARY_INTEGER := 1);
-- -----------------------------------------------------------------------
-- Transaction consistent refresh of an array of materialized views.

View File

@ -30,17 +30,19 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview IS
PROCEDURE do_purge_log(
master IN VARCHAR2,
num IN BINARY_INTEGER := 1,
flag IN VARCHAR2 := 'NOP');
flag IN VARCHAR2 := 'NOP',
purge_log_parallel IN BINARY_INTEGER := 1);
PRAGMA INTERFACE(C, DBMS_MVIEW_PURGE_LOG);
PROCEDURE purge_log(
master IN VARCHAR2,
num IN BINARY_INTEGER := 1,
flag IN VARCHAR2 := 'NOP')
flag IN VARCHAR2 := 'NOP',
purge_log_parallel IN BINARY_INTEGER := 1)
IS
BEGIN
COMMIT;
do_purge_log(master, num, flag);
do_purge_log(master, num, flag, purge_log_parallel);
END;
-- ------------------------------------------------------------------------

View File

@ -4,14 +4,16 @@ CREATE OR REPLACE PACKAGE BODY dbms_mview
-- purge_log
PROCEDURE do_purge_log(
IN master_name VARCHAR(65535));
IN master_name VARCHAR(65535),
IN purge_log_parallel INT DEFAULT 1);
PRAGMA INTERFACE(C, DBMS_MVIEW_MYSQL_PURGE_LOG);
PROCEDURE purge_log(
IN master_name VARCHAR(65535))
IN master_name VARCHAR(65535),
IN purge_log_parallel INT DEFAULT 1)
BEGIN
COMMIT;
CALL do_purge_log(master_name);
CALL do_purge_log(master_name, purge_log_parallel);
END;
-- ------------------------------------------------------------------------

View File

@ -26,7 +26,8 @@ CREATE OR REPLACE PACKAGE dbms_mview AUTHID CURRENT_USER
--
PROCEDURE purge_log(
IN master_name VARCHAR(65535));
IN master_name VARCHAR(65535),
IN purge_log_parallel INT DEFAULT 1);
-- -----------------------------------------------------------------------
-- Refresh the given materialized view.

View File

@ -53,6 +53,7 @@ int ObMViewPurgeLogExecutor::execute(ObExecContext &ctx, const ObMViewPurgeLogAr
ObMLogPurger purger;
purge_param.tenant_id_ = tenant_id_;
purge_param.master_table_id_ = master_table_id_;
purge_param.purge_log_parallel_ = arg.purge_log_parallel_;
if (OB_FAIL(purger.init(ctx, purge_param))) {
LOG_WARN("fail to init mlog purger", KR(ret), K(purge_param));
} else if (OB_FAIL(purger.purge())) {

View File

@ -26,14 +26,15 @@ namespace storage
struct ObMViewPurgeLogArg
{
public:
ObMViewPurgeLogArg() : num_(-1) {}
ObMViewPurgeLogArg() : num_(-1), purge_log_parallel_(0) {}
bool is_valid() const { return !master_.empty(); }
TO_STRING_KV(K_(master), K_(num), K_(flag));
TO_STRING_KV(K_(master), K_(num), K_(flag), K_(purge_log_parallel));
public:
ObString master_;
int64_t num_;
ObString flag_;
int64_t purge_log_parallel_;
};
class ObMViewPurgeLogExecutor

View File

@ -221,6 +221,7 @@ int ObMViewRefreshExecutor::do_refresh()
ObMLogPurger purger;
purge_param.tenant_id_ = tenant_id_;
purge_param.master_table_id_ = dep.get_ref_obj_id();
purge_param.purge_log_parallel_ = arg_->refresh_parallel_; // reuse refresh_parallel_ in purge_log
if (OB_TMP_FAIL(purger.init(*ctx_, purge_param))) {
LOG_WARN("fail to init mlog purger", KR(tmp_ret), K(purge_param));
} else if (OB_TMP_FAIL(purger.purge())) { // mlog may dropped, ignore

View File

@ -192,7 +192,7 @@ int ObMLogPurger::prepare_for_purge()
}
if (OB_SUCC(ret) && need_purge_) {
if (OB_FAIL(ObMViewRefreshHelper::generate_purge_mlog_sql(
schema_guard, tenant_id, mlog_table_id, purge_scn_, purge_sql_))) {
schema_guard, tenant_id, mlog_table_id, purge_scn_, purge_param_.purge_log_parallel_, purge_sql_))) {
LOG_WARN("fail to generate purge mlog sql", KR(ret), K(mlog_table_id), K(purge_scn_));
}
}

View File

@ -29,16 +29,22 @@ namespace storage
struct ObMLogPurgeParam
{
public:
ObMLogPurgeParam() : tenant_id_(OB_INVALID_TENANT_ID), master_table_id_(OB_INVALID_ID) {}
ObMLogPurgeParam()
: tenant_id_(OB_INVALID_TENANT_ID),
master_table_id_(OB_INVALID_ID),
purge_log_parallel_(0)
{
}
bool is_valid() const
{
return tenant_id_ != OB_INVALID_TENANT_ID && master_table_id_ != OB_INVALID_ID;
}
TO_STRING_KV(K_(tenant_id), K_(master_table_id));
TO_STRING_KV(K_(tenant_id), K_(master_table_id), K_(purge_log_parallel));
public:
uint64_t tenant_id_;
uint64_t master_table_id_;
int64_t purge_log_parallel_;
};
class ObMLogPurger

View File

@ -99,7 +99,8 @@ int ObMViewRefreshHelper::lock_mview(ObMViewTransaction &trans, const uint64_t t
int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_guard,
const uint64_t tenant_id, const uint64_t mlog_id,
const SCN &purge_scn, ObSqlString &sql_string)
const SCN &purge_scn, const int64_t purge_log_parallel,
ObSqlString &sql_string)
{
int ret = OB_SUCCESS;
sql_string.reuse();
@ -141,7 +142,8 @@ int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_gu
K(table_schema->get_table_name_str()), K(is_oracle_mode));
} else {
if (is_oracle_mode) {
if (OB_FAIL(sql_string.assign_fmt("DELETE FROM \"%.*s\".\"%.*s\" WHERE ora_rowscn <= %lu;",
if (OB_FAIL(sql_string.assign_fmt("DELETE /*+ ENABLE_PARALLEL_DML PARALLEL(%d)*/ FROM \"%.*s\".\"%.*s\" WHERE ora_rowscn <= %lu;",
static_cast<int>(purge_log_parallel),
static_cast<int>(database_name.length()),
database_name.ptr(),
static_cast<int>(table_name.length()), table_name.ptr(),
@ -149,7 +151,8 @@ int ObMViewRefreshHelper::generate_purge_mlog_sql(ObSchemaGetterGuard &schema_gu
LOG_WARN("fail to assign sql", KR(ret));
}
} else {
if (OB_FAIL(sql_string.assign_fmt("DELETE FROM `%.*s`.`%.*s` WHERE ora_rowscn <= %lu;",
if (OB_FAIL(sql_string.assign_fmt("DELETE /*+ ENABLE_PARALLEL_DML PARALLEL(%d)*/ FROM `%.*s`.`%.*s` WHERE ora_rowscn <= %lu;",
static_cast<int>(purge_log_parallel),
static_cast<int>(database_name.length()),
database_name.ptr(),
static_cast<int>(table_name.length()), table_name.ptr(),

View File

@ -49,7 +49,8 @@ public:
static int generate_purge_mlog_sql(share::schema::ObSchemaGetterGuard &schema_guard,
const uint64_t tenant_id, const uint64_t mlog_id,
const share::SCN &purge_scn, ObSqlString &sql_string);
const share::SCN &purge_scn, const int64_t purge_log_parallel,
ObSqlString &sql_string);
static int get_table_row_num(ObMViewTransaction &trans, const uint64_t tenant_id,
const uint64_t table_id, const share::SCN &scn, int64_t &num_rows);