From fdf9ca8decb4ee84b385bc9d4e04e5a69ec31d20 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 2 Mar 2023 17:52:08 +0000 Subject: [PATCH] Add direct load information to __all_virtual_long_ops_status --- .../ddl_task/ob_table_redefinition_task.cpp | 81 +++++++++++++++++++ .../ddl_task/ob_table_redefinition_task.h | 5 ++ 2 files changed, 86 insertions(+) diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index d56cbfbaf9..2555e0c253 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -22,6 +22,9 @@ #include "rootserver/ddl_task/ob_ddl_redefinition_task.h" #include "storage/tablelock/ob_table_lock_service.h" #include "observer/ob_server_event_history_table_operator.h" +#include "sql/engine/cmd/ob_load_data_utils.h" +#include "lib/mysqlclient/ob_mysql_proxy.h" +#include "lib/mysqlclient/ob_mysql_result.h" using namespace oceanbase::lib; using namespace oceanbase::common; @@ -1136,6 +1139,15 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value) } break; } + case ObDDLTaskStatus::REPENDING: { + if (OB_FAIL(databuff_printf(stat_info_.message_, + MAX_LONG_OPS_MESSAGE_LENGTH, + pos, + "STATUS: REPENDING"))) { + LOG_WARN("failed to print", K(ret)); + } + break; + } case ObDDLTaskStatus::FAIL: { if (OB_FAIL(databuff_printf(stat_info_.message_, MAX_LONG_OPS_MESSAGE_LENGTH, @@ -1159,6 +1171,28 @@ int ObTableRedefinitionTask::collect_longops_stat(ObLongopsValue &value) break; } } + + // append direct load information to the message + if (OB_SUCC(ret) && (ObDDLType::DDL_DIRECT_LOAD == get_task_type())) { + common::ObArenaAllocator allocator(lib::ObLabel("RedefTask")); + sql::ObLoadDataStat job_stat; + if (OB_FAIL(get_direct_load_job_stat(allocator, job_stat))) { + LOG_WARN("failed to get direct load job_stat", KR(ret)); + } else if (job_stat.job_id_ > 0) { + databuff_printf(stat_info_.message_, MAX_LONG_OPS_MESSAGE_LENGTH, pos, + ", TABLE_ID: %ld, BATCH_SIZE: %ld, PARALLEL: %ld, MAX_ALLOWED_ERROR_ROWS: %ld" + ", DETECTED_ERROR_ROWS: %ld, PROCESSED_ROWS: %ld, LOAD_STATUS: %.*s", + job_stat.job_id_, + job_stat.batch_size_, + job_stat.parallel_, + job_stat.max_allowed_error_rows_, + job_stat.detected_error_rows_, + job_stat.coordinator.received_rows_, + job_stat.coordinator.status_.length(), + job_stat.coordinator.status_.ptr()); + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(copy_longops_stat(value))) { LOG_WARN("failed to collect common longops stat", K(ret)); @@ -1221,3 +1255,50 @@ void ObTableRedefinitionTask::flt_set_status_span_tag() const } } } + +int ObTableRedefinitionTask::get_direct_load_job_stat(common::ObArenaAllocator &allocator, + sql::ObLoadDataStat &job_stat) +{ + int ret = OB_SUCCESS; + ObMySQLProxy &sql_proxy = *GCTX.sql_proxy_; + ObSqlString select_sql; + sqlclient::ObMySQLResult *select_result = NULL; + SMART_VAR(ObMySQLProxy::MySQLResult, select_res) { + if (OB_FAIL(select_sql.assign_fmt( + "SELECT JOB_ID, BATCH_SIZE, PARALLEL, MAX_ALLOWED_ERROR_ROWS, DETECTED_ERROR_ROWS, " + "COORDINATOR_RECEIVED_ROWS, COORDINATOR_STATUS FROM %s WHERE TENANT_ID=%lu " + "AND JOB_ID=%ld AND JOB_TYPE='direct' AND COORDINATOR_STATUS!='none'", + OB_ALL_VIRTUAL_LOAD_DATA_STAT_TNAME, tenant_id_, object_id_))) { + LOG_WARN("failed to assign sql", KR(ret)); + } else if (OB_FAIL(sql_proxy.read(select_res, OB_SYS_TENANT_ID, select_sql.ptr()))) { + LOG_WARN("fail to execute sql", KR(ret), K(select_sql)); + } else if (OB_ISNULL(select_result = select_res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, query result must not be NULL", KR(ret)); + } + while (OB_SUCC(ret)) { + if (OB_FAIL(select_result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("failed to get next row", KR(ret)); + } + } else { + ObString load_status; + EXTRACT_INT_FIELD_MYSQL(*select_result, "JOB_ID", job_stat.job_id_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "BATCH_SIZE", job_stat.batch_size_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "PARALLEL", job_stat.parallel_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "MAX_ALLOWED_ERROR_ROWS", job_stat.max_allowed_error_rows_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "DETECTED_ERROR_ROWS", job_stat.detected_error_rows_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*select_result, "COORDINATOR_RECEIVED_ROWS", job_stat.coordinator.received_rows_, int64_t); + EXTRACT_VARCHAR_FIELD_MYSQL(*select_result, "COORDINATOR_STATUS", load_status); + if (OB_SUCC(ret) + && OB_FAIL(ob_write_string(allocator, load_status, job_stat.coordinator.status_))) { + LOG_WARN("failed to write string", KR(ret)); + } + } + } + } + return ret; +} \ No newline at end of file diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.h b/src/rootserver/ddl_task/ob_table_redefinition_task.h index 6f28b032e1..f42c76b622 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.h @@ -18,6 +18,10 @@ namespace oceanbase { +namespace sql +{ + class ObLoadDataStat; +} namespace rootserver { class ObRootService; @@ -87,6 +91,7 @@ private: int replica_end_check(const int ret_code); int check_modify_autoinc(bool &modify_autoinc); int check_use_heap_table_ddl_plan(bool &use_heap_table_ddl_plan); + int get_direct_load_job_stat(common::ObArenaAllocator &allocator, sql::ObLoadDataStat &job_stat); private: static const int64_t OB_TABLE_REDEFINITION_TASK_VERSION = 1L; bool has_rebuild_index_;