fix inner sql kill fail when execute addr is null
This commit is contained in:
parent
8cbcdf06cd
commit
66409181ed
@ -535,7 +535,7 @@ int ObInnerSqlRpcP::process()
|
||||
}
|
||||
/* init session info */
|
||||
const int64_t group_id = transmit_arg.get_consumer_group_id();
|
||||
if (OB_NOT_NULL(tmp_session)) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(tmp_session)) {
|
||||
tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id());
|
||||
tmp_session->switch_tenant(transmit_arg.get_tenant_id());
|
||||
ObString sql_stmt(sql_str.ptr());
|
||||
|
@ -1010,8 +1010,8 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
|
||||
}
|
||||
|
||||
if (OB_CANCELED == real_ret_code) {
|
||||
// (void)ObDDLTaskRecordOperator::kill_task_inner_sql(root_service->get_sql_proxy(),
|
||||
// trace_id_, tenant_id_, sql_exec_addr_); // ignore return code
|
||||
(void)ObDDLTaskRecordOperator::kill_task_inner_sql(root_service->get_sql_proxy(),
|
||||
trace_id_, tenant_id_, task_id_, snapshot_version_, sql_exec_addr_); // ignore return code
|
||||
LOG_WARN("ddl_task switch_status kill_task_inner_sql");
|
||||
}
|
||||
}
|
||||
@ -3146,17 +3146,16 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql(
|
||||
common::ObMySQLProxy &proxy,
|
||||
const common::ObCurTraceId::TraceId &trace_id,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const int64_t snapshot_version,
|
||||
const common::ObAddr &sql_exec_addr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char ip_str[common::OB_IP_STR_BUFF];
|
||||
|
||||
if (OB_UNLIKELY(!proxy.is_inited()) || !sql_exec_addr.is_valid()) {
|
||||
if (OB_UNLIKELY(!proxy.is_inited() || trace_id.is_invalid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (!sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ip to string failed", K(ret), K(sql_exec_addr));
|
||||
} else {
|
||||
LOG_INFO("start ddl kill inner sql session", K(ret), K(trace_id));
|
||||
ObSqlString sql_string;
|
||||
@ -3167,17 +3166,47 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql(
|
||||
if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get trace id string failed", K(ret), K(trace_id), K(tenant_id));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\""
|
||||
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c\" ",
|
||||
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
||||
trace_id_str,
|
||||
ip_str,
|
||||
sql_exec_addr.get_port(),
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
spec_charater))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (!sql_exec_addr.is_valid()) {
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
||||
" and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) "
|
||||
" and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
||||
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
||||
trace_id_str,
|
||||
tenant_id,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
task_id,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
snapshot_version,
|
||||
spec_charater))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (!sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ip to string failed", K(ret), K(sql_exec_addr));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
||||
" and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) "
|
||||
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
||||
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
||||
trace_id_str,
|
||||
tenant_id,
|
||||
ip_str,
|
||||
sql_exec_addr.get_port(),
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
task_id,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
spec_charater,
|
||||
snapshot_version,
|
||||
spec_charater))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &sql_exec_addr))) { // default use OB_SYS_TENANT_ID
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL((result = res.get_result()))) {
|
||||
@ -3195,10 +3224,12 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql(
|
||||
}
|
||||
} else {
|
||||
EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t);
|
||||
if (OB_FAIL(kill_inner_sql(proxy, tenant_id, session_id))){
|
||||
LOG_WARN("fail to kill session", K(ret), K(session_id), K(trace_id));
|
||||
} else {
|
||||
LOG_WARN("succ to kill session", K(ret), K(session_id), K(trace_id));
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(kill_inner_sql(proxy, tenant_id, session_id))){
|
||||
LOG_WARN("fail to kill session", K(ret), K(session_id), K(trace_id));
|
||||
} else {
|
||||
LOG_WARN("succ to kill session", K(ret), K(session_id), K(trace_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -256,6 +256,8 @@ public:
|
||||
common::ObMySQLProxy &proxy,
|
||||
const common::ObCurTraceId::TraceId &trace_id,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const int64_t snapshot_version,
|
||||
const common::ObAddr &sql_exec_addr);
|
||||
|
||||
private:
|
||||
|
@ -1258,7 +1258,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
||||
|
||||
int ObCheckTabletDataComplementOp::update_replica_merge_status(
|
||||
const ObTabletID &tablet_id,
|
||||
const int merge_status,
|
||||
const bool merge_status,
|
||||
hash::ObHashMap<ObTabletID, int32_t> &tablets_commited_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1269,7 +1269,7 @@ int ObCheckTabletDataComplementOp::update_replica_merge_status(
|
||||
int32_t commited_count = 0;
|
||||
if (OB_SUCC(tablets_commited_map.get_refactored(tablet_id, commited_count))) {
|
||||
// overwrite
|
||||
if (true == merge_status) {
|
||||
if (merge_status) {
|
||||
commited_count++;
|
||||
if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) {
|
||||
LOG_WARN("fail to insert map status", K(ret));
|
||||
@ -1277,7 +1277,7 @@ int ObCheckTabletDataComplementOp::update_replica_merge_status(
|
||||
}
|
||||
} else if (OB_HASH_NOT_EXIST == ret) { // new insert
|
||||
ret = OB_SUCCESS;
|
||||
if (true == merge_status) {
|
||||
if (merge_status) {
|
||||
commited_count = 1;
|
||||
if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) {
|
||||
LOG_WARN("fail to insert map status", K(ret));
|
||||
|
@ -421,7 +421,7 @@ private:
|
||||
|
||||
static int update_replica_merge_status(
|
||||
const ObTabletID &tablet_id,
|
||||
const int merge_status,
|
||||
const bool merge_status,
|
||||
hash::ObHashMap<ObTabletID, int32_t> &tablets_commited_map);
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user