[FEAT MERGE] materialized view

This commit is contained in:
obdev
2024-02-07 20:31:13 +00:00
committed by ob-robot
parent 39fae3b529
commit c2511b580f
257 changed files with 35698 additions and 4335 deletions

View File

@ -977,6 +977,19 @@ public:
struct ObCreateIndexArg;//Forward declaration
struct ObCreateForeignKeyArg;//Forward declaration
struct ObMVAdditionalInfo
{
OB_UNIS_VERSION(1);
public:
share::schema::ObTableSchema container_table_schema_;
ObMVRefreshInfo mv_refresh_info_;
int assign(const ObMVAdditionalInfo &other);
TO_STRING_KV(K_(container_table_schema), K_(mv_refresh_info));
};
struct ObCreateTableArg : ObDDLArg
{
OB_UNIS_VERSION(1);
@ -989,10 +1002,13 @@ public:
is_inner_(false),
error_info_(),
is_alter_view_(false),
sequence_ddl_arg_()
sequence_ddl_arg_(),
dep_infos_(),
mv_ainfo_()
{}
bool is_valid() const;
virtual bool is_allow_when_upgrade() const;
int assign(const ObCreateTableArg &other);
DECLARE_TO_STRING;
bool if_not_exist_;
@ -1009,6 +1025,7 @@ public:
bool is_alter_view_;
ObSequenceDDLArg sequence_ddl_arg_;
common::ObSArray<share::schema::ObDependencyInfo> dep_infos_;
common::ObSArray<ObMVAdditionalInfo> mv_ainfo_;
};
struct ObCreateTableRes
@ -1221,6 +1238,12 @@ public:
str = "drop primary key";
} else if (ALTER_INDEX_TABLESPACE == type) {
str = "alter index tablespace";
} else if (ADD_MLOG == type) {
str = "add materialized view log";
} else if (DROP_MLOG == type) {
str = "drop materialized view log";
} else if (RENAME_MLOG == type) {
str = "rename materialized view log";
}
return str;
}
@ -1909,6 +1932,105 @@ public:
share::ObTaskId trace_id_;
};
struct ObMViewCompleteRefreshArg final : public ObDDLArg
{
OB_UNIS_VERSION(1);
public:
ObMViewCompleteRefreshArg()
: ObDDLArg(),
tenant_id_(OB_INVALID_TENANT_ID),
table_id_(OB_INVALID_ID),
session_id_(OB_INVALID_ID),
sql_mode_(0),
last_refresh_scn_(),
allocator_("MVRefDDL"),
tz_info_(),
tz_info_wrap_(),
nls_formats_(),
parent_task_id_(0)
{
}
~ObMViewCompleteRefreshArg() = default;
bool is_valid() const;
void reset();
int assign(const ObMViewCompleteRefreshArg &other);
INHERIT_TO_STRING_KV("ObDDLArg", ObDDLArg,
K_(tenant_id),
K_(table_id),
K_(session_id),
K_(sql_mode),
K_(last_refresh_scn),
K_(tz_info),
K_(tz_info_wrap),
"nls_formats", common::ObArrayWrap<common::ObString>(nls_formats_, common::ObNLSFormatEnum::NLS_MAX),
K_(parent_task_id));
public:
uint64_t tenant_id_;
uint64_t table_id_; // mview table id
uint64_t session_id_;
ObSQLMode sql_mode_;
share::SCN last_refresh_scn_;
common::ObArenaAllocator allocator_;
common::ObTimeZoneInfo tz_info_;
common::ObTimeZoneInfoWrap tz_info_wrap_;
common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX];
int64_t parent_task_id_;
};
struct ObMViewCompleteRefreshRes final
{
OB_UNIS_VERSION(1);
public:
ObMViewCompleteRefreshRes() : task_id_(0), trace_id_() {}
~ObMViewCompleteRefreshRes() = default;
void reset()
{
task_id_ = 0;
trace_id_.reset();
}
int assign(const ObMViewCompleteRefreshRes &other)
{
if (this != &other) {
task_id_ = other.task_id_;
trace_id_ = other.trace_id_;
}
return OB_SUCCESS;
}
TO_STRING_KV(K_(task_id), K_(trace_id));
public:
int64_t task_id_;
share::ObTaskId trace_id_;
};
struct ObMViewRefreshInfo
{
OB_UNIS_VERSION(1);
public:
ObMViewRefreshInfo()
: mview_table_id_(OB_INVALID_ID),
last_refresh_scn_(),
refresh_scn_(),
start_time_(OB_INVALID_TIMESTAMP),
is_mview_complete_refresh_(false)
{
}
~ObMViewRefreshInfo() = default;
bool is_valid() const;
void reset();
int assign(const ObMViewRefreshInfo &other);
TO_STRING_KV(K_(mview_table_id),
K_(last_refresh_scn),
K_(refresh_scn),
K_(start_time),
K_(is_mview_complete_refresh));
public:
uint64_t mview_table_id_;
share::SCN last_refresh_scn_;
share::SCN refresh_scn_;
int64_t start_time_;
bool is_mview_complete_refresh_;
};
struct ObAlterTableArg : public ObDDLArg
{
OB_UNIS_VERSION(1);
@ -2009,7 +2131,8 @@ public:
foreign_key_checks_(true),
is_add_to_scheduler_(false),
inner_sql_exec_addr_(),
local_session_var_(&allocator_)
local_session_var_(&allocator_),
mview_refresh_info_()
{
}
virtual ~ObAlterTableArg()
@ -2075,7 +2198,8 @@ public:
K_(table_id),
K_(hidden_table_id),
K_(inner_sql_exec_addr),
K_(local_session_var));
K_(local_session_var),
K_(mview_refresh_info));
private:
int alloc_index_arg(const ObIndexArg::IndexActionType index_action_type, ObIndexArg *&index_arg);
public:
@ -2108,6 +2232,7 @@ public:
bool is_add_to_scheduler_;
common::ObAddr inner_sql_exec_addr_;
ObLocalSessionVar local_session_var_;
ObMViewRefreshInfo mview_refresh_info_;
int serialize_index_args(char *buf, const int64_t data_len, int64_t &pos) const;
int deserialize_index_args(const char *buf, const int64_t data_len, int64_t &pos);
int64_t get_index_args_serialize_size() const;
@ -2472,6 +2597,165 @@ public:
};
typedef ObCreateIndexArg ObAlterPrimaryArg;
struct ObCreateMLogArg : public ObDDLArg
{
OB_UNIS_VERSION_V(1);
public:
ObCreateMLogArg()
: ObDDLArg(),
database_name_(),
table_name_(),
mlog_name_(),
tenant_id_(OB_INVALID_TENANT_ID),
base_table_id_(common::OB_INVALID_ID),
mlog_table_id_(common::OB_INVALID_ID),
session_id_(common::OB_INVALID_ID),
with_rowid_(false),
with_primary_key_(false),
with_sequence_(false),
include_new_values_(false),
purge_options_(),
mlog_schema_(),
store_columns_(),
nls_date_format_(),
nls_timestamp_format_(),
nls_timestamp_tz_format_(),
sql_mode_(0)
{
}
virtual ~ObCreateMLogArg() {}
bool is_valid() const;
int assign(const ObCreateMLogArg &other) {
int ret = common::OB_SUCCESS;
if (OB_FAIL(ObDDLArg::assign(other))) {
SHARE_LOG(WARN, "failed to assign base", KR(ret));
} else if (OB_FAIL(mlog_schema_.assign(other.mlog_schema_))) {
SHARE_LOG(WARN, "failed to assign mlog schema", KR(ret));
} else if (OB_FAIL(store_columns_.assign(other.store_columns_))) {
SHARE_LOG(WARN, "failed to assign store columns", KR(ret));
} else {
database_name_ = other.database_name_;
table_name_ = other.table_name_;
mlog_name_ = other.mlog_name_;
tenant_id_ = other.tenant_id_;
base_table_id_ = other.base_table_id_;
mlog_table_id_ = other.mlog_table_id_;
session_id_ = other.session_id_;
with_rowid_ = other.with_rowid_;
with_primary_key_ = other.with_primary_key_;
with_sequence_ = other.with_sequence_;
include_new_values_ = other.include_new_values_;
purge_options_ = other.purge_options_;
nls_date_format_ = other.nls_date_format_;
nls_timestamp_format_ = other.nls_timestamp_format_;
nls_timestamp_tz_format_ = other.nls_timestamp_tz_format_;
sql_mode_ = other.sql_mode_;
}
return ret;
}
void reset()
{
database_name_.reset();
table_name_.reset();
mlog_name_.reset();
tenant_id_ = common::OB_INVALID_TENANT_ID;
base_table_id_ = common::OB_INVALID_ID;
mlog_table_id_ = common::OB_INVALID_ID;
session_id_ = common::OB_INVALID_ID;
with_rowid_ = false;
with_primary_key_ = false;
with_sequence_ = false;
include_new_values_ = false;
purge_options_.reset();
mlog_schema_.reset();
store_columns_.reset();
nls_date_format_.reset();
nls_timestamp_format_.reset();
nls_timestamp_tz_format_.reset();
sql_mode_ = 0;
ObDDLArg::reset();
}
DECLARE_TO_STRING;
struct PurgeOptions {
OB_UNIS_VERSION(1);
public:
PurgeOptions() : purge_mode_(share::schema::ObMLogPurgeMode::MAX)
{
}
~PurgeOptions() {}
void reset()
{
purge_mode_ = share::schema::ObMLogPurgeMode::MAX;
start_datetime_expr_.reset();
next_datetime_expr_.reset();
exec_env_.reset();
}
bool is_valid() const
{
return (share::schema::ObMLogPurgeMode::MAX != purge_mode_) && !exec_env_.empty();
}
TO_STRING_KV(K_(purge_mode),
K_(start_datetime_expr),
K_(next_datetime_expr),
K_(exec_env));
share::schema::ObMLogPurgeMode purge_mode_;
common::ObObj start_datetime_expr_;
common::ObString next_datetime_expr_;
common::ObString exec_env_;
};
common::ObString database_name_;
common::ObString table_name_;
common::ObString mlog_name_; // for privilege check
uint64_t tenant_id_;
uint64_t base_table_id_;
uint64_t mlog_table_id_;
uint64_t session_id_;
bool with_rowid_;
bool with_primary_key_;
bool with_sequence_;
bool include_new_values_;
PurgeOptions purge_options_;
share::schema::ObTableSchema mlog_schema_;
common::ObSEArray<common::ObString, common::OB_PREALLOCATED_NUM> store_columns_;
common::ObString nls_date_format_;
common::ObString nls_timestamp_format_;
common::ObString nls_timestamp_tz_format_;
ObSQLMode sql_mode_;
};
struct ObCreateMLogRes
{
OB_UNIS_VERSION(1);
public:
ObCreateMLogRes()
: mlog_table_id_(common::OB_INVALID_ID),
schema_version_(common::OB_INVALID_VERSION),
task_id_(0)
{}
void reset()
{
mlog_table_id_ = OB_INVALID_ID;
schema_version_ = OB_INVALID_VERSION;
task_id_ = 0;
}
int assign(const ObCreateMLogRes &other) {
int ret = common::OB_SUCCESS;
mlog_table_id_ = other.mlog_table_id_;
schema_version_ = other.schema_version_;
task_id_ = other.task_id_;
return ret;
}
public:
TO_STRING_KV(K_(mlog_table_id), K_(schema_version), K_(task_id));
uint64_t mlog_table_id_;
int64_t schema_version_;
int64_t task_id_;
};
struct ObCreateForeignKeyArg : public ObIndexArg
{
OB_UNIS_VERSION_V(1);
@ -5774,6 +6058,31 @@ public:
ObString database_name_;
};
struct ObUpdateMViewStatusArg : public ObDDLArg
{
OB_UNIS_VERSION(1);
public:
ObUpdateMViewStatusArg():
ObDDLArg(),
mview_table_id_(common::OB_INVALID_ID),
mv_available_flag_(ObMVAvailableFlag::IS_MV_UNAVAILABLE),
convert_status_(true),
in_offline_ddl_white_list_(false)
{}
bool is_valid() const;
int assign(const ObUpdateMViewStatusArg &other);
virtual bool is_allow_when_disable_ddl() const { return false; };
virtual bool is_allow_when_upgrade() const { return true; }
virtual bool is_in_offline_ddl_white_list() const { return in_offline_ddl_white_list_; }
TO_STRING_KV(K_(mview_table_id), K_(mv_available_flag), K_(convert_status), K_(in_offline_ddl_white_list));
public:
uint64_t mview_table_id_;
enum ObMVAvailableFlag mv_available_flag_;
bool convert_status_;
bool in_offline_ddl_white_list_;
};
struct ObMergeFinishArg
{
OB_UNIS_VERSION(1);