Fix update execution id core

This commit is contained in:
Hongqin-Li
2023-01-29 16:04:46 +08:00
committed by ob-robot
parent 0a971dad66
commit b87473dac2
11 changed files with 298 additions and 256 deletions

View File

@ -1019,7 +1019,7 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
const common::ObAddr &inner_sql_exec_addr,
const common::ObCurTraceId::TraceId &trace_id,
const uint64_t tenant_id,
const int64_t schema_version,
const int64_t execution_id,
const int64_t scn,
bool &is_old_task_session_exist)
{
@ -1034,9 +1034,6 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
} else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || trace_id.is_invalid() || !inner_sql_exec_addr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(trace_id), K(inner_sql_exec_addr));
} else if (!inner_sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr));
} else {
ret = OB_SUCCESS;
common::ObMySQLProxy &proxy = root_service->get_sql_proxy();
@ -1048,21 +1045,44 @@ int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get trace id string failed", K(ret), K(trace_id));
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c%ld%c%ld%c\" ",
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
trace_id_str,
ip_str,
inner_sql_exec_addr.get_port(),
charater,
charater,
charater,
charater,
schema_version,
charater,
scn,
charater ))) {
LOG_WARN("assign sql string failed", K(ret));
} else if (!inner_sql_exec_addr.is_valid()) {
if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
" and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
trace_id_str,
charater,
charater,
execution_id,
charater,
charater,
charater,
scn,
charater ))) {
LOG_WARN("assign sql string failed", K(ret));
}
} else {
if (!inner_sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr));
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" "
" and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%c('ddl_execution_id', %ld)%cINTO%cSELECT%c%ld%c\" ",
OB_ALL_VIRTUAL_SESSION_INFO_TNAME,
trace_id_str,
ip_str,
inner_sql_exec_addr.get_port(),
charater,
charater,
execution_id,
charater,
charater,
charater,
scn,
charater ))) {
LOG_WARN("assign sql string failed", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &inner_sql_exec_addr))) {
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
} else if (OB_ISNULL((result = res.get_result()))) {
@ -1569,7 +1589,7 @@ int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(
} else if (is_all_sstable_build_finished) {
LOG_INFO("all tablet sstable has build finished");
} else {
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, schema_version, scn, is_old_task_session_exist))) {
if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, trace_id, tenant_id, execution_id, scn, is_old_task_session_exist))) {
LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr));
} else if (!is_old_task_session_exist) {
LOG_WARN("old inner sql session is not exist.", K(ret));