diff --git a/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp b/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp index bff5a0da02..d42fa1fb38 100644 --- a/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_ddl_parser.cpp @@ -168,10 +168,12 @@ int ObLogDdlParser::handle(void *data, "compat_mode", print_compat_mode(compat_mode), KPC(task)); } else { lib::CompatModeGuard g(compat_mode); + const bool is_build_baseline = false; // Parse DDL task - if (OB_FAIL(part_trans_parser_->parse(*task, stop_flag))) { - LOG_ERROR("parse DDL task fail", KR(ret), KPC(task), "compat_mode", print_compat_mode(compat_mode)); + if (OB_FAIL(part_trans_parser_->parse(*task, is_build_baseline, stop_flag))) { + LOG_ERROR("parse DDL task fail", KR(ret), KPC(task), K(is_build_baseline), + "compat_mode", print_compat_mode(compat_mode)); } else { // The DDL task does not need to go through the formatter module, and here the formatting is set to complete directly // DDL Handler directly waits for formatting to complete or not diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index 2913dc24c8..4a172e68f6 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -998,7 +998,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns) if (OB_SUCC(ret)) { if (is_data_dict_refresh_mode(refresh_mode_)) { if (OB_FAIL(ObLogMetaDataService::get_instance().init(start_tstamp_ns, fetching_mode, archive_dest, - sys_ls_handler_, &mysql_proxy_.get_ob_mysql_proxy(), err_handler, + sys_ls_handler_, &mysql_proxy_.get_ob_mysql_proxy(), err_handler, *part_trans_parser_, cluster_info.cluster_id_, TCONF, start_seq))) { LOG_ERROR("ObLogMetaDataService init failed", KR(ret), K(start_tstamp_ns)); } diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp index 7c7ec67414..a7ac9f8a5f 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp @@ -28,7 +28,8 @@ namespace libobcdc ObLogMetaDataReplayer::ObLogMetaDataReplayer() : is_inited_(false), queue_(), - schema_inc_replay_() + schema_inc_replay_(), + part_trans_parser_(NULL) { } @@ -37,7 +38,7 @@ ObLogMetaDataReplayer::~ObLogMetaDataReplayer() destroy(); } -int ObLogMetaDataReplayer::init() +int ObLogMetaDataReplayer::init(IObLogPartTransParser &part_trans_parser) { int ret = OB_SUCCESS; @@ -47,6 +48,7 @@ int ObLogMetaDataReplayer::init() } else if (OB_FAIL(schema_inc_replay_.init(true/*is_start_progress*/))) { LOG_ERROR("schema_inc_replay_ init failed", KR(ret)); } else { + part_trans_parser_ = &part_trans_parser; is_inited_ = true; } @@ -58,6 +60,7 @@ void ObLogMetaDataReplayer::destroy() if (IS_INIT) { queue_.reset(); schema_inc_replay_.destroy(); + part_trans_parser_ = NULL; is_inited_ = false; } } @@ -154,6 +157,7 @@ int ObLogMetaDataReplayer::handle_ddl_trans_( ReplayInfoStat &replay_info_stat) { int ret = OB_SUCCESS; + bool stop_flag = false; if (OB_UNLIKELY(! part_trans_task.is_ddl_trans())) { ret = OB_ERR_UNEXPECTED; @@ -173,12 +177,57 @@ int ObLogMetaDataReplayer::handle_ddl_trans_( DictTenantArray &tenant_metas = part_trans_task.get_dict_tenant_array(); DictDatabaseArray &database_metas = part_trans_task.get_dict_database_array(); DictTableArray &table_metas = part_trans_task.get_dict_table_array(); + const common::ObCompatibilityMode &compatible_mode = tenant_info.get_compatibility_mode(); + lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::INVALID; - if (OB_FAIL(schema_inc_replay_.replay(part_trans_task, tenant_metas, database_metas, table_metas, tenant_info))) { + if (OB_FAIL(schema_inc_replay_.replay(part_trans_task, tenant_metas, database_metas, + table_metas, tenant_info))) { LOG_ERROR("schema_inc_replay_ replay failed", KR(ret), K(part_trans_task), K(tenant_info)); - } else {} - } + } else if (OB_FAIL(convert_to_compat_mode(compatible_mode, compat_mode))) { + LOG_ERROR("convert to compat mode fail", KR(ret), K(compatible_mode)); + } else { + lib::CompatModeGuard g(compat_mode); + bool is_build_baseline = true; + if (OB_FAIL(part_trans_parser_->parse(part_trans_task, is_build_baseline, stop_flag))) { + LOG_ERROR("parse DDL task fail", KR(ret), K(part_trans_task), K(is_build_baseline)); + } else { + // Iterate through each statement of the DDL + IStmtTask *stmt_task = part_trans_task.get_stmt_list().head_; + while (NULL != stmt_task && OB_SUCCESS == ret) { + DdlStmtTask *ddl_stmt = dynamic_cast(stmt_task); + if (OB_UNLIKELY(! stmt_task->is_ddl_stmt()) || OB_ISNULL(ddl_stmt)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid DDL statement", KR(ret), KPC(stmt_task), K(ddl_stmt), + "trans_id", part_trans_task.get_trans_id()); + } else { + const ObSchemaOperationType op_type = (ObSchemaOperationType) ddl_stmt->get_operation_type(); + const uint64_t op_table_id = ddl_stmt->get_op_table_id(); + if (need_remove_by_op_type_(op_type)) { + if (OB_FAIL(tenant_info.remove_table_meta(op_table_id))) { + LOG_ERROR("ddl stmt is DROP_TABLE or DROP_INDEX and remove table meta failed", KR(ret), + K(op_table_id), KPC(ddl_stmt), "trans_id", part_trans_task.get_trans_id()); + } else { + ISTAT("remove table meta success", + "ddl_op_tenant_id", ddl_stmt->get_op_tenant_id(), + "ddl_op_databse_id", ddl_stmt->get_op_database_id(), + "ddl_op_table_id", ddl_stmt->get_op_table_id(), + "ddl_op_tablegroup_id", ddl_stmt->get_op_tablegroup_id(), + "ddl_operation_type", ddl_stmt->get_operation_type(), + "ddl_op_schema_version", ddl_stmt->get_op_schema_version(), + "ddl_stmt_str", ddl_stmt->get_ddl_stmt_str(), + "ddl_exec_tenant_id", ddl_stmt->get_exec_tenant_id(), + "trans_id", part_trans_task.get_trans_id()); + } + } + if (OB_SUCC(ret)) { + stmt_task = stmt_task->get_next(); + } + } + } // while + } + } // else replay + } } else { ISTAT("ignore DDL_TRANS PartTransTask which trans commit verison is greater than start_timestamp_ns", "tenant_id", part_trans_task.get_tenant_id(), diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h index 22921d1c3f..9c20255572 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h @@ -17,6 +17,7 @@ #include "ob_log_part_trans_task_queue.h" // SafePartTransTaskQueue #include "ob_log_meta_data_struct.h" // ObDictTenantInfo #include "ob_log_schema_incremental_replay.h" // ObLogSchemaIncReplay +#include "ob_log_part_trans_parser.h" // IObLogPartTransParser namespace oceanbase { @@ -50,7 +51,7 @@ public: ObDictTenantInfo &tenant_info); public: - int init(); + int init(IObLogPartTransParser &part_trans_parser); void destroy(); private: @@ -86,10 +87,16 @@ private: PartTransTask &part_trans_task, ReplayInfoStat &replay_info_stat); + bool need_remove_by_op_type_(const ObSchemaOperationType op_type) + { + return OB_DDL_DROP_TABLE == op_type || OB_DDL_DROP_INDEX == op_type || OB_DDL_DROP_GLOBAL_INDEX == op_type; + } + private: bool is_inited_; SafePartTransTaskQueue queue_; ObLogSchemaIncReplay schema_inc_replay_; + IObLogPartTransParser *part_trans_parser_; DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataReplayer); }; diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp index fd15696326..f99163ab3e 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp @@ -18,6 +18,7 @@ #include "logservice/restoreservice/ob_log_archive_piece_mgr.h" #include "logservice/data_dictionary/ob_data_dict_meta_info.h" #include "logservice/archiveservice/ob_archive_define.h" +#include "ob_log_part_trans_parser.h" #define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [SERVICE] " fmt, ##args) @@ -42,7 +43,8 @@ ObLogMetaDataService::ObLogMetaDataService() : fetcher_(), baseline_loader_(), incremental_replayer_(), - fetcher_dispatcher_() + fetcher_dispatcher_(), + part_trans_parser_(NULL) { } @@ -58,6 +60,7 @@ int ObLogMetaDataService::init( IObLogSysLsTaskHandler *sys_ls_handler, common::ObMySQLProxy *proxy, IObLogErrHandler *err_handler, + IObLogPartTransParser &part_trans_parser, const int64_t cluster_id, const ObLogConfig &cfg, const int64_t start_seq) @@ -69,7 +72,7 @@ int ObLogMetaDataService::init( LOG_ERROR("init twice", KR(ret)); } else if (OB_FAIL(baseline_loader_.init(cfg))) { LOG_ERROR("ObLogMetaDataBaselineLoader init fail", KR(ret)); - } else if (OB_FAIL(incremental_replayer_.init())) { + } else if (OB_FAIL(incremental_replayer_.init(part_trans_parser))) { LOG_ERROR("ObLogMetaDataReplayer init fail", KR(ret)); } else if (OB_FAIL(fetcher_dispatcher_.init(&incremental_replayer_, start_seq))) { LOG_ERROR("ObLogMetaDataFetcherDispatcher init fail", KR(ret)); @@ -94,6 +97,7 @@ void ObLogMetaDataService::destroy() baseline_loader_.destroy(); incremental_replayer_.destroy(); fetcher_dispatcher_.destroy(); + part_trans_parser_ = NULL; is_inited_ = false; } } diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_service.h b/src/logservice/libobcdc/src/ob_log_meta_data_service.h index f33dd65f22..424ee21ff7 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_service.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_service.h @@ -36,6 +36,7 @@ namespace libobcdc class IObLogSysLsTaskHandler; class ObLogSysTableHelper; class IObLogErrHandler; +class IObLogPartTransParser; class ObLogMetaDataService { @@ -52,6 +53,7 @@ public: IObLogSysLsTaskHandler *sys_ls_handler, common::ObMySQLProxy *proxy, IObLogErrHandler *err_handler, + IObLogPartTransParser &part_trans_parser, const int64_t cluster_id, const ObLogConfig &cfg, const int64_t start_seq); @@ -117,6 +119,7 @@ private: ObLogMetaDataBaselineLoader baseline_loader_; ObLogMetaDataReplayer incremental_replayer_; ObLogMetaDataFetcherDispatcher fetcher_dispatcher_; + IObLogPartTransParser *part_trans_parser_; DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataService); }; diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_struct.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_struct.cpp index c69ff87df6..36838da167 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_struct.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_struct.cpp @@ -335,6 +335,36 @@ int ObDictTenantInfo::replace_dict_table_meta( return ret; } +int ObDictTenantInfo::remove_table_meta(const uint64_t table_id) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_ERROR("ObDictTenantInfo has not been initialized", KR(ret)); + } else { + MetaDataKey meta_data_key(table_id); + datadict::ObDictTableMeta *old_table_meta = nullptr; + if (OB_FAIL(get_table_meta(table_id, old_table_meta))) { + if (OB_ENTRY_NOT_EXIST != ret) { + LOG_ERROR("tenant_info get_table_meta failed", KR(ret), K(table_id), K(old_table_meta)); + } else { + // Does not exist locally, insert directly + ret = OB_SUCCESS; + } + } else { + // Exist locally, free it + if (OB_FAIL(free_dict_table_meta(old_table_meta))) { + LOG_ERROR("free_dict_table_meta failed", KR(ret), K(table_id)); + } else if (OB_FAIL(table_map_.erase(meta_data_key))) { + LOG_ERROR("db_map_ erase failed", KR(ret), K(meta_data_key)); + } else { + LOG_INFO("remove_table_meta success", K(table_id)); + } + } + } + return ret; +} + int ObDictTenantInfo::get_tenant_schema_info(TenantSchemaInfo &tenant_schema_info) { int ret = OB_SUCCESS; diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_struct.h b/src/logservice/libobcdc/src/ob_log_meta_data_struct.h index ba4342f7f5..e26d541d0a 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_struct.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_struct.h @@ -172,6 +172,7 @@ public: int free_dict_table_meta(datadict::ObDictTableMeta *dict_table_meta); int insert_dict_table_meta(datadict::ObDictTableMeta *dict_table_meta); int replace_dict_table_meta(const datadict::ObDictTableMeta &new_dict_table_meta); + int remove_table_meta(const uint64_t table_id); // Get TenantSchemaInfo int get_tenant_schema_info(TenantSchemaInfo &tenant_schema_info); diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp index 30be1aa8e3..3d59038d60 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_parser.cpp @@ -38,6 +38,7 @@ ObLogPartTransParser::ObLogPartTransParser() : inited_(false), br_pool_(NULL), meta_manager_(NULL), + all_ddl_operation_table_schema_info_(), cluster_id_(OB_INVALID_CLUSTER_ID), total_log_size_(0), remaining_log_size_(0), @@ -55,6 +56,7 @@ void ObLogPartTransParser::destroy() cluster_id_ = OB_INVALID_CLUSTER_ID; br_pool_ = NULL; meta_manager_ = NULL; + all_ddl_operation_table_schema_info_.reset(); } int ObLogPartTransParser::init( @@ -71,6 +73,8 @@ int ObLogPartTransParser::init( || OB_UNLIKELY(OB_INVALID_CLUSTER_ID == cluster_id)) { LOG_ERROR("invalid argument", K(br_pool), K(meta_manager), K(cluster_id)); ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(all_ddl_operation_table_schema_info_.init())) { + LOG_ERROR("init all ddl operation table schema info failed", KR(ret)); } else { cluster_id_ = cluster_id; inited_ = true; @@ -104,7 +108,7 @@ void ObLogPartTransParser::print_stat_info() SIZE_TO_STR(total_traffic), SIZE_TO_STR(remaining_traffic), SIZE_TO_STR(filtered_out_traffic)); } -int ObLogPartTransParser::parse(PartTransTask &task, volatile bool &stop_flag) +int ObLogPartTransParser::parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag) { int ret = OB_SUCCESS; @@ -119,10 +123,9 @@ int ObLogPartTransParser::parse(PartTransTask &task, volatile bool &stop_flag) LOG_ERROR("invalid task", KR(ret), K(task)); } else { const SortedRedoLogList &sorted_redo_list = task.get_sorted_redo_list(); - // Parse Redo logs if they exist - if (sorted_redo_list.log_num_ > 0 && OB_FAIL(parse_ddl_redo_log_(task, stop_flag))) { - LOG_ERROR("parse_ddl_redo_log_ fail", KR(ret), K(task)); + if (sorted_redo_list.log_num_ > 0 && OB_FAIL(parse_ddl_redo_log_(task, is_build_baseline, stop_flag))) { + LOG_ERROR("parse_ddl_redo_log_ fail", KR(ret), K(task), K(is_build_baseline)); } } @@ -133,6 +136,7 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag) { int ret = OB_SUCCESS; PartTransTask *part_trans_task = NULL; + const bool is_build_baseline = false; if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not init", K(inited_)); @@ -179,9 +183,10 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag) } else if (OB_UNLIKELY(! redo_node->check_data_integrity())) { ret = OB_INVALID_DATA; LOG_ERROR("redo data is not valid", KR(ret), KPC(redo_node)); - } else if (OB_FAIL(parse_stmts_(tenant, *redo_node, + } else if (OB_FAIL(parse_stmts_(tenant, *redo_node, is_build_baseline, task, *part_trans_task, row_index, stop_flag))) { - LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), K(task), K(row_index)); + LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), + K(is_build_baseline), K(task), K(row_index)); } else { ATOMIC_AAF(&total_log_size_, redo_node->get_data_len()); LOG_DEBUG("[PARSE] LogEntryTask parse succ", K(task)); @@ -192,7 +197,7 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag) return ret; } -int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool &stop_flag) +int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag) { int ret = OB_SUCCESS; int64_t redo_num = 0; @@ -213,7 +218,7 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool // DDL data/non-PG partitioned data need to be deserialized in whole rows, not filtered // otherwise need to get tenant structure and perform filtering - if (! should_not_filter_row_(task)) { + if (! should_not_filter_row_(task) && !is_build_baseline) { if (OB_FAIL(TCTX.get_tenant_guard(tenant_id, guard))) { // tenant must exist here LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id)); @@ -237,9 +242,10 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool else if (OB_UNLIKELY(! redo_node->check_data_integrity())) { LOG_ERROR("redo data is not valid", KPC(redo_node)); ret = OB_INVALID_DATA; - } else if (OB_FAIL(parse_stmts_(tenant, *redo_node, - invalid_redo_log_entry_task, task, row_index, stop_flag))) { - LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), "redo_node", *redo_node, K(task), K(row_index)); + } else if (OB_FAIL(parse_stmts_(tenant, *redo_node, is_build_baseline, + invalid_redo_log_entry_task, task, row_index, stop_flag))) { + LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), "redo_node", *redo_node, + K(is_build_baseline), K(task), K(row_index)); } else { redo_num += redo_node->get_log_num(); redo_node = static_cast(redo_node->get_next()); @@ -254,6 +260,7 @@ int ObLogPartTransParser::parse_ddl_redo_log_(PartTransTask &task, volatile bool int ObLogPartTransParser::parse_stmts_( ObLogTenant *tenant, const RedoLogMetaNode &redo_log_node, + const bool is_build_baseline, ObLogEntryTask &redo_log_entry_task, PartTransTask &task, uint64_t &row_index, @@ -263,7 +270,7 @@ int ObLogPartTransParser::parse_stmts_( const char *redo_data = redo_log_node.get_data(); const int64_t redo_data_len = redo_log_node.get_data_len(); - if (OB_ISNULL(tenant) || OB_ISNULL(redo_data) || OB_UNLIKELY(redo_data_len <= 0)) { + if ((OB_ISNULL(tenant) && !is_build_baseline) || OB_ISNULL(redo_data) || OB_UNLIKELY(redo_data_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument", KR(ret), KPC(tenant), K(redo_data), K(redo_data_len), K(task), K(redo_log_entry_task)); } else { @@ -296,6 +303,7 @@ int ObLogPartTransParser::parse_stmts_( tablet_id, redo_data, redo_data_len, + is_build_baseline, pos, task, redo_log_entry_task, @@ -305,11 +313,11 @@ int ObLogPartTransParser::parse_stmts_( LOG_ERROR("parse_mutator_row_ failed", KR(ret), "tls_id", task.get_tls_id(), "trans_id", task.get_trans_id(), - K(tablet_id), K(redo_log_entry_task), K(row_index)); + K(tablet_id), K(redo_log_entry_task), K(is_build_baseline), K(row_index)); } else if (! is_ignored) { // parse row data if (is_ddl_trans) { - if (is_all_ddl_operation_lob_aux_tablet(task.get_ls_id(), tablet_id)) { + if (!is_build_baseline && is_all_ddl_operation_lob_aux_tablet(task.get_ls_id(), tablet_id)) { LOG_INFO("is_all_ddl_operation_lob_aux_tablet", "tls_id", task.get_tls_id(), "trans_id", task.get_trans_id(), K(tablet_id)); @@ -320,7 +328,8 @@ int ObLogPartTransParser::parse_stmts_( // data in non ddl table already filtered while parse_mutator_row_ } else if (OB_FAIL(parse_ddl_stmts_( row_index, - tenant->get_all_ddl_operation_schema_info(), + all_ddl_operation_table_schema_info_, + is_build_baseline, *row, task, stop_flag))) { @@ -405,6 +414,7 @@ int ObLogPartTransParser::parse_mutator_row_( const ObTabletID &tablet_id, const char *redo_data, const int64_t redo_data_len, + const bool is_build_baseline, int64_t &pos, PartTransTask &part_trans_task, ObLogEntryTask &redo_log_entry_task, @@ -413,7 +423,7 @@ int ObLogPartTransParser::parse_mutator_row_( bool &is_ignored) { int ret = OB_SUCCESS; - IObLogPartMgr &part_mgr = tenant->get_part_mgr(); + is_ignored = false; row = NULL; bool need_rollback = false; @@ -443,16 +453,23 @@ int ObLogPartTransParser::parse_mutator_row_( || is_all_ddl_operation_lob_aux_tablet(part_trans_task.get_ls_id(), tablet_id))) { need_filter = true; filter_reason = "NON_DDL_RELATED_TABLE"; + } else if (part_trans_task.is_ddl_trans() && is_build_baseline + && is_all_ddl_operation_lob_aux_tablet(part_trans_task.get_ls_id(), tablet_id)) { + need_filter = true; + filter_reason = "DDL_OPERATION_LOB_AUX_TABLE_IN_BUILD_BASELINE"; } else if (part_trans_task.is_ddl_trans()) { // do nothing, ddl trans should not be filtered - } else if (OB_FAIL(part_mgr.is_exist_table_id_cache(table_info.get_table_id(), is_in_table_id_cache))) { - LOG_ERROR("check is_exist_table_id_cache failed", KR(ret), - "tls_id", part_trans_task.get_tls_id(), - "trans_id", part_trans_task.get_trans_id(), - K(tablet_id), K(table_info)); } else { - need_filter = ! is_in_table_id_cache; - filter_reason = "NOT_EXIST_IN_TB_ID_CACHE"; + IObLogPartMgr &part_mgr = tenant->get_part_mgr(); + if (OB_FAIL(part_mgr.is_exist_table_id_cache(table_info.get_table_id(), is_in_table_id_cache))) { + LOG_ERROR("check is_exist_table_id_cache failed", KR(ret), + "tls_id", part_trans_task.get_tls_id(), + "trans_id", part_trans_task.get_trans_id(), + K(tablet_id), K(table_info)); + } else { + need_filter = ! is_in_table_id_cache; + filter_reason = "NOT_EXIST_IN_TB_ID_CACHE"; + } } if (need_filter) { @@ -461,7 +478,8 @@ int ObLogPartTransParser::parse_mutator_row_( "trans_id", part_trans_task.get_trans_id(), K(tablet_id), K(table_info), - K(filter_reason)); + K(filter_reason), + K(is_build_baseline)); } if (OB_SUCC(ret)) { @@ -708,6 +726,7 @@ bool ObLogPartTransParser::should_not_filter_row_(PartTransTask &task) int ObLogPartTransParser::parse_ddl_stmts_( const uint64_t row_index, const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema, + const bool is_build_baseline, MutatorRow &row, PartTransTask &task, volatile bool &stop_flag) @@ -739,10 +758,10 @@ int ObLogPartTransParser::parse_ddl_stmts_( // Parsing DDL statement information bool is_valid_ddl = false; - if (OB_FAIL(stmt_task->parse_ddl_info(br, row_index, all_ddl_operation_table_schema, + if (OB_FAIL(stmt_task->parse_ddl_info(br, row_index, all_ddl_operation_table_schema, is_build_baseline, is_valid_ddl, update_schema_version, exec_tennat_id, stop_flag))) { - LOG_ERROR("parse_ddl_info fail", KR(ret), K(*stmt_task), K(br), K(row_index), K(is_valid_ddl), - K(update_schema_version), K(exec_tennat_id)); + LOG_ERROR("parse_ddl_info fail", KR(ret), K(*stmt_task), K(br), K(row_index), K(is_build_baseline), + K(is_valid_ddl), K(update_schema_version), K(exec_tennat_id)); } else if (! is_valid_ddl) { // Discard invalid DDL statement tasks stmt_task->~DdlStmtTask(); diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_parser.h b/src/logservice/libobcdc/src/ob_log_part_trans_parser.h index a1279427d8..3a4cddcc07 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_parser.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_parser.h @@ -34,7 +34,9 @@ public: enum { DATA_OP_TIMEOUT = 200 * 1000 }; public: - virtual int parse(PartTransTask &task, volatile bool &stop_flag) = 0; + // is_build_baseline: in data_dict refresh mode and build baseline stage, we need ddl parser + // to parse ddl stmt and delete dropped table + virtual int parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag) = 0; virtual int parse(ObLogEntryTask &task, volatile bool &stop_flag) = 0; @@ -55,7 +57,7 @@ public: virtual ~ObLogPartTransParser(); public: - virtual int parse(PartTransTask &task, volatile bool &stop_flag); + virtual int parse(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag); virtual int parse(ObLogEntryTask &task, volatile bool &stop_flag); public: @@ -71,10 +73,11 @@ private: const PartTransTask &part_trans_task, const MutatorRow &row, bool &need_rollback); - int parse_ddl_redo_log_(PartTransTask &task, volatile bool &stop_flag); + int parse_ddl_redo_log_(PartTransTask &task, const bool is_build_baseline, volatile bool &stop_flag); int parse_stmts_( ObLogTenant *tenant, const RedoLogMetaNode &redo_log_node, + const bool is_build_baseline, ObLogEntryTask &redo_log_entry_task, PartTransTask &task, uint64_t &row_index, @@ -120,6 +123,7 @@ private: int parse_ddl_stmts_( const uint64_t row_index, const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema, + const bool is_build_baseline, MutatorRow &row, PartTransTask &task, volatile bool &stop_flag); @@ -148,6 +152,7 @@ private: const ObTabletID &tablet_id, const char *redo_data, const int64_t redo_data_len, + const bool is_build_baseline, int64_t &pos, PartTransTask &part_trans_task, ObLogEntryTask &redo_log_entry_task, @@ -158,6 +163,7 @@ private: bool inited_; IObLogBRPool *br_pool_; IObLogMetaManager *meta_manager_; + ObLogAllDdlOperationSchemaInfo all_ddl_operation_table_schema_info_; // The cluster ID of this cluster // Set as the unique ID of the DDL diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 003d548980..1c94720a4f 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -1286,6 +1286,7 @@ int DdlStmtTask::parse_ddl_info( ObLogBR *br, const uint64_t row_index, const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema_info, + const bool is_build_baseline, bool &is_valid_ddl, int64_t &update_schema_version, uint64_t &exec_tenant_id, @@ -1311,7 +1312,7 @@ int DdlStmtTask::parse_ddl_info( false, &all_ddl_operation_table_schema_info))) { LOG_ERROR("parse columns fail", KR(ret), K(row_)); - } else if (OB_FAIL(parse_ddl_info_(contain_ddl_stmt, update_schema_version, stop_flag))) { + } else if (OB_FAIL(parse_ddl_info_(is_build_baseline, contain_ddl_stmt, update_schema_version, stop_flag))) { if (OB_INVALID_DATA == ret) { // If invalid data is encountered, the log is printed but the dirty data is ignored LOG_ERROR("fail to parse DDL, __all_ddl_operation table data is invalid", @@ -1428,6 +1429,7 @@ int DdlStmtTask::parse_ddl_info( } int DdlStmtTask::parse_ddl_info_( + const bool is_build_baseline, bool &contain_ddl_stmt, int64_t &update_schema_version, volatile bool &stop_flag) @@ -1444,7 +1446,7 @@ int DdlStmtTask::parse_ddl_info_( ret = OB_ERR_UNEXPECTED; } else { PartTransTask &part_trans_task = get_host(); - if (nullptr != new_lob_ctx_cols && new_lob_ctx_cols->has_out_row_lob()) { + if (nullptr != new_lob_ctx_cols && new_lob_ctx_cols->has_out_row_lob() && !is_build_baseline) { new_lob_ctx_cols->reset( this, part_trans_task.get_tenant_id(), @@ -1504,8 +1506,8 @@ int DdlStmtTask::parse_ddl_info_( update_schema_version = ddl_op_schema_version_; // parse normal columns - if (OB_FAIL(parse_ddl_info_from_normal_columns_(*new_cols, *new_lob_ctx_cols))) { - LOG_ERROR("parse_ddl_info_from_normal_columns_ fail", KR(ret), K(*new_cols), K(*new_lob_ctx_cols)); + if (OB_FAIL(parse_ddl_info_from_normal_columns_(is_build_baseline, *new_cols, *new_lob_ctx_cols))) { + LOG_ERROR("parse_ddl_info_from_normal_columns_ fail", KR(ret), K(is_build_baseline), K(*new_cols), K(*new_lob_ctx_cols)); } else { // verify parse result if (ddl_stmt_str_.empty()) { @@ -1660,6 +1662,7 @@ int DdlStmtTask::parse_schema_version_(ObObj &value, int64_t &schema_version) } int DdlStmtTask::parse_ddl_info_from_normal_columns_( + const bool is_build_baseline, ColValueList &col_value_list, ObLobDataOutRowCtxList &new_lob_ctx_cols) { @@ -1732,6 +1735,8 @@ int DdlStmtTask::parse_ddl_info_from_normal_columns_( case ALL_DDL_OPERATION_TABLE_DDL_STMT_STR_COLUMN_ID: { if (! cv_node->is_out_row_) { ddl_stmt_str_ = value.get_varchar(); + } else if (is_build_baseline) { + // do nothing } else { ObString *new_col_str = nullptr; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index fb518f9ce3..fe15d80f70 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -534,6 +534,7 @@ public: ObLogBR *br, const uint64_t row_index, const ObLogAllDdlOperationSchemaInfo &all_ddl_operation_table_schema_info, + const bool is_build_baseline, bool &is_valid_ddl, int64_t &update_schema_version, uint64_t &exec_tennat_id, @@ -573,11 +574,13 @@ public: private: int parse_ddl_info_( + const bool is_build_baseline, bool &contain_ddl_stmt, int64_t &update_schema_version, volatile bool &stop_flag); int parse_schema_version_(ObObj &col_value, int64_t &schema_version); int parse_ddl_info_from_normal_columns_( + const bool is_build_baseline, ColValueList &col_value_list, ObLobDataOutRowCtxList &new_lob_ctx_cols); // 1. schema non-split mode returns the pure_id itself diff --git a/src/logservice/libobcdc/src/ob_log_utils.cpp b/src/logservice/libobcdc/src/ob_log_utils.cpp index 47d22077d0..6011a1975b 100644 --- a/src/logservice/libobcdc/src/ob_log_utils.cpp +++ b/src/logservice/libobcdc/src/ob_log_utils.cpp @@ -1672,6 +1672,22 @@ int read_from_file(const char *file_path, char *buf, const int64_t buf_len) return ret; } +int convert_to_compat_mode(const common::ObCompatibilityMode &compatible_mode, + lib::Worker::CompatMode &compat_mode) +{ + int ret = OB_SUCCESS; + if (common::ObCompatibilityMode::MYSQL_MODE == compatible_mode) { + compat_mode = lib::Worker::CompatMode::MYSQL; + } else if (common::ObCompatibilityMode::ORACLE_MODE == compatible_mode) { + compat_mode = lib::Worker::CompatMode::ORACLE; + } else { + ret = OB_INVALID_DATA; + LOG_ERROR("invalid compatible_mode", KR(ret), K(compatible_mode)); + } + + return ret; +} + ////////////////////////////////////////////////////////////////// } // namespace libocdc diff --git a/src/logservice/libobcdc/src/ob_log_utils.h b/src/logservice/libobcdc/src/ob_log_utils.h index 388b4ec3b1..6c9b01e35b 100644 --- a/src/logservice/libobcdc/src/ob_log_utils.h +++ b/src/logservice/libobcdc/src/ob_log_utils.h @@ -675,6 +675,10 @@ int read_from_file(const char *file_path, char *buf, const int64_t buf_len); } \ } while (0) +// convert to compat mode +int convert_to_compat_mode(const common::ObCompatibilityMode &compatible_mode, + lib::Worker::CompatMode &compat_mode); + } // namespace libobcdc } // namespace oceanbase #endif /* OCEANBASE_LIBOBCDC_UTILS_H__ */