[OBCDC] Optimize slow synchronization when dealing with a large number of DDL
This commit is contained in:
parent
2c601683f3
commit
d2198fb65f
@ -95,6 +95,13 @@ public:
|
||||
DictDatabaseArray &get_dict_database_array() { return dict_database_metas_; }
|
||||
DictTableArray &get_dict_table_array() { return dict_table_metas_; }
|
||||
|
||||
bool is_empty_dict_info() const
|
||||
{
|
||||
return (0 == dict_tenant_metas_.count())
|
||||
&& (0 == dict_database_metas_.count())
|
||||
&& (0 == dict_table_metas_.count());
|
||||
}
|
||||
|
||||
int get_new_tenant_scehma_info(
|
||||
const uint64_t tenant_id,
|
||||
TenantSchemaInfo &tenant_schema_info);
|
||||
|
@ -191,7 +191,9 @@ int ObLogMysqlProxy::detect_tenant_mode_(ServerProviderType *server_provider)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid mysql_conn_config", KR(ret), K(config));
|
||||
} else if (OB_FAIL(conn.init(config, enable_ssl_client_authentication))) {
|
||||
LOG_ERROR("init ObLogMySQLConnector failed", KR(ret), K(config), K(enable_ssl_client_authentication));
|
||||
LOG_WARN("init ObLogMySQLConnector failed", KR(ret), K(config), K(enable_ssl_client_authentication));
|
||||
// reset OB_SUCCESS, need retry
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
is_oracle_mode_ = conn.is_oracle_mode();
|
||||
detect_succ = true;
|
||||
|
@ -2642,7 +2642,10 @@ int PartTransTask::get_database_schema_info_with_inc_dict(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PartTransTask::get_table_meta_with_inc_dict(const uint64_t tenant_id, const uint64_t table_id, const datadict::ObDictTableMeta *&tb_meta)
|
||||
int PartTransTask::get_table_meta_with_inc_dict(
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
const datadict::ObDictTableMeta *&tb_meta)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -2682,6 +2685,45 @@ int PartTransTask::get_table_meta_with_inc_dict(const uint64_t tenant_id, const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PartTransTask::check_for_ddl_trans(
|
||||
bool &is_not_barrier,
|
||||
ObSchemaOperationType &op_type) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_not_barrier = false;
|
||||
int64_t other_ddl_count = 0;
|
||||
IStmtTask *stmt_task = get_stmt_list().head_;
|
||||
|
||||
while (NULL != stmt_task && OB_SUCC(ret)) {
|
||||
DdlStmtTask *ddl_stmt = dynamic_cast<DdlStmtTask *>(stmt_task);
|
||||
|
||||
if (OB_UNLIKELY(! stmt_task->is_ddl_stmt()) || OB_ISNULL(ddl_stmt)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid DDL statement", KR(ret), KPC(stmt_task), K(ddl_stmt));
|
||||
} else {
|
||||
op_type = static_cast<ObSchemaOperationType>(ddl_stmt->get_operation_type());
|
||||
|
||||
if (OB_DDL_CREATE_TABLE == op_type
|
||||
|| OB_DDL_TRUNCATE_TABLE == op_type) {
|
||||
is_not_barrier = true;
|
||||
} else {
|
||||
++other_ddl_count;
|
||||
}
|
||||
stmt_task = stmt_task->get_next();
|
||||
}
|
||||
} // while
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// Normally, a DDL transaction only contains one DDL statement.
|
||||
// If there are multiple statements, the DDL transaction is treated as barrier to avoid misjudgments
|
||||
if (other_ddl_count > 0) {
|
||||
is_not_barrier = false;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PartTransTask::alloc_log_entry_node_(const palf::LSN &lsn, LogEntryNode *&log_entry_node)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -1130,6 +1130,14 @@ public:
|
||||
// NOTICE: ONLY AVALIABLE FOR DDL_TRANS.
|
||||
int get_table_meta_with_inc_dict(const uint64_t tenant_id, const uint64_t table_id, const datadict::ObDictTableMeta *&tb_meta);
|
||||
|
||||
// Check if the DDL transaction needs to be treated as a barrier.
|
||||
//
|
||||
// @param [out] is_not_barrier is not a barrier
|
||||
// @param [out] op_type Schema operation type
|
||||
int check_for_ddl_trans(
|
||||
bool &is_not_barrier,
|
||||
ObSchemaOperationType &op_type) const;
|
||||
|
||||
TO_STRING_KV(
|
||||
"state", serve_state_,
|
||||
"type", print_task_type(type_),
|
||||
|
@ -868,7 +868,7 @@ int ObLogSequencer::handle_multi_data_source_info_(
|
||||
}
|
||||
} else if (tablet_change_info.is_delete_tablet_op()) {
|
||||
// 1. delete tablet should wait all task in dml_parse done.
|
||||
if (OB_FAIL(wait_until_parser_done_(stop_flag))) {
|
||||
if (OB_FAIL(wait_until_parser_done_("delete_tablet_op", stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("wait_until_parser_done_ failed", KR(ret), KPC(part_trans_task));
|
||||
}
|
||||
@ -888,24 +888,48 @@ int ObLogSequencer::handle_multi_data_source_info_(
|
||||
ObLogDDLProcessor *ddl_processor = TCTX.ddl_processor_;
|
||||
LOG_DEBUG("handle_ddl_trans and mds for data_dict mode begin", KPC(part_trans_task));
|
||||
|
||||
if (OB_ISNULL(ddl_processor)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("expect valid ddl_processor", KR(ret));
|
||||
// Barrier transaction: should wait all task in dml_parser/reader/Formatter done.
|
||||
} else if (OB_FAIL(wait_until_formatter_done_(stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("wait_until_formatter_done_ failed", KR(ret), KPC(part_trans_task));
|
||||
}
|
||||
} else if (OB_FAIL(ddl_processor->handle_ddl_trans(*part_trans_task, tenant, stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("handle_ddl_trans for data_dict mode failed", KR(ret), K(tenant), KPC(part_trans_task));
|
||||
}
|
||||
} else if (part_trans_task->get_multi_data_source_info().is_ddl_trans()
|
||||
&& OB_FAIL(handle_ddl_multi_data_source_info_(*part_trans_task, tenant, trans_ctx))) {
|
||||
LOG_ERROR("handle_ddl_multi_data_source_info_ failed", KR(ret), KPC(part_trans_task), K(trans_ctx),
|
||||
K(stop_flag));
|
||||
if (part_trans_task->get_multi_data_source_info().is_empty_dict_info()) {
|
||||
_LOG_INFO("[IS_NOT_BARRIER] [EMPTY_DICT] tls_id=%s trans_id=%s is_sp=%d",
|
||||
to_cstring(part_trans_task->get_tls_id()),
|
||||
to_cstring(part_trans_task->get_trans_id()),
|
||||
part_trans_task->is_single_ls_trans());
|
||||
} else {
|
||||
LOG_DEBUG("handle_ddl_trans and mds for data_dict mode done", KPC(part_trans_task));
|
||||
bool is_not_barrier = false;
|
||||
ObSchemaOperationType op_type;
|
||||
|
||||
if (OB_FAIL(part_trans_task->check_for_ddl_trans(is_not_barrier, op_type))) {
|
||||
LOG_ERROR("part_trans_task check_for_ddl_trans failed", KR(ret), KPC(part_trans_task), K(is_not_barrier));
|
||||
} else if (is_not_barrier) {
|
||||
_LOG_INFO("[IS_NOT_BARRIER] [DDL] tls_id=%s trans_id=%s is_dist=%d OP_TYPE=%s(%d)",
|
||||
to_cstring(part_trans_task->get_tls_id()),
|
||||
to_cstring(part_trans_task->get_trans_id()),
|
||||
part_trans_task->is_dist_trans(),
|
||||
ObSchemaOperation::type_str(op_type), op_type);
|
||||
} else {
|
||||
// Barrier transaction: should wait all task in dml_parser/reader/Formatter done.
|
||||
if (OB_FAIL(wait_until_formatter_done_(stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("wait_until_formatter_done_ failed", KR(ret), KPC(part_trans_task));
|
||||
}
|
||||
} // wait_until_formatter_done_
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(ddl_processor)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("expect valid ddl_processor", KR(ret));
|
||||
} else if (OB_FAIL(ddl_processor->handle_ddl_trans(*part_trans_task, tenant, stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("handle_ddl_trans for data_dict mode failed", KR(ret), K(tenant), KPC(part_trans_task));
|
||||
}
|
||||
} else if (part_trans_task->get_multi_data_source_info().is_ddl_trans()
|
||||
&& OB_FAIL(handle_ddl_multi_data_source_info_(*part_trans_task, tenant, trans_ctx))) {
|
||||
LOG_ERROR("handle_ddl_multi_data_source_info_ failed", KR(ret), KPC(part_trans_task), K(trans_ctx),
|
||||
K(stop_flag));
|
||||
} else {
|
||||
LOG_DEBUG("handle_ddl_trans and mds for data_dict mode done", KPC(part_trans_task));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -960,7 +984,9 @@ int ObLogSequencer::handle_ddl_multi_data_source_info_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogSequencer::wait_until_parser_done_(volatile bool &stop_flag)
|
||||
int ObLogSequencer::wait_until_parser_done_(
|
||||
const char *caller,
|
||||
volatile bool &stop_flag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -979,7 +1005,7 @@ int ObLogSequencer::wait_until_parser_done_(volatile bool &stop_flag)
|
||||
const static int64_t PRINT_WAIT_PARSER_TIMEOUT = 10 * _SEC_;
|
||||
if (REACH_TIME_INTERVAL(PRINT_WAIT_PARSER_TIMEOUT)) {
|
||||
LOG_INFO("DDL barrier waiting reader and dml_parser empty",
|
||||
K(reader_task_count), K(dml_parser_task_count));
|
||||
K(caller), K(reader_task_count), K(dml_parser_task_count));
|
||||
} else {
|
||||
LOG_DEBUG("DDL barrier waiting reader and dml_parser empty",
|
||||
K(reader_task_count), K(dml_parser_task_count));
|
||||
@ -1007,7 +1033,7 @@ int ObLogSequencer::wait_until_formatter_done_(volatile bool &stop_flag)
|
||||
if (OB_ISNULL(TCTX.formatter_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("formatter is null", KR(ret));
|
||||
} else if (OB_FAIL(wait_until_parser_done_(stop_flag))) {
|
||||
} else if (OB_FAIL(wait_until_parser_done_("wait_formatted", stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
LOG_ERROR("wait_until_parser_done_ failed", KR(ret));
|
||||
}
|
||||
@ -1032,8 +1058,8 @@ int ObLogSequencer::wait_until_formatter_done_(volatile bool &stop_flag)
|
||||
LOG_DEBUG("DDL barrier transaction waiting Formatter empty",
|
||||
K(formatter_br_task_count), K(formatter_log_entray_task_count), K(lob_merger_task_count));
|
||||
}
|
||||
// sleep 100ms and retry
|
||||
const static int64_t WAIT_FORMATTER_EMPTY_TIME = 100 * 1000;
|
||||
// sleep 10ms and retry
|
||||
const static int64_t WAIT_FORMATTER_EMPTY_TIME = 10 * 1000;
|
||||
ob_usleep(WAIT_FORMATTER_EMPTY_TIME);
|
||||
} else {
|
||||
break;
|
||||
|
@ -172,7 +172,9 @@ private:
|
||||
ObLogTenant &tenant,
|
||||
TransCtx &trans_ctx);
|
||||
// wait reader/parser module empty
|
||||
int wait_until_parser_done_(volatile bool &stop_flag);
|
||||
int wait_until_parser_done_(
|
||||
const char *caller,
|
||||
volatile bool &stop_flag);
|
||||
// wait reader/parser/formatter module empty
|
||||
int wait_until_formatter_done_(volatile bool &stop_flag);
|
||||
int recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant);
|
||||
|
@ -236,7 +236,7 @@ bool LSSvrList::need_switch_server(const ObLSRouterKey &key,
|
||||
// Switch the Server scenario and consider that the Server is always serving
|
||||
svr_item.check_and_update_serve_info(true/*is_always_serving*/, next_lsn, is_log_served, is_svr_invalid);
|
||||
|
||||
LOG_INFO("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid));
|
||||
LOG_TRACE("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid));
|
||||
|
||||
if (is_log_served && !is_svr_invalid && !blacklist.exist(svr_item.svr_)) {
|
||||
if (cur_svr == svr_item.svr_) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user