[CP] [CP] delete dropped table in data_dict refresh mode and building baseline scenario
This commit is contained in:
@ -168,10 +168,12 @@ int ObLogDdlParser::handle(void *data,
|
||||
"compat_mode", print_compat_mode(compat_mode), KPC(task));
|
||||
} else {
|
||||
lib::CompatModeGuard g(compat_mode);
|
||||
const bool is_build_baseline = false;
|
||||
|
||||
// Parse DDL task
|
||||
if (OB_FAIL(part_trans_parser_->parse(*task, stop_flag))) {
|
||||
LOG_ERROR("parse DDL task fail", KR(ret), KPC(task), "compat_mode", print_compat_mode(compat_mode));
|
||||
if (OB_FAIL(part_trans_parser_->parse(*task, is_build_baseline, stop_flag))) {
|
||||
LOG_ERROR("parse DDL task fail", KR(ret), KPC(task), K(is_build_baseline),
|
||||
"compat_mode", print_compat_mode(compat_mode));
|
||||
} else {
|
||||
// The DDL task does not need to go through the formatter module, and here the formatting is set to complete directly
|
||||
// DDL Handler directly waits for formatting to complete or not
|
||||
|
||||
@ -998,7 +998,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
|
||||
if (OB_SUCC(ret)) {
|
||||
if (is_data_dict_refresh_mode(refresh_mode_)) {
|
||||
if (OB_FAIL(ObLogMetaDataService::get_instance().init(start_tstamp_ns, fetching_mode, archive_dest,
|
||||
sys_ls_handler_, &mysql_proxy_.get_ob_mysql_proxy(), err_handler,
|
||||
sys_ls_handler_, &mysql_proxy_.get_ob_mysql_proxy(), err_handler, *part_trans_parser_,
|
||||
cluster_info.cluster_id_, TCONF, start_seq))) {
|
||||
LOG_ERROR("ObLogMetaDataService init failed", KR(ret), K(start_tstamp_ns));
|
||||
}
|
||||
|
||||
@ -28,7 +28,8 @@ namespace libobcdc
|
||||
ObLogMetaDataReplayer::ObLogMetaDataReplayer() :
|
||||
is_inited_(false),
|
||||
queue_(),
|
||||
schema_inc_replay_()
|
||||
schema_inc_replay_(),
|
||||
part_trans_parser_(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
@ -37,7 +38,7 @@ ObLogMetaDataReplayer::~ObLogMetaDataReplayer()
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObLogMetaDataReplayer::init()
|
||||
int ObLogMetaDataReplayer::init(IObLogPartTransParser &part_trans_parser)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -47,6 +48,7 @@ int ObLogMetaDataReplayer::init()
|
||||
} else if (OB_FAIL(schema_inc_replay_.init(true/*is_start_progress*/))) {
|
||||
LOG_ERROR("schema_inc_replay_ init failed", KR(ret));
|
||||
} else {
|
||||
part_trans_parser_ = &part_trans_parser;
|
||||
is_inited_ = true;
|
||||
}
|
||||
|
||||
@ -58,6 +60,7 @@ void ObLogMetaDataReplayer::destroy()
|
||||
if (IS_INIT) {
|
||||
queue_.reset();
|
||||
schema_inc_replay_.destroy();
|
||||
part_trans_parser_ = NULL;
|
||||
is_inited_ = false;
|
||||
}
|
||||
}
|
||||
@ -154,6 +157,7 @@ int ObLogMetaDataReplayer::handle_ddl_trans_(
|
||||
ReplayInfoStat &replay_info_stat)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool stop_flag = false;
|
||||
|
||||
if (OB_UNLIKELY(! part_trans_task.is_ddl_trans())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -173,12 +177,57 @@ int ObLogMetaDataReplayer::handle_ddl_trans_(
|
||||
DictTenantArray &tenant_metas = part_trans_task.get_dict_tenant_array();
|
||||
DictDatabaseArray &database_metas = part_trans_task.get_dict_database_array();
|
||||
DictTableArray &table_metas = part_trans_task.get_dict_table_array();
|
||||
const common::ObCompatibilityMode &compatible_mode = tenant_info.get_compatibility_mode();
|
||||
lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID;
|
||||
|
||||
if (OB_FAIL(schema_inc_replay_.replay(part_trans_task, tenant_metas, database_metas, table_metas, tenant_info))) {
|
||||
if (OB_FAIL(schema_inc_replay_.replay(part_trans_task, tenant_metas, database_metas,
|
||||
table_metas, tenant_info))) {
|
||||
LOG_ERROR("schema_inc_replay_ replay failed", KR(ret), K(part_trans_task), K(tenant_info));
|
||||
} else {}
|
||||
}
|
||||
} else if (OB_FAIL(convert_to_compat_mode(compatible_mode, compat_mode))) {
|
||||
LOG_ERROR("convert to compat mode fail", KR(ret), K(compatible_mode));
|
||||
} else {
|
||||
lib::CompatModeGuard g(compat_mode);
|
||||
bool is_build_baseline = true;
|
||||
if (OB_FAIL(part_trans_parser_->parse(part_trans_task, is_build_baseline, stop_flag))) {
|
||||
LOG_ERROR("parse DDL task fail", KR(ret), K(part_trans_task), K(is_build_baseline));
|
||||
} else {
|
||||
// Iterate through each statement of the DDL
|
||||
IStmtTask *stmt_task = part_trans_task.get_stmt_list().head_;
|
||||
while (NULL != stmt_task && OB_SUCCESS == ret) {
|
||||
DdlStmtTask *ddl_stmt = dynamic_cast<DdlStmtTask *>(stmt_task);
|
||||
if (OB_UNLIKELY(! stmt_task->is_ddl_stmt()) || OB_ISNULL(ddl_stmt)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid DDL statement", KR(ret), KPC(stmt_task), K(ddl_stmt),
|
||||
"trans_id", part_trans_task.get_trans_id());
|
||||
} else {
|
||||
const ObSchemaOperationType op_type = (ObSchemaOperationType) ddl_stmt->get_operation_type();
|
||||
const uint64_t op_table_id = ddl_stmt->get_op_table_id();
|
||||
if (need_remove_by_op_type_(op_type)) {
|
||||
if (OB_FAIL(tenant_info.remove_table_meta(op_table_id))) {
|
||||
LOG_ERROR("ddl stmt is DROP_TABLE or DROP_INDEX and remove table meta failed", KR(ret),
|
||||
K(op_table_id), KPC(ddl_stmt), "trans_id", part_trans_task.get_trans_id());
|
||||
} else {
|
||||
ISTAT("remove table meta success",
|
||||
"ddl_op_tenant_id", ddl_stmt->get_op_tenant_id(),
|
||||
"ddl_op_databse_id", ddl_stmt->get_op_database_id(),
|
||||
"ddl_op_table_id", ddl_stmt->get_op_table_id(),
|
||||
"ddl_op_tablegroup_id", ddl_stmt->get_op_tablegroup_id(),
|
||||
"ddl_operation_type", ddl_stmt->get_operation_type(),
|
||||
"ddl_op_schema_version", ddl_stmt->get_op_schema_version(),
|
||||
"ddl_stmt_str", ddl_stmt->get_ddl_stmt_str(),
|
||||
"ddl_exec_tenant_id", ddl_stmt->get_exec_tenant_id(),
|
||||
"trans_id", part_trans_task.get_trans_id());
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
stmt_task = stmt_task->get_next();
|
||||
}
|
||||
}
|
||||
} // while
|
||||
}
|
||||
} // else replay
|
||||
}
|
||||
} else {
|
||||
ISTAT("ignore DDL_TRANS PartTransTask which trans commit verison is greater than start_timestamp_ns",
|
||||
"tenant_id", part_trans_task.get_tenant_id(),
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "ob_log_part_trans_task_queue.h" // SafePartTransTaskQueue
|
||||
#include "ob_log_meta_data_struct.h" // ObDictTenantInfo
|
||||
#include "ob_log_schema_incremental_replay.h" // ObLogSchemaIncReplay
|
||||
#include "ob_log_part_trans_parser.h" // IObLogPartTransParser
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -50,7 +51,7 @@ public:
|
||||
ObDictTenantInfo &tenant_info);
|
||||
|
||||
public:
|
||||
int init();
|
||||
int init(IObLogPartTransParser &part_trans_parser);
|
||||
void destroy();
|
||||
|
||||
private:
|
||||
@ -86,10 +87,16 @@ private:
|
||||
PartTransTask &part_trans_task,
|
||||
ReplayInfoStat &replay_info_stat);
|
||||
|
||||
bool need_remove_by_op_type_(const ObSchemaOperationType op_type)
|
||||
{
|
||||
return OB_DDL_DROP_TABLE == op_type || OB_DDL_DROP_INDEX == op_type || OB_DDL_DROP_GLOBAL_INDEX == op_type;
|
||||
}
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
SafePartTransTaskQueue queue_;
|
||||
ObLogSchemaIncReplay schema_inc_replay_;
|
||||
IObLogPartTransParser *part_trans_parser_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataReplayer);
|
||||
};
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "logservice/restoreservice/ob_log_archive_piece_mgr.h"
|
||||
#include "logservice/data_dictionary/ob_data_dict_meta_info.h"
|
||||
#include "logservice/archiveservice/ob_archive_define.h"
|
||||
#include "ob_log_part_trans_parser.h"
|
||||
|
||||
|
||||
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [SERVICE] " fmt, ##args)
|
||||
@ -42,7 +43,8 @@ ObLogMetaDataService::ObLogMetaDataService() :
|
||||
fetcher_(),
|
||||
baseline_loader_(),
|
||||
incremental_replayer_(),
|
||||
fetcher_dispatcher_()
|
||||
fetcher_dispatcher_(),
|
||||
part_trans_parser_(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
@ -58,6 +60,7 @@ int ObLogMetaDataService::init(
|
||||
IObLogSysLsTaskHandler *sys_ls_handler,
|
||||
common::ObMySQLProxy *proxy,
|
||||
IObLogErrHandler *err_handler,
|
||||
IObLogPartTransParser &part_trans_parser,
|
||||
const int64_t cluster_id,
|
||||
const ObLogConfig &cfg,
|
||||
const int64_t start_seq)
|
||||
@ -69,7 +72,7 @@ int ObLogMetaDataService::init(
|
||||
LOG_ERROR("init twice", KR(ret));
|
||||
} else if (OB_FAIL(baseline_loader_.init(cfg))) {
|
||||
LOG_ERROR("ObLogMetaDataBaselineLoader init fail", KR(ret));
|
||||
} else if (OB_FAIL(incremental_replayer_.init())) {
|
||||
} else if (OB_FAIL(incremental_replayer_.init(part_trans_parser))) {
|
||||
LOG_ERROR("ObLogMetaDataReplayer init fail", KR(ret));
|
||||
} else if (OB_FAIL(fetcher_dispatcher_.init(&incremental_replayer_, start_seq))) {
|
||||
LOG_ERROR("ObLogMetaDataFetcherDispatcher init fail", KR(ret));
|
||||
@ -94,6 +97,7 @@ void ObLogMetaDataService::destroy()
|
||||
baseline_loader_.destroy();
|
||||
incremental_replayer_.destroy();
|
||||
fetcher_dispatcher_.destroy();
|
||||
part_trans_parser_ = NULL;
|
||||
is_inited_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,6 +36,7 @@ namespace libobcdc
|
||||
class IObLogSysLsTaskHandler;
|
||||
class ObLogSysTableHelper;
|
||||
class IObLogErrHandler;
|
||||
class IObLogPartTransParser;
|
||||
|
||||
class ObLogMetaDataService
|
||||
{
|
||||
@ -52,6 +53,7 @@ public:
|
||||
IObLogSysLsTaskHandler *sys_ls_handler,
|
||||
common::ObMySQLProxy *proxy,
|
||||
IObLogErrHandler *err_handler,
|
||||
IObLogPartTransParser &part_trans_parser,
|
||||
const int64_t cluster_id,
|
||||
const ObLogConfig &cfg,
|
||||
const int64_t start_seq);
|
||||
@ -117,6 +119,7 @@ private:
|
||||
ObLogMetaDataBaselineLoader baseline_loader_;
|
||||
ObLogMetaDataReplayer incremental_replayer_;
|
||||
ObLogMetaDataFetcherDispatcher fetcher_dispatcher_;
|
||||
IObLogPartTransParser *part_trans_parser_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataService);
|
||||
};
|
||||
|
||||
@ -335,6 +335,36 @@ int ObDictTenantInfo::replace_dict_table_meta(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDictTenantInfo::remove_table_meta(const uint64_t table_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_ERROR("ObDictTenantInfo has not been initialized", KR(ret));
|
||||
} else {
|
||||
MetaDataKey meta_data_key(table_id);
|
||||
datadict::ObDictTableMeta *old_table_meta = nullptr;
|
||||
if (OB_FAIL(get_table_meta(table_id, old_table_meta))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_ERROR("tenant_info get_table_meta failed", KR(ret), K(table_id), K(old_table_meta));
|
||||
} else {
|
||||
// Does not exist locally, insert directly
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
// Exist locally, free it
|
||||
if (OB_FAIL(free_dict_table_meta(old_table_meta))) {
|
||||
LOG_ERROR("free_dict_table_meta failed", KR(ret), K(table_id));
|
||||
} else if (OB_FAIL(table_map_.erase(meta_data_key))) {
|
||||
LOG_ERROR("db_map_ erase failed", KR(ret), K(meta_data_key));
|
||||
} else {
|
||||
LOG_INFO("remove_table_meta success", K(table_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDictTenantInfo::get_tenant_schema_info(TenantSchemaInfo &tenant_schema_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -172,6 +172,7 @@ public:
|
||||
int free_dict_table_meta(datadict::ObDictTableMeta *dict_table_meta);
|
||||
int insert_dict_table_meta(datadict::ObDictTableMeta *dict_table_meta);
|
||||
int replace_dict_table_meta(const datadict::ObDictTableMeta &new_dict_table_meta);
|
||||
int remove_table_meta(const uint64_t table_id);
|
||||
|
||||
// Get TenantSchemaInfo
|
||||
int get_tenant_schema_info(TenantSchemaInfo &tenant_schema_info);
|
||||
|
||||
@ -38,6 +38,7 @@ ObLogPartTransParser::ObLogPartTransParser() :
|
||||
inited_(false),
|
||||
br_pool_(NULL),
|
||||
meta_manager_(NULL),
|
||||
all_ddl_operation_table_schema_info_(),
|
||||
cluster_id_(OB_INVALID_CLUSTER_ID),
|
||||
total_log_size_(0),
|
||||
remaining_log_size_(0),
|
||||
@ -55,6 +56,7 @@ void ObLogPartTransParser::destroy()
|
||||
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
||||
br_pool_ = NULL;
|
||||
meta_manager_ = NULL;
|
||||
all_ddl_operation_table_schema_info_.reset();
|
||||
}
|
||||
|
||||
int ObLogPartTransParser::init(
|
||||
@ -71,6 +73,8 @@ int ObLogPartTransParser::init(
|
||||
|| OB_UNLIKELY(OB_INVALID_CLUSTER_ID == cluster_id)) {
|
||||
LOG_ERROR("invalid argument", K(br_pool), K(meta_manager), K(cluster_id));
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (OB_FAIL(all_ddl_operation_table_schema_info_.init())) {
|
||||
LOG_ERROR("init all ddl operation table schema info failed", KR(ret));
|
||||
} else {
|
||||
cluster_id_ = cluster_id;
|
||||
inited_ = true;
|
||||
@ -104,7 +108,7 @@ void ObLogPartTransParser::print_stat_info()
|
||||
SIZE_TO_STR(total_traffic), SIZE_TO_STR(remaining_traffic), SIZE_TO_STR(filtered_out_traffic));
|
||||
}
|
||||
|
||||
int ObLogPartTransParser::parse(PartTransTask &task, volatile bool &stop_flag)
|
||||
int ObLogPartTransParser::parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -119,10 +123,9 @@ int ObLogPartTransParser::parse(PartTransTask &task, volatile bool &stop_flag)
|
||||
LOG_ERROR("invalid task", KR(ret), K(task));
|
||||
} else {
|
||||
const SortedRedoLogList &sorted_redo_list = task.get_sorted_redo_list();
|
||||
|
||||
// Parse Redo logs if they exist
|
||||
if (sorted_redo_list.log_num_ > 0 && OB_FAIL(parse_ddl_redo_log_(task, stop_flag))) {
|
||||
LOG_ERROR("parse_ddl_redo_log_ fail", KR(ret), K(task));
|
||||
if (sorted_redo_list.log_num_ > 0 && OB_FAIL(parse_ddl_redo_log_(task, is_build_baseline, stop_flag))) {
|
||||
LOG_ERROR("parse_ddl_redo_log_ fail", KR(ret), K(task), K(is_build_baseline));
|
||||
}
|
||||
}
|
||||
|
||||
@ -133,6 +136,7 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
PartTransTask *part_trans_task = NULL;
|
||||
const bool is_build_baseline = false;
|
||||
|
||||
if (OB_UNLIKELY(! inited_)) {
|
||||
LOG_ERROR("not init", K(inited_));
|
||||
@ -179,9 +183,10 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag)
|
||||
} else if (OB_UNLIKELY(! redo_node->check_data_integrity())) {
|
||||
ret = OB_INVALID_DATA;
|
||||
LOG_ERROR("redo data is not valid", KR(ret), KPC(redo_node));
|
||||
} else if (OB_FAIL(parse_stmts_(tenant, *redo_node,
|
||||
} else if (OB_FAIL(parse_stmts_(tenant, *redo_node, is_build_baseline,
|
||||
task, *part_trans_task, row_index, stop_flag))) {
|
||||
LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), K(task), K(row_index));
|
||||
LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node),
|
||||
K(is_build_baseline), K(task), K(row_index));
|
||||
} else {
|
||||
ATOMIC_AAF(&total_log_size_, redo_node->get_data_len());
|
||||
LOG_DEBUG("[PARSE] LogEntryTask parse succ", K(task));
|
||||
@ -192,7 +197,7 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool &stop_flag)
|
||||
int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t redo_num = 0;
|
||||
@ -213,7 +218,7 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool
|
||||
|
||||
// DDL data/non-PG partitioned data need to be deserialized in whole rows, not filtered
|
||||
// otherwise need to get tenant structure and perform filtering
|
||||
if (! should_not_filter_row_(task)) {
|
||||
if (! should_not_filter_row_(task) && !is_build_baseline) {
|
||||
if (OB_FAIL(TCTX.get_tenant_guard(tenant_id, guard))) {
|
||||
// tenant must exist here
|
||||
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id));
|
||||
@ -237,9 +242,10 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool
|
||||
else if (OB_UNLIKELY(! redo_node->check_data_integrity())) {
|
||||
LOG_ERROR("redo data is not valid", KPC(redo_node));
|
||||
ret = OB_INVALID_DATA;
|
||||
} else if (OB_FAIL(parse_stmts_(tenant, *redo_node,
|
||||
invalid_redo_log_entry_task, task, row_index, stop_flag))) {
|
||||
LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), "redo_node", *redo_node, K(task), K(row_index));
|
||||
} else if (OB_FAIL(parse_stmts_(tenant, *redo_node, is_build_baseline,
|
||||
invalid_redo_log_entry_task, task, row_index, stop_flag))) {
|
||||
LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), "redo_node", *redo_node,
|
||||
K(is_build_baseline), K(task), K(row_index));
|
||||
} else {
|
||||
redo_num += redo_node->get_log_num();
|
||||
redo_node = static_cast<DdlRedoLogNode *>(redo_node->get_next());
|
||||
@ -254,6 +260,7 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool
|
||||
int ObLogPartTransParser::parse_stmts_(
|
||||
ObLogTenant *tenant,
|
||||
const RedoLogMetaNode &redo_log_node,
|
||||
const bool is_build_baseline,
|
||||
ObLogEntryTask &redo_log_entry_task,
|
||||
PartTransTask &task,
|
||||
uint64_t &row_index,
|
||||
@ -263,7 +270,7 @@ int ObLogPartTransParser::parse_stmts_(
|
||||
const char *redo_data = redo_log_node.get_data();
|
||||
const int64_t redo_data_len = redo_log_node.get_data_len();
|
||||
|
||||
if (OB_ISNULL(tenant) || OB_ISNULL(redo_data) || OB_UNLIKELY(redo_data_len <= 0)) {
|
||||
if ((OB_ISNULL(tenant) && !is_build_baseline) || OB_ISNULL(redo_data) || OB_UNLIKELY(redo_data_len <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("invalid argument", KR(ret), KPC(tenant), K(redo_data), K(redo_data_len), K(task), K(redo_log_entry_task));
|
||||
} else {
|
||||
@ -296,6 +303,7 @@ int ObLogPartTransParser::parse_stmts_(
|
||||
tablet_id,
|
||||
redo_data,
|
||||
redo_data_len,
|
||||
is_build_baseline,
|
||||
pos,
|
||||
task,
|
||||
redo_log_entry_task,
|
||||
@ -305,11 +313,11 @@ int ObLogPartTransParser::parse_stmts_(
|
||||
LOG_ERROR("parse_mutator_row_ failed", KR(ret),
|
||||
"tls_id", task.get_tls_id(),
|
||||
"trans_id", task.get_trans_id(),
|
||||
K(tablet_id), K(redo_log_entry_task), K(row_index));
|
||||
K(tablet_id), K(redo_log_entry_task), K(is_build_baseline), K(row_index));
|
||||
} else if (! is_ignored) {
|
||||
// parse row data
|
||||
if (is_ddl_trans) {
|
||||
if (is_all_ddl_operation_lob_aux_tablet(task.get_ls_id(), tablet_id)) {
|
||||
if (!is_build_baseline && is_all_ddl_operation_lob_aux_tablet(task.get_ls_id(), tablet_id)) {
|
||||
LOG_INFO("is_all_ddl_operation_lob_aux_tablet", "tls_id", task.get_tls_id(),
|
||||
"trans_id", task.get_trans_id(), K(tablet_id));
|
||||
|
||||
@ -320,7 +328,8 @@ int ObLogPartTransParser::parse_stmts_(
|
||||
// data in non ddl table already filtered while parse_mutator_row_
|
||||
} else if (OB_FAIL(parse_ddl_stmts_(
|
||||
row_index,
|
||||
tenant->get_all_ddl_operation_schema_info(),
|
||||
all_ddl_operation_table_schema_info_,
|
||||
is_build_baseline,
|
||||
*row,
|
||||
task,
|
||||
stop_flag))) {
|
||||
@ -405,6 +414,7 @@ int ObLogPartTransParser::parse_mutator_row_(
|
||||
const ObTabletID &tablet_id,
|
||||
const char *redo_data,
|
||||
const int64_t redo_data_len,
|
||||
const bool is_build_baseline,
|
||||
int64_t &pos,
|
||||
PartTransTask &part_trans_task,
|
||||
ObLogEntryTask &redo_log_entry_task,
|
||||
@ -413,7 +423,7 @@ int ObLogPartTransParser::parse_mutator_row_(
|
||||
bool &is_ignored)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
IObLogPartMgr &part_mgr = tenant->get_part_mgr();
|
||||
|
||||
is_ignored = false;
|
||||
row = NULL;
|
||||
bool need_rollback = false;
|
||||
@ -443,16 +453,23 @@ int ObLogPartTransParser::parse_mutator_row_(
|
||||
|| is_all_ddl_operation_lob_aux_tablet(part_trans_task.get_ls_id(), tablet_id))) {
|
||||
need_filter = true;
|
||||
filter_reason = "NON_DDL_RELATED_TABLE";
|
||||
} else if (part_trans_task.is_ddl_trans() && is_build_baseline
|
||||
&& is_all_ddl_operation_lob_aux_tablet(part_trans_task.get_ls_id(), tablet_id)) {
|
||||
need_filter = true;
|
||||
filter_reason = "DDL_OPERATION_LOB_AUX_TABLE_IN_BUILD_BASELINE";
|
||||
} else if (part_trans_task.is_ddl_trans()) {
|
||||
// do nothing, ddl trans should not be filtered
|
||||
} else if (OB_FAIL(part_mgr.is_exist_table_id_cache(table_info.get_table_id(), is_in_table_id_cache))) {
|
||||
LOG_ERROR("check is_exist_table_id_cache failed", KR(ret),
|
||||
"tls_id", part_trans_task.get_tls_id(),
|
||||
"trans_id", part_trans_task.get_trans_id(),
|
||||
K(tablet_id), K(table_info));
|
||||
} else {
|
||||
need_filter = ! is_in_table_id_cache;
|
||||
filter_reason = "NOT_EXIST_IN_TB_ID_CACHE";
|
||||
IObLogPartMgr &part_mgr = tenant->get_part_mgr();
|
||||
if (OB_FAIL(part_mgr.is_exist_table_id_cache(table_info.get_table_id(), is_in_table_id_cache))) {
|
||||
LOG_ERROR("check is_exist_table_id_cache failed", KR(ret),
|
||||
"tls_id", part_trans_task.get_tls_id(),
|
||||
"trans_id", part_trans_task.get_trans_id(),
|
||||
K(tablet_id), K(table_info));
|
||||
} else {
|
||||
need_filter = ! is_in_table_id_cache;
|
||||
filter_reason = "NOT_EXIST_IN_TB_ID_CACHE";
|
||||
}
|
||||
}
|
||||
|
||||
if (need_filter) {
|
||||
@ -461,7 +478,8 @@ int ObLogPartTransParser::parse_mutator_row_(
|
||||
"trans_id", part_trans_task.get_trans_id(),
|
||||
K(tablet_id),
|
||||
K(table_info),
|
||||
K(filter_reason));
|
||||
K(filter_reason),
|
||||
K(is_build_baseline));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -708,6 +726,7 @@ bool ObLogPartTransParser::should_not_filter_row_(PartTransTask &task)
|
||||
int ObLogPartTransParser::parse_ddl_stmts_(
|
||||
const uint64_t row_index,
|
||||
const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema,
|
||||
const bool is_build_baseline,
|
||||
MutatorRow &row,
|
||||
PartTransTask &task,
|
||||
volatile bool &stop_flag)
|
||||
@ -739,10 +758,10 @@ int ObLogPartTransParser::parse_ddl_stmts_(
|
||||
|
||||
// Parsing DDL statement information
|
||||
bool is_valid_ddl = false;
|
||||
if (OB_FAIL(stmt_task->parse_ddl_info(br, row_index, all_ddl_operation_table_schema,
|
||||
if (OB_FAIL(stmt_task->parse_ddl_info(br, row_index, all_ddl_operation_table_schema, is_build_baseline,
|
||||
is_valid_ddl, update_schema_version, exec_tennat_id, stop_flag))) {
|
||||
LOG_ERROR("parse_ddl_info fail", KR(ret), K(*stmt_task), K(br), K(row_index), K(is_valid_ddl),
|
||||
K(update_schema_version), K(exec_tennat_id));
|
||||
LOG_ERROR("parse_ddl_info fail", KR(ret), K(*stmt_task), K(br), K(row_index), K(is_build_baseline),
|
||||
K(is_valid_ddl), K(update_schema_version), K(exec_tennat_id));
|
||||
} else if (! is_valid_ddl) {
|
||||
// Discard invalid DDL statement tasks
|
||||
stmt_task->~DdlStmtTask();
|
||||
|
||||
@ -34,7 +34,9 @@ public:
|
||||
enum { DATA_OP_TIMEOUT = 200 * 1000 };
|
||||
|
||||
public:
|
||||
virtual int parse(PartTransTask &task, volatile bool &stop_flag) = 0;
|
||||
// is_build_baseline: in data_dict refresh mode and build baseline stage, we need ddl parser
|
||||
// to parse ddl stmt and delete dropped table
|
||||
virtual int parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag) = 0;
|
||||
|
||||
virtual int parse(ObLogEntryTask &task, volatile bool &stop_flag) = 0;
|
||||
|
||||
@ -55,7 +57,7 @@ public:
|
||||
virtual ~ObLogPartTransParser();
|
||||
|
||||
public:
|
||||
virtual int parse(PartTransTask &task, volatile bool &stop_flag);
|
||||
virtual int parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag);
|
||||
virtual int parse(ObLogEntryTask &task, volatile bool &stop_flag);
|
||||
|
||||
public:
|
||||
@ -71,10 +73,11 @@ private:
|
||||
const PartTransTask &part_trans_task,
|
||||
const MutatorRow &row,
|
||||
bool &need_rollback);
|
||||
int parse_ddl_redo_log_(PartTransTask &task, volatile bool &stop_flag);
|
||||
int parse_ddl_redo_log_(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag);
|
||||
int parse_stmts_(
|
||||
ObLogTenant *tenant,
|
||||
const RedoLogMetaNode &redo_log_node,
|
||||
const bool is_build_baseline,
|
||||
ObLogEntryTask &redo_log_entry_task,
|
||||
PartTransTask &task,
|
||||
uint64_t &row_index,
|
||||
@ -120,6 +123,7 @@ private:
|
||||
int parse_ddl_stmts_(
|
||||
const uint64_t row_index,
|
||||
const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema,
|
||||
const bool is_build_baseline,
|
||||
MutatorRow &row,
|
||||
PartTransTask &task,
|
||||
volatile bool &stop_flag);
|
||||
@ -148,6 +152,7 @@ private:
|
||||
const ObTabletID &tablet_id,
|
||||
const char *redo_data,
|
||||
const int64_t redo_data_len,
|
||||
const bool is_build_baseline,
|
||||
int64_t &pos,
|
||||
PartTransTask &part_trans_task,
|
||||
ObLogEntryTask &redo_log_entry_task,
|
||||
@ -158,6 +163,7 @@ private:
|
||||
bool inited_;
|
||||
IObLogBRPool *br_pool_;
|
||||
IObLogMetaManager *meta_manager_;
|
||||
ObLogAllDdlOperationSchemaInfo all_ddl_operation_table_schema_info_;
|
||||
|
||||
// The cluster ID of this cluster
|
||||
// Set as the unique ID of the DDL
|
||||
|
||||
@ -1286,6 +1286,7 @@ int DdlStmtTask::parse_ddl_info(
|
||||
ObLogBR *br,
|
||||
const uint64_t row_index,
|
||||
const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema_info,
|
||||
const bool is_build_baseline,
|
||||
bool &is_valid_ddl,
|
||||
int64_t &update_schema_version,
|
||||
uint64_t &exec_tenant_id,
|
||||
@ -1311,7 +1312,7 @@ int DdlStmtTask::parse_ddl_info(
|
||||
false,
|
||||
&all_ddl_operation_table_schema_info))) {
|
||||
LOG_ERROR("parse columns fail", KR(ret), K(row_));
|
||||
} else if (OB_FAIL(parse_ddl_info_(contain_ddl_stmt, update_schema_version, stop_flag))) {
|
||||
} else if (OB_FAIL(parse_ddl_info_(is_build_baseline, contain_ddl_stmt, update_schema_version, stop_flag))) {
|
||||
if (OB_INVALID_DATA == ret) {
|
||||
// If invalid data is encountered, the log is printed but the dirty data is ignored
|
||||
LOG_ERROR("fail to parse DDL, __all_ddl_operation table data is invalid",
|
||||
@ -1428,6 +1429,7 @@ int DdlStmtTask::parse_ddl_info(
|
||||
}
|
||||
|
||||
int DdlStmtTask::parse_ddl_info_(
|
||||
const bool is_build_baseline,
|
||||
bool &contain_ddl_stmt,
|
||||
int64_t &update_schema_version,
|
||||
volatile bool &stop_flag)
|
||||
@ -1444,7 +1446,7 @@ int DdlStmtTask::parse_ddl_info_(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else {
|
||||
PartTransTask &part_trans_task = get_host();
|
||||
if (nullptr != new_lob_ctx_cols && new_lob_ctx_cols->has_out_row_lob()) {
|
||||
if (nullptr != new_lob_ctx_cols && new_lob_ctx_cols->has_out_row_lob() && !is_build_baseline) {
|
||||
new_lob_ctx_cols->reset(
|
||||
this,
|
||||
part_trans_task.get_tenant_id(),
|
||||
@ -1504,8 +1506,8 @@ int DdlStmtTask::parse_ddl_info_(
|
||||
update_schema_version = ddl_op_schema_version_;
|
||||
|
||||
// parse normal columns
|
||||
if (OB_FAIL(parse_ddl_info_from_normal_columns_(*new_cols, *new_lob_ctx_cols))) {
|
||||
LOG_ERROR("parse_ddl_info_from_normal_columns_ fail", KR(ret), K(*new_cols), K(*new_lob_ctx_cols));
|
||||
if (OB_FAIL(parse_ddl_info_from_normal_columns_(is_build_baseline, *new_cols, *new_lob_ctx_cols))) {
|
||||
LOG_ERROR("parse_ddl_info_from_normal_columns_ fail", KR(ret), K(is_build_baseline), K(*new_cols), K(*new_lob_ctx_cols));
|
||||
} else {
|
||||
// verify parse result
|
||||
if (ddl_stmt_str_.empty()) {
|
||||
@ -1660,6 +1662,7 @@ int DdlStmtTask::parse_schema_version_(ObObj &value, int64_t &schema_version)
|
||||
}
|
||||
|
||||
int DdlStmtTask::parse_ddl_info_from_normal_columns_(
|
||||
const bool is_build_baseline,
|
||||
ColValueList &col_value_list,
|
||||
ObLobDataOutRowCtxList &new_lob_ctx_cols)
|
||||
{
|
||||
@ -1732,6 +1735,8 @@ int DdlStmtTask::parse_ddl_info_from_normal_columns_(
|
||||
case ALL_DDL_OPERATION_TABLE_DDL_STMT_STR_COLUMN_ID: {
|
||||
if (! cv_node->is_out_row_) {
|
||||
ddl_stmt_str_ = value.get_varchar();
|
||||
} else if (is_build_baseline) {
|
||||
// do nothing
|
||||
} else {
|
||||
ObString *new_col_str = nullptr;
|
||||
|
||||
|
||||
@ -534,6 +534,7 @@ public:
|
||||
ObLogBR *br,
|
||||
const uint64_t row_index,
|
||||
const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema_info,
|
||||
const bool is_build_baseline,
|
||||
bool &is_valid_ddl,
|
||||
int64_t &update_schema_version,
|
||||
uint64_t &exec_tennat_id,
|
||||
@ -573,11 +574,13 @@ public:
|
||||
|
||||
private:
|
||||
int parse_ddl_info_(
|
||||
const bool is_build_baseline,
|
||||
bool &contain_ddl_stmt,
|
||||
int64_t &update_schema_version,
|
||||
volatile bool &stop_flag);
|
||||
int parse_schema_version_(ObObj &col_value, int64_t &schema_version);
|
||||
int parse_ddl_info_from_normal_columns_(
|
||||
const bool is_build_baseline,
|
||||
ColValueList &col_value_list,
|
||||
ObLobDataOutRowCtxList &new_lob_ctx_cols);
|
||||
// 1. schema non-split mode returns the pure_id itself
|
||||
|
||||
@ -1672,6 +1672,22 @@ int read_from_file(const char *file_path, char *buf, const int64_t buf_len)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int convert_to_compat_mode(const common::ObCompatibilityMode &compatible_mode,
|
||||
lib::Worker::CompatMode &compat_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (common::ObCompatibilityMode::MYSQL_MODE == compatible_mode) {
|
||||
compat_mode = lib::Worker::CompatMode::MYSQL;
|
||||
} else if (common::ObCompatibilityMode::ORACLE_MODE == compatible_mode) {
|
||||
compat_mode = lib::Worker::CompatMode::ORACLE;
|
||||
} else {
|
||||
ret = OB_INVALID_DATA;
|
||||
LOG_ERROR("invalid compatible_mode", KR(ret), K(compatible_mode));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////
|
||||
|
||||
} // namespace libocdc
|
||||
|
||||
@ -675,6 +675,10 @@ int read_from_file(const char *file_path, char *buf, const int64_t buf_len);
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
// convert to compat mode
|
||||
int convert_to_compat_mode(const common::ObCompatibilityMode &compatible_mode,
|
||||
lib::Worker::CompatMode &compat_mode);
|
||||
|
||||
} // namespace libobcdc
|
||||
} // namespace oceanbase
|
||||
#endif /* OCEANBASE_LIBOBCDC_UTILS_H__ */
|
||||
|
||||
Reference in New Issue
Block a user