Add direct load information to __all_virtual_long_ops_status
This commit is contained in:
@ -22,6 +22,9 @@
|
||||
#include "rootserver/ddl_task/ob_ddl_redefinition_task.h"
|
||||
#include "storage/tablelock/ob_table_lock_service.h"
|
||||
#include "observer/ob_server_event_history_table_operator.h"
|
||||
#include "sql/engine/cmd/ob_load_data_utils.h"
|
||||
#include "lib/mysqlclient/ob_mysql_proxy.h"
|
||||
#include "lib/mysqlclient/ob_mysql_result.h"
|
||||
|
||||
using namespace oceanbase::lib;
|
||||
using namespace oceanbase::common;
|
||||
@ -1136,6 +1139,15 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value)
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObDDLTaskStatus::REPENDING: {
|
||||
if (OB_FAIL(databuff_printf(stat_info_.message_,
|
||||
MAX_LONG_OPS_MESSAGE_LENGTH,
|
||||
pos,
|
||||
"STATUS: REPENDING"))) {
|
||||
LOG_WARN("failed to print", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObDDLTaskStatus::FAIL: {
|
||||
if (OB_FAIL(databuff_printf(stat_info_.message_,
|
||||
MAX_LONG_OPS_MESSAGE_LENGTH,
|
||||
@ -1159,6 +1171,28 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// append direct load information to the message
|
||||
if (OB_SUCC(ret) && (ObDDLType::DDL_DIRECT_LOAD == get_task_type())) {
|
||||
common::ObArenaAllocator allocator(lib::ObLabel("RedefTask"));
|
||||
sql::ObLoadDataStat job_stat;
|
||||
if (OB_FAIL(get_direct_load_job_stat(allocator, job_stat))) {
|
||||
LOG_WARN("failed to get direct load job_stat", KR(ret));
|
||||
} else if (job_stat.job_id_ > 0) {
|
||||
databuff_printf(stat_info_.message_, MAX_LONG_OPS_MESSAGE_LENGTH, pos,
|
||||
", TABLE_ID: %ld, BATCH_SIZE: %ld, PARALLEL: %ld, MAX_ALLOWED_ERROR_ROWS: %ld"
|
||||
", DETECTED_ERROR_ROWS: %ld, PROCESSED_ROWS: %ld, LOAD_STATUS: %.*s",
|
||||
job_stat.job_id_,
|
||||
job_stat.batch_size_,
|
||||
job_stat.parallel_,
|
||||
job_stat.max_allowed_error_rows_,
|
||||
job_stat.detected_error_rows_,
|
||||
job_stat.coordinator.received_rows_,
|
||||
job_stat.coordinator.status_.length(),
|
||||
job_stat.coordinator.status_.ptr());
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(copy_longops_stat(value))) {
|
||||
LOG_WARN("failed to collect common longops stat", K(ret));
|
||||
@ -1221,3 +1255,50 @@ void ObTableRedefinitionTask::flt_set_status_span_tag() const
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableRedefinitionTask::get_direct_load_job_stat(common::ObArenaAllocator &allocator,
|
||||
sql::ObLoadDataStat &job_stat)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLProxy &sql_proxy = *GCTX.sql_proxy_;
|
||||
ObSqlString select_sql;
|
||||
sqlclient::ObMySQLResult *select_result = NULL;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, select_res) {
|
||||
if (OB_FAIL(select_sql.assign_fmt(
|
||||
"SELECT JOB_ID, BATCH_SIZE, PARALLEL, MAX_ALLOWED_ERROR_ROWS, DETECTED_ERROR_ROWS, "
|
||||
"COORDINATOR_RECEIVED_ROWS, COORDINATOR_STATUS FROM %s WHERE TENANT_ID=%lu "
|
||||
"AND JOB_ID=%ld AND JOB_TYPE='direct' AND COORDINATOR_STATUS!='none'",
|
||||
OB_ALL_VIRTUAL_LOAD_DATA_STAT_TNAME, tenant_id_, object_id_))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret));
|
||||
} else if (OB_FAIL(sql_proxy.read(select_res, OB_SYS_TENANT_ID, select_sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", KR(ret), K(select_sql));
|
||||
} else if (OB_ISNULL(select_result = select_res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", KR(ret));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(select_result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", KR(ret));
|
||||
}
|
||||
} else {
|
||||
ObString load_status;
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "JOB_ID", job_stat.job_id_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "BATCH_SIZE", job_stat.batch_size_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "PARALLEL", job_stat.parallel_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "MAX_ALLOWED_ERROR_ROWS", job_stat.max_allowed_error_rows_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "DETECTED_ERROR_ROWS", job_stat.detected_error_rows_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator.received_rows_, int64_t);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*select_result, "COORDINATOR_STATUS", load_status);
|
||||
if (OB_SUCC(ret)
|
||||
&& OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator.status_))) {
|
||||
LOG_WARN("failed to write string", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
Reference in New Issue
Block a user