diff --git a/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.h b/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.h index 71c0e5c64..534c1a852 100644 --- a/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.h +++ b/src/logservice/libobcdc/src/ob_cdc_multi_data_source_info.h @@ -95,6 +95,13 @@ public: DictDatabaseArray &get_dict_database_array() { return dict_database_metas_; } DictTableArray &get_dict_table_array() { return dict_table_metas_; } + bool is_empty_dict_info() const + { + return (0 == dict_tenant_metas_.count()) + && (0 == dict_database_metas_.count()) + && (0 == dict_table_metas_.count()); + } + int get_new_tenant_scehma_info( const uint64_t tenant_id, TenantSchemaInfo &tenant_schema_info); diff --git a/src/logservice/libobcdc/src/ob_log_mysql_proxy.cpp b/src/logservice/libobcdc/src/ob_log_mysql_proxy.cpp index 0f299f55a..91a1fb129 100644 --- a/src/logservice/libobcdc/src/ob_log_mysql_proxy.cpp +++ b/src/logservice/libobcdc/src/ob_log_mysql_proxy.cpp @@ -191,7 +191,9 @@ int ObLogMysqlProxy::detect_tenant_mode_(ServerProviderType *server_provider) ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid mysql_conn_config", KR(ret), K(config)); } else if (OB_FAIL(conn.init(config, enable_ssl_client_authentication))) { - LOG_ERROR("init ObLogMySQLConnector failed", KR(ret), K(config), K(enable_ssl_client_authentication)); + LOG_WARN("init ObLogMySQLConnector failed", KR(ret), K(config), K(enable_ssl_client_authentication)); + // reset OB_SUCCESS, need retry + ret = OB_SUCCESS; } else { is_oracle_mode_ = conn.is_oracle_mode(); detect_succ = true; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 2e379f96c..2c7177b92 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -2642,7 +2642,10 @@ int PartTransTask::get_database_schema_info_with_inc_dict( return ret; } -int PartTransTask::get_table_meta_with_inc_dict(const uint64_t tenant_id, const uint64_t table_id, const datadict::ObDictTableMeta *&tb_meta) +int PartTransTask::get_table_meta_with_inc_dict( + const uint64_t tenant_id, + const uint64_t table_id, + const datadict::ObDictTableMeta *&tb_meta) { int ret = OB_SUCCESS; @@ -2682,6 +2685,45 @@ int PartTransTask::get_table_meta_with_inc_dict(const uint64_t tenant_id, const return ret; } +int PartTransTask::check_for_ddl_trans( + bool &is_not_barrier, + ObSchemaOperationType &op_type) const +{ + int ret = OB_SUCCESS; + is_not_barrier = false; + int64_t other_ddl_count = 0; + IStmtTask *stmt_task = get_stmt_list().head_; + + while (NULL != stmt_task && OB_SUCC(ret)) { + DdlStmtTask *ddl_stmt = dynamic_cast(stmt_task); + + if (OB_UNLIKELY(! stmt_task->is_ddl_stmt()) || OB_ISNULL(ddl_stmt)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid DDL statement", KR(ret), KPC(stmt_task), K(ddl_stmt)); + } else { + op_type = static_cast(ddl_stmt->get_operation_type()); + + if (OB_DDL_CREATE_TABLE == op_type + || OB_DDL_TRUNCATE_TABLE == op_type) { + is_not_barrier = true; + } else { + ++other_ddl_count; + } + stmt_task = stmt_task->get_next(); + } + } // while + + if (OB_SUCC(ret)) { + // Normally, a DDL transaction only contains one DDL statement. + // If there are multiple statements, the DDL transaction is treated as barrier to avoid misjudgments + if (other_ddl_count > 0) { + is_not_barrier = false; + } + } + + return ret; +} + int PartTransTask::alloc_log_entry_node_(const palf::LSN &lsn, LogEntryNode *&log_entry_node) { int ret = OB_SUCCESS; diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 371f87147..61af2b8f7 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -1130,6 +1130,14 @@ public: // NOTICE: ONLY AVALIABLE FOR DDL_TRANS. int get_table_meta_with_inc_dict(const uint64_t tenant_id, const uint64_t table_id, const datadict::ObDictTableMeta *&tb_meta); + // Check if the DDL transaction needs to be treated as a barrier. + // + // @param [out] is_not_barrier is not a barrier + // @param [out] op_type Schema operation type + int check_for_ddl_trans( + bool &is_not_barrier, + ObSchemaOperationType &op_type) const; + TO_STRING_KV( "state", serve_state_, "type", print_task_type(type_), diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp index 8ec998f5b..648681e4f 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.cpp +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.cpp @@ -868,7 +868,7 @@ int ObLogSequencer::handle_multi_data_source_info_( } } else if (tablet_change_info.is_delete_tablet_op()) { // 1. delete tablet should wait all task in dml_parse done. - if (OB_FAIL(wait_until_parser_done_(stop_flag))) { + if (OB_FAIL(wait_until_parser_done_("delete_tablet_op", stop_flag))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("wait_until_parser_done_ failed", KR(ret), KPC(part_trans_task)); } @@ -888,24 +888,48 @@ int ObLogSequencer::handle_multi_data_source_info_( ObLogDDLProcessor *ddl_processor = TCTX.ddl_processor_; LOG_DEBUG("handle_ddl_trans and mds for data_dict mode begin", KPC(part_trans_task)); - if (OB_ISNULL(ddl_processor)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("expect valid ddl_processor", KR(ret)); - // Barrier transaction: should wait all task in dml_parser/reader/Formatter done. - } else if (OB_FAIL(wait_until_formatter_done_(stop_flag))) { - if (OB_IN_STOP_STATE != ret) { - LOG_ERROR("wait_until_formatter_done_ failed", KR(ret), KPC(part_trans_task)); - } - } else if (OB_FAIL(ddl_processor->handle_ddl_trans(*part_trans_task, tenant, stop_flag))) { - if (OB_IN_STOP_STATE != ret) { - LOG_ERROR("handle_ddl_trans for data_dict mode failed", KR(ret), K(tenant), KPC(part_trans_task)); - } - } else if (part_trans_task->get_multi_data_source_info().is_ddl_trans() - && OB_FAIL(handle_ddl_multi_data_source_info_(*part_trans_task, tenant, trans_ctx))) { - LOG_ERROR("handle_ddl_multi_data_source_info_ failed", KR(ret), KPC(part_trans_task), K(trans_ctx), - K(stop_flag)); + if (part_trans_task->get_multi_data_source_info().is_empty_dict_info()) { + _LOG_INFO("[IS_NOT_BARRIER] [EMPTY_DICT] tls_id=%s trans_id=%s is_sp=%d", + to_cstring(part_trans_task->get_tls_id()), + to_cstring(part_trans_task->get_trans_id()), + part_trans_task->is_single_ls_trans()); } else { - LOG_DEBUG("handle_ddl_trans and mds for data_dict mode done", KPC(part_trans_task)); + bool is_not_barrier = false; + ObSchemaOperationType op_type; + + if (OB_FAIL(part_trans_task->check_for_ddl_trans(is_not_barrier, op_type))) { + LOG_ERROR("part_trans_task check_for_ddl_trans failed", KR(ret), KPC(part_trans_task), K(is_not_barrier)); + } else if (is_not_barrier) { + _LOG_INFO("[IS_NOT_BARRIER] [DDL] tls_id=%s trans_id=%s is_dist=%d OP_TYPE=%s(%d)", + to_cstring(part_trans_task->get_tls_id()), + to_cstring(part_trans_task->get_trans_id()), + part_trans_task->is_dist_trans(), + ObSchemaOperation::type_str(op_type), op_type); + } else { + // Barrier transaction: should wait all task in dml_parser/reader/Formatter done. + if (OB_FAIL(wait_until_formatter_done_(stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("wait_until_formatter_done_ failed", KR(ret), KPC(part_trans_task)); + } + } // wait_until_formatter_done_ + } + } + + if (OB_SUCC(ret)) { + if (OB_ISNULL(ddl_processor)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("expect valid ddl_processor", KR(ret)); + } else if (OB_FAIL(ddl_processor->handle_ddl_trans(*part_trans_task, tenant, stop_flag))) { + if (OB_IN_STOP_STATE != ret) { + LOG_ERROR("handle_ddl_trans for data_dict mode failed", KR(ret), K(tenant), KPC(part_trans_task)); + } + } else if (part_trans_task->get_multi_data_source_info().is_ddl_trans() + && OB_FAIL(handle_ddl_multi_data_source_info_(*part_trans_task, tenant, trans_ctx))) { + LOG_ERROR("handle_ddl_multi_data_source_info_ failed", KR(ret), KPC(part_trans_task), K(trans_ctx), + K(stop_flag)); + } else { + LOG_DEBUG("handle_ddl_trans and mds for data_dict mode done", KPC(part_trans_task)); + } } } @@ -960,7 +984,9 @@ int ObLogSequencer::handle_ddl_multi_data_source_info_( return ret; } -int ObLogSequencer::wait_until_parser_done_(volatile bool &stop_flag) +int ObLogSequencer::wait_until_parser_done_( + const char *caller, + volatile bool &stop_flag) { int ret = OB_SUCCESS; @@ -979,7 +1005,7 @@ int ObLogSequencer::wait_until_parser_done_(volatile bool &stop_flag) const static int64_t PRINT_WAIT_PARSER_TIMEOUT = 10 * _SEC_; if (REACH_TIME_INTERVAL(PRINT_WAIT_PARSER_TIMEOUT)) { LOG_INFO("DDL barrier waiting reader and dml_parser empty", - K(reader_task_count), K(dml_parser_task_count)); + K(caller), K(reader_task_count), K(dml_parser_task_count)); } else { LOG_DEBUG("DDL barrier waiting reader and dml_parser empty", K(reader_task_count), K(dml_parser_task_count)); @@ -1007,7 +1033,7 @@ int ObLogSequencer::wait_until_formatter_done_(volatile bool &stop_flag) if (OB_ISNULL(TCTX.formatter_)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("formatter is null", KR(ret)); - } else if (OB_FAIL(wait_until_parser_done_(stop_flag))) { + } else if (OB_FAIL(wait_until_parser_done_("wait_formatted", stop_flag))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("wait_until_parser_done_ failed", KR(ret)); } @@ -1032,8 +1058,8 @@ int ObLogSequencer::wait_until_formatter_done_(volatile bool &stop_flag) LOG_DEBUG("DDL barrier transaction waiting Formatter empty", K(formatter_br_task_count), K(formatter_log_entray_task_count), K(lob_merger_task_count)); } - // sleep 100ms and retry - const static int64_t WAIT_FORMATTER_EMPTY_TIME = 100 * 1000; + // sleep 10ms and retry + const static int64_t WAIT_FORMATTER_EMPTY_TIME = 10 * 1000; ob_usleep(WAIT_FORMATTER_EMPTY_TIME); } else { break; diff --git a/src/logservice/libobcdc/src/ob_log_sequencer1.h b/src/logservice/libobcdc/src/ob_log_sequencer1.h index 64d4035dd..3258c9092 100644 --- a/src/logservice/libobcdc/src/ob_log_sequencer1.h +++ b/src/logservice/libobcdc/src/ob_log_sequencer1.h @@ -172,7 +172,9 @@ private: ObLogTenant &tenant, TransCtx &trans_ctx); // wait reader/parser module empty - int wait_until_parser_done_(volatile bool &stop_flag); + int wait_until_parser_done_( + const char *caller, + volatile bool &stop_flag); // wait reader/parser/formatter module empty int wait_until_formatter_done_(volatile bool &stop_flag); int recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant); diff --git a/src/logservice/logrouteservice/ob_ls_server_list.cpp b/src/logservice/logrouteservice/ob_ls_server_list.cpp index 8547ac1d6..2b6751dde 100644 --- a/src/logservice/logrouteservice/ob_ls_server_list.cpp +++ b/src/logservice/logrouteservice/ob_ls_server_list.cpp @@ -236,7 +236,7 @@ bool LSSvrList::need_switch_server(const ObLSRouterKey &key, // Switch the Server scenario and consider that the Server is always serving svr_item.check_and_update_serve_info(true/*is_always_serving*/, next_lsn, is_log_served, is_svr_invalid); - LOG_INFO("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid)); + LOG_TRACE("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid)); if (is_log_served && !is_svr_invalid && !blacklist.exist(svr_item.svr_)) { if (cur_svr == svr_item.svr_) {