fix: add tenant name to match ddl inner sql
This commit is contained in:
@ -3291,7 +3291,7 @@ int ObDDLTaskRecordOperator::kill_task_inner_sql(
|
|||||||
LOG_WARN("assign sql string failed", K(ret));
|
LOG_WARN("assign sql string failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG_INFO("kill session sql string", K(sql_string), K(task_id), K(sql_exec_addr));
|
LOG_INFO("kill session inner sql", K(sql_string), K(task_id), K(sql_exec_addr));
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &sql_exec_addr))) { // default use OB_SYS_TENANT_ID
|
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &sql_exec_addr))) { // default use OB_SYS_TENANT_ID
|
||||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||||
|
|||||||
@ -1233,7 +1233,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
|||||||
const common::ObAddr &inner_sql_exec_addr,
|
const common::ObAddr &inner_sql_exec_addr,
|
||||||
const common::ObCurTraceId::TraceId &trace_id,
|
const common::ObCurTraceId::TraceId &trace_id,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t execution_id,
|
const int64_t task_id,
|
||||||
const int64_t scn,
|
const int64_t scn,
|
||||||
bool &is_old_task_session_exist)
|
bool &is_old_task_session_exist)
|
||||||
{
|
{
|
||||||
@ -1261,12 +1261,14 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
|||||||
LOG_WARN("get trace id string failed", K(ret), K(trace_id));
|
LOG_WARN("get trace id string failed", K(ret), K(trace_id));
|
||||||
} else if (!inner_sql_exec_addr.is_valid()) {
|
} else if (!inner_sql_exec_addr.is_valid()) {
|
||||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
||||||
" and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
" and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) "
|
||||||
|
" and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
||||||
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
||||||
trace_id_str,
|
trace_id_str,
|
||||||
|
tenant_id,
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
execution_id,
|
task_id,
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
@ -1279,14 +1281,16 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr));
|
LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr));
|
||||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
|
||||||
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
" and tenant = (select tenant_name from __all_tenant where tenant_id = %lu) "
|
||||||
|
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_task_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
|
||||||
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
|
||||||
trace_id_str,
|
trace_id_str,
|
||||||
|
tenant_id,
|
||||||
ip_str,
|
ip_str,
|
||||||
inner_sql_exec_addr.get_port(),
|
inner_sql_exec_addr.get_port(),
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
execution_id,
|
task_id,
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
charater,
|
charater,
|
||||||
@ -1295,6 +1299,9 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
|||||||
LOG_WARN("assign sql string failed", K(ret));
|
LOG_WARN("assign sql string failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) { // every 10s
|
||||||
|
LOG_INFO("check task inner sql string", K(sql_string));
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &inner_sql_exec_addr))) {
|
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &inner_sql_exec_addr))) {
|
||||||
@ -1807,7 +1814,7 @@ int ObCheckTabletDataComplementOp::check_finish_report_checksum(
|
|||||||
int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
|
int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t table_id,
|
const uint64_t table_id,
|
||||||
const uint64_t ddl_task_id,
|
const int64_t ddl_task_id,
|
||||||
const int64_t execution_id,
|
const int64_t execution_id,
|
||||||
const common::ObAddr &inner_sql_exec_addr,
|
const common::ObAddr &inner_sql_exec_addr,
|
||||||
const common::ObCurTraceId::TraceId &trace_id,
|
const common::ObCurTraceId::TraceId &trace_id,
|
||||||
@ -1824,7 +1831,7 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(table_id));
|
LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(table_id));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, execution_id, scn, is_old_task_session_exist))) {
|
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, ddl_task_id, scn, is_old_task_session_exist))) {
|
||||||
LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
|
LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
|
||||||
} else if (is_old_task_session_exist) {
|
} else if (is_old_task_session_exist) {
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
|
|||||||
@ -441,7 +441,7 @@ public:
|
|||||||
static int check_and_wait_old_complement_task(
|
static int check_and_wait_old_complement_task(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t index_table_id,
|
const uint64_t index_table_id,
|
||||||
const uint64_t ddl_task_id,
|
const int64_t ddl_task_id,
|
||||||
const int64_t execution_id,
|
const int64_t execution_id,
|
||||||
const common::ObAddr &inner_sql_exec_addr,
|
const common::ObAddr &inner_sql_exec_addr,
|
||||||
const common::ObCurTraceId::TraceId &trace_id,
|
const common::ObCurTraceId::TraceId &trace_id,
|
||||||
@ -468,7 +468,7 @@ private:
|
|||||||
const common::ObAddr &inner_sql_exec_addr,
|
const common::ObAddr &inner_sql_exec_addr,
|
||||||
const common::ObCurTraceId::TraceId &trace_id,
|
const common::ObCurTraceId::TraceId &trace_id,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t execution_id,
|
const int64_t task_id,
|
||||||
const int64_t scn,
|
const int64_t scn,
|
||||||
bool &is_old_task_session_exist);
|
bool &is_old_task_session_exist);
|
||||||
|
|
||||||
|
|||||||
@ -874,8 +874,8 @@ int ObPxSubCoord::end_ddl(const bool need_commit)
|
|||||||
LOG_WARN("ddl manager finish contex failed", K(ret), K(ddl_ctrl_));
|
LOG_WARN("ddl manager finish contex failed", K(ret), K(ddl_ctrl_));
|
||||||
}
|
}
|
||||||
LOG_INFO("end ddl sstable", K(ret), K(need_commit));
|
LOG_INFO("end ddl sstable", K(ret), K(need_commit));
|
||||||
}
|
|
||||||
DEBUG_SYNC(END_DDL_IN_PX_SUBCOORD);
|
DEBUG_SYNC(END_DDL_IN_PX_SUBCOORD);
|
||||||
|
}
|
||||||
if (OB_EAGAIN == ret) {
|
if (OB_EAGAIN == ret) {
|
||||||
ret = OB_STATE_NOT_MATCH; // avoid px hang
|
ret = OB_STATE_NOT_MATCH; // avoid px hang
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user