diff --git a/src/rootserver/restore/ob_import_table_job_scheduler.cpp b/src/rootserver/restore/ob_import_table_job_scheduler.cpp index 066e5f7dca..57d5c740be 100644 --- a/src/rootserver/restore/ob_import_table_job_scheduler.cpp +++ b/src/rootserver/restore/ob_import_table_job_scheduler.cpp @@ -307,9 +307,31 @@ int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJo { int ret = OB_SUCCESS; common::ObArray import_tasks; + ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status()); if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) { LOG_WARN("failed to get import table tasks", K(ret), K(job)); + } else if (!next_status.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid import job status", K(ret), K(next_status)); + } else if (OB_FAIL(update_statistic_(import_tasks, job))) { + LOG_WARN("failed to update statistic", K(ret)); + } else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) { + LOG_WARN("failed to advance to next status", K(ret)); + } else { + LOG_INFO("[IMPORT_TABLE]importing table finished", K(import_tasks), K(next_status)); + ROOTSERVICE_EVENT_ADD("import_table", "import table task finish", + "tenant_id", job.get_tenant_id(), + "job_id", job.get_job_id(), + "succeed_import_table_count", job.get_finished_table_count(), + "failed_import_table_count", job.get_failed_table_count()); } + return ret; +} + +int ObImportTableJobScheduler::update_statistic_( + common::ObIArray &import_tasks, share::ObImportTableJob &job) +{ + int ret = OB_SUCCESS; int64_t succeed_task_cnt = 0; int64_t failed_task_cnt = 0; ObImportResult::Comment comment; @@ -320,34 +342,26 @@ int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJo succeed_task_cnt++; } else { failed_task_cnt++; - if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos, - "%s%s%.*s", failed_task_cnt == 1 ? "import failed table list: " : "", - failed_task_cnt == 1 ? "" : ",", - task.get_src_table().length(), task.get_src_table().ptr()))) { - if (OB_SIZE_OVERFLOW == ret) { - ret = OB_SUCCESS; - } else { - LOG_WARN("failed to databuff_printf", K(ret)); - } - } + } + } + ObImportResult result; + + if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos, + "import succeed table count: %ld, failed table count: %ld", succeed_task_cnt, failed_task_cnt))) { + if (OB_SIZE_OVERFLOW == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to databuff_printf", K(ret)); } } - ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status()); - job.get_result().set_result(true, comment); - if (OB_FAIL(ret)) { - } else if (!next_status.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid import job status", K(ret), K(next_status)); - } else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) { - LOG_WARN("failed to advance to next status", K(ret)); - } else { - LOG_INFO("[IMPORT_TABLE]importing table finished", K(import_tasks), K(next_status)); - ROOTSERVICE_EVENT_ADD("import_table", "import table task finish", - "tenant_id", job.get_tenant_id(), - "job_id", job.get_job_id(), - "succeed_import_table_count", succeed_task_cnt, - "failed_import_table_count", failed_task_cnt); + result.set_result(true, comment); + job.set_result(result); + job.set_finished_table_count(succeed_task_cnt); + job.set_failed_table_count(failed_task_cnt); + + if (FAILEDx(job_helper_.report_statistics(*sql_proxy_, job))) { + LOG_WARN("failed to report statistics", K(ret)); } return ret; } diff --git a/src/rootserver/restore/ob_import_table_job_scheduler.h b/src/rootserver/restore/ob_import_table_job_scheduler.h index 94d9fbc939..e0219751d2 100644 --- a/src/rootserver/restore/ob_import_table_job_scheduler.h +++ b/src/rootserver/restore/ob_import_table_job_scheduler.h @@ -55,6 +55,7 @@ private: int deal_with_import_table_task_(share::ObImportTableJob &job); int process_import_table_task_(share::ObImportTableTask &task); int do_after_import_all_table_(share::ObImportTableJob &job); + int update_statistic_(common::ObIArray &import_tasks, share::ObImportTableJob &job); int canceling_(share::ObImportTableJob &job); int finish_(const share::ObImportTableJob &job); int persist_import_table_task_(common::ObMySQLTransaction &trans, const share::ObImportTableTask &task); diff --git a/src/share/restore/ob_import_table_persist_helper.cpp b/src/share/restore/ob_import_table_persist_helper.cpp index b03aad15b9..597bd78ade 100644 --- a/src/share/restore/ob_import_table_persist_helper.cpp +++ b/src/share/restore/ob_import_table_persist_helper.cpp @@ -284,6 +284,19 @@ int ObImportTableJobPersistHelper::report_import_job_statistics( return ret; } +int ObImportTableJobPersistHelper::report_statistics(common::ObISQLClient &proxy, const ObImportTableJob &job) const +{ + int ret = OB_SUCCESS; + int64_t affected_rows = 0; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObImportTableTaskPersistHelper not init", K(ret)); + } else if (OB_FAIL(table_op_.update_row(proxy, job, affected_rows))) { + LOG_WARN("failed to update row", K(ret), K(job)); + } + return ret; +} + ObImportTableTaskPersistHelper::ObImportTableTaskPersistHelper() : is_inited_(false), tenant_id_(), table_op_() { diff --git a/src/share/restore/ob_import_table_persist_helper.h b/src/share/restore/ob_import_table_persist_helper.h index bb396ee55c..dc96930075 100644 --- a/src/share/restore/ob_import_table_persist_helper.h +++ b/src/share/restore/ob_import_table_persist_helper.h @@ -45,6 +45,7 @@ public: int get_import_table_job_by_initiator(common::ObISQLClient &proxy, const uint64_t initiator_tenant_id, const uint64_t initiator_job_id, ObImportTableJob &job) const; int force_cancel_import_job(common::ObISQLClient &proxy) const; + int report_statistics(common::ObISQLClient &proxy, const ObImportTableJob &job) const; TO_STRING_KV(K_(is_inited), K_(tenant_id)); private: DISALLOW_COPY_AND_ASSIGN(ObImportTableJobPersistHelper); diff --git a/src/share/restore/ob_import_table_struct.cpp b/src/share/restore/ob_import_table_struct.cpp index 1bd756111b..c8fcb24c1f 100644 --- a/src/share/restore/ob_import_table_struct.cpp +++ b/src/share/restore/ob_import_table_struct.cpp @@ -170,6 +170,13 @@ ObImportTableTaskStatus ObImportTableTaskStatus::get_next_status(const int err_c } \ } +#define FILL_HEX_STR_COLUMN(COLUMN_NAME) \ +if (OB_SUCC(ret)) { \ + if (OB_FAIL((dml.add_column(#COLUMN_NAME, ObHexEscapeSqlStr(COLUMN_NAME##_))))) { \ + LOG_WARN("failed to add column", K(ret)); \ + } \ +} + void ObImportTableTask::reset() { @@ -383,15 +390,15 @@ int ObImportTableTask::fill_dml(share::ObDMLSqlSplicer &dml) const } FILL_INT_COLUMN(job_id) FILL_INT_COLUMN(src_tenant_id) - FILL_STR_COLUMN(src_tablespace) - FILL_STR_COLUMN(src_tablegroup) - FILL_STR_COLUMN(src_database) - FILL_STR_COLUMN(src_table) - FILL_STR_COLUMN(src_partition) - FILL_STR_COLUMN(target_tablespace) - FILL_STR_COLUMN(target_tablegroup) - FILL_STR_COLUMN(target_database) - FILL_STR_COLUMN(target_table) + FILL_HEX_STR_COLUMN(src_tablespace) + FILL_HEX_STR_COLUMN(src_tablegroup) + FILL_HEX_STR_COLUMN(src_database) + FILL_HEX_STR_COLUMN(src_table) + FILL_HEX_STR_COLUMN(src_partition) + FILL_HEX_STR_COLUMN(target_tablespace) + FILL_HEX_STR_COLUMN(target_tablegroup) + FILL_HEX_STR_COLUMN(target_database) + FILL_HEX_STR_COLUMN(target_table) FILL_INT_COLUMN(table_column) FILL_INT_COLUMN(start_ts) FILL_INT_COLUMN(completion_ts) @@ -719,6 +726,8 @@ int ObImportTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const LOG_WARN("failed to add column", K(ret)); } else if (OB_FAIL(dml.add_column("import_all", import_all))) { LOG_WARN("failed to add column", K(ret)); + } else if (OB_FAIL(dml.add_column(OB_STR_COMMENT, result_.get_comment()))) { + LOG_WARN("failed to add column", K(ret)); } FILL_INT_COLUMN(initiator_tenant_id) FILL_INT_COLUMN(initiator_job_id) @@ -737,8 +746,6 @@ int ObImportTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const if (OB_SUCC(ret) && status_.is_finish()) { if (OB_FAIL(dml.add_column(OB_STR_RESULT, result_.get_result_str()))) { LOG_WARN("failed to add column", K(ret)); - } else if (OB_FAIL(dml.add_column(OB_STR_COMMENT, result_.get_comment()))) { - LOG_WARN("failed to add column", K(ret)); } } @@ -848,6 +855,7 @@ int ObImportTableJob::assign(const ObImportTableJob &that) set_total_bytes(that.get_total_bytes()); set_finished_bytes(that.get_finished_bytes()); set_failed_bytes(that.get_failed_bytes()); + set_result(that.get_result()); } return ret; } @@ -1383,3 +1391,4 @@ int ObRecoverTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const #undef FILL_INT_COLUMN #undef FILL_UINT_COLUMN #undef FILL_STR_COLUMN +#undef FILL_HEX_STR_COLUMN