diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index 44cbd024e..bb214dbc4 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -535,7 +535,7 @@ int ObInnerSqlRpcP::process() } /* init session info */ const int64_t group_id = transmit_arg.get_consumer_group_id(); - if (OB_NOT_NULL(tmp_session)) { + if (OB_SUCC(ret) && OB_NOT_NULL(tmp_session)) { tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id()); tmp_session->switch_tenant(transmit_arg.get_tenant_id()); ObString sql_stmt(sql_str.ptr()); diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 32d04e41c..d6e62966f 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -1010,8 +1010,8 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable } if (OB_CANCELED == real_ret_code) { - // (void)ObDDLTaskRecordOperator::kill_task_inner_sql(root_service->get_sql_proxy(), - // trace_id_, tenant_id_, sql_exec_addr_); // ignore return code + (void)ObDDLTaskRecordOperator::kill_task_inner_sql(root_service->get_sql_proxy(), + trace_id_, tenant_id_, task_id_, snapshot_version_, sql_exec_addr_); // ignore return code LOG_WARN("ddl_task switch_status kill_task_inner_sql"); } } @@ -3146,17 +3146,16 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql( common::ObMySQLProxy &proxy, const common::ObCurTraceId::TraceId &trace_id, const uint64_t tenant_id, + const int64_t task_id, + const int64_t snapshot_version, const common::ObAddr &sql_exec_addr) { int ret = OB_SUCCESS; char ip_str[common::OB_IP_STR_BUFF]; - if (OB_UNLIKELY(!proxy.is_inited()) || !sql_exec_addr.is_valid()) { + if (OB_UNLIKELY(!proxy.is_inited() || trace_id.is_invalid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(proxy.is_inited())); - } else if (!sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ip to string failed", K(ret), K(sql_exec_addr)); } else { LOG_INFO("start ddl kill inner sql session", K(ret), K(trace_id)); ObSqlString sql_string; @@ -3167,17 +3166,47 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql( if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get trace id string failed", K(ret), K(trace_id), K(tenant_id)); - } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\"" - " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c\" ", - OB_ALL_VIRTUAL_SESSION_INFO_TNAME, - trace_id_str, - ip_str, - sql_exec_addr.get_port(), - spec_charater, - spec_charater, - spec_charater, - spec_charater))) { - LOG_WARN("assign sql string failed", K(ret)); + } else if (!sql_exec_addr.is_valid()) { + if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " + " and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) " + " and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + tenant_id, + spec_charater, + spec_charater, + task_id, + spec_charater, + spec_charater, + spec_charater, + snapshot_version, + spec_charater))) { + LOG_WARN("assign sql string failed", K(ret)); + } + } else { + if (!sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ip to string failed", K(ret), K(sql_exec_addr)); + } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " + " and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) " + " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + tenant_id, + ip_str, + sql_exec_addr.get_port(), + spec_charater, + spec_charater, + task_id, + spec_charater, + spec_charater, + spec_charater, + snapshot_version, + spec_charater))) { + LOG_WARN("assign sql string failed", K(ret)); + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &sql_exec_addr))) { // default use OB_SYS_TENANT_ID LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); } else if (OB_ISNULL((result = res.get_result()))) { @@ -3195,10 +3224,12 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql( } } else { EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t); - if (OB_FAIL(kill_inner_sql(proxy, tenant_id, session_id))){ - LOG_WARN("fail to kill session", K(ret), K(session_id), K(trace_id)); - } else { - LOG_WARN("succ to kill session", K(ret), K(session_id), K(trace_id)); + if (OB_SUCC(ret)) { + if (OB_FAIL(kill_inner_sql(proxy, tenant_id, session_id))){ + LOG_WARN("fail to kill session", K(ret), K(session_id), K(trace_id)); + } else { + LOG_WARN("succ to kill session", K(ret), K(session_id), K(trace_id)); + } } } } diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 3f8bf256d..1c678cdec 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -256,6 +256,8 @@ public: common::ObMySQLProxy &proxy, const common::ObCurTraceId::TraceId &trace_id, const uint64_t tenant_id, + const int64_t task_id, + const int64_t snapshot_version, const common::ObAddr &sql_exec_addr); private: diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index c006ac367..09b13dad7 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1258,7 +1258,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( int ObCheckTabletDataComplementOp::update_replica_merge_status( const ObTabletID &tablet_id, - const int merge_status, + const bool merge_status, hash::ObHashMap &tablets_commited_map) { int ret = OB_SUCCESS; @@ -1269,7 +1269,7 @@ int ObCheckTabletDataComplementOp::update_replica_merge_status( int32_t commited_count = 0; if (OB_SUCC(tablets_commited_map.get_refactored(tablet_id, commited_count))) { // overwrite - if (true == merge_status) { + if (merge_status) { commited_count++; if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) { LOG_WARN("fail to insert map status", K(ret)); @@ -1277,7 +1277,7 @@ int ObCheckTabletDataComplementOp::update_replica_merge_status( } } else if (OB_HASH_NOT_EXIST == ret) { // new insert ret = OB_SUCCESS; - if (true == merge_status) { + if (merge_status) { commited_count = 1; if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) { LOG_WARN("fail to insert map status", K(ret)); diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index 9ecc9f176..f51ddeefb 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -421,7 +421,7 @@ private: static int update_replica_merge_status( const ObTabletID &tablet_id, - const int merge_status, + const bool merge_status, hash::ObHashMap &tablets_commited_map);