[CP] fill import table statistic

This commit is contained in:
hamstersox
2023-09-20 15:48:06 +00:00
committed by ob-robot
parent f4f4170dbe
commit 4e7afbb24f
5 changed files with 74 additions and 36 deletions

View File

@ -307,9 +307,31 @@ int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJo
{
int ret = OB_SUCCESS;
common::ObArray<share::ObImportTableTask> import_tasks;
ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
if (OB_FAIL(get_import_table_tasks_(job, import_tasks))) {
LOG_WARN("failed to get import table tasks", K(ret), K(job));
} else if (!next_status.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid import job status", K(ret), K(next_status));
} else if (OB_FAIL(update_statistic_(import_tasks, job))) {
LOG_WARN("failed to update statistic", K(ret));
} else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
LOG_WARN("failed to advance to next status", K(ret));
} else {
LOG_INFO("[IMPORT_TABLE]importing table finished", K(import_tasks), K(next_status));
ROOTSERVICE_EVENT_ADD("import_table", "import table task finish",
"tenant_id", job.get_tenant_id(),
"job_id", job.get_job_id(),
"succeed_import_table_count", job.get_finished_table_count(),
"failed_import_table_count", job.get_failed_table_count());
}
return ret;
}
int ObImportTableJobScheduler::update_statistic_(
common::ObIArray<share::ObImportTableTask> &import_tasks, share::ObImportTableJob &job)
{
int ret = OB_SUCCESS;
int64_t succeed_task_cnt = 0;
int64_t failed_task_cnt = 0;
ObImportResult::Comment comment;
@ -320,34 +342,26 @@ int ObImportTableJobScheduler::do_after_import_all_table_(share::ObImportTableJo
succeed_task_cnt++;
} else {
failed_task_cnt++;
if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos,
"%s%s%.*s", failed_task_cnt == 1 ? "import failed table list: " : "",
failed_task_cnt == 1 ? "" : ",",
task.get_src_table().length(), task.get_src_table().ptr()))) {
if (OB_SIZE_OVERFLOW == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to databuff_printf", K(ret));
}
}
}
}
ObImportResult result;
if (OB_FAIL(databuff_printf(comment.ptr(), comment.capacity(), pos,
"import succeed table count: %ld, failed table count: %ld", succeed_task_cnt, failed_task_cnt))) {
if (OB_SIZE_OVERFLOW == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to databuff_printf", K(ret));
}
}
ObImportTableJobStatus next_status = ObImportTableJobStatus::get_next_status(job.get_status());
job.get_result().set_result(true, comment);
if (OB_FAIL(ret)) {
} else if (!next_status.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid import job status", K(ret), K(next_status));
} else if (OB_FAIL(advance_status_(*sql_proxy_, job, next_status))) {
LOG_WARN("failed to advance to next status", K(ret));
} else {
LOG_INFO("[IMPORT_TABLE]importing table finished", K(import_tasks), K(next_status));
ROOTSERVICE_EVENT_ADD("import_table", "import table task finish",
"tenant_id", job.get_tenant_id(),
"job_id", job.get_job_id(),
"succeed_import_table_count", succeed_task_cnt,
"failed_import_table_count", failed_task_cnt);
result.set_result(true, comment);
job.set_result(result);
job.set_finished_table_count(succeed_task_cnt);
job.set_failed_table_count(failed_task_cnt);
if (FAILEDx(job_helper_.report_statistics(*sql_proxy_, job))) {
LOG_WARN("failed to report statistics", K(ret));
}
return ret;
}

View File

@ -55,6 +55,7 @@ private:
int deal_with_import_table_task_(share::ObImportTableJob &job);
int process_import_table_task_(share::ObImportTableTask &task);
int do_after_import_all_table_(share::ObImportTableJob &job);
int update_statistic_(common::ObIArray<share::ObImportTableTask> &import_tasks, share::ObImportTableJob &job);
int canceling_(share::ObImportTableJob &job);
int finish_(const share::ObImportTableJob &job);
int persist_import_table_task_(common::ObMySQLTransaction &trans, const share::ObImportTableTask &task);

View File

@ -284,6 +284,19 @@ int ObImportTableJobPersistHelper::report_import_job_statistics(
return ret;
}
int ObImportTableJobPersistHelper::report_statistics(common::ObISQLClient &proxy, const ObImportTableJob &job) const
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObImportTableTaskPersistHelper not init", K(ret));
} else if (OB_FAIL(table_op_.update_row(proxy, job, affected_rows))) {
LOG_WARN("failed to update row", K(ret), K(job));
}
return ret;
}
ObImportTableTaskPersistHelper::ObImportTableTaskPersistHelper()
: is_inited_(false), tenant_id_(), table_op_()
{

View File

@ -45,6 +45,7 @@ public:
int get_import_table_job_by_initiator(common::ObISQLClient &proxy,
const uint64_t initiator_tenant_id, const uint64_t initiator_job_id, ObImportTableJob &job) const;
int force_cancel_import_job(common::ObISQLClient &proxy) const;
int report_statistics(common::ObISQLClient &proxy, const ObImportTableJob &job) const;
TO_STRING_KV(K_(is_inited), K_(tenant_id));
private:
DISALLOW_COPY_AND_ASSIGN(ObImportTableJobPersistHelper);

View File

@ -170,6 +170,13 @@ ObImportTableTaskStatus ObImportTableTaskStatus::get_next_status(const int err_c
} \
}
#define FILL_HEX_STR_COLUMN(COLUMN_NAME) \
if (OB_SUCC(ret)) { \
if (OB_FAIL((dml.add_column(#COLUMN_NAME, ObHexEscapeSqlStr(COLUMN_NAME##_))))) { \
LOG_WARN("failed to add column", K(ret)); \
} \
}
void ObImportTableTask::reset()
{
@ -383,15 +390,15 @@ int ObImportTableTask::fill_dml(share::ObDMLSqlSplicer &dml) const
}
FILL_INT_COLUMN(job_id)
FILL_INT_COLUMN(src_tenant_id)
FILL_STR_COLUMN(src_tablespace)
FILL_STR_COLUMN(src_tablegroup)
FILL_STR_COLUMN(src_database)
FILL_STR_COLUMN(src_table)
FILL_STR_COLUMN(src_partition)
FILL_STR_COLUMN(target_tablespace)
FILL_STR_COLUMN(target_tablegroup)
FILL_STR_COLUMN(target_database)
FILL_STR_COLUMN(target_table)
FILL_HEX_STR_COLUMN(src_tablespace)
FILL_HEX_STR_COLUMN(src_tablegroup)
FILL_HEX_STR_COLUMN(src_database)
FILL_HEX_STR_COLUMN(src_table)
FILL_HEX_STR_COLUMN(src_partition)
FILL_HEX_STR_COLUMN(target_tablespace)
FILL_HEX_STR_COLUMN(target_tablegroup)
FILL_HEX_STR_COLUMN(target_database)
FILL_HEX_STR_COLUMN(target_table)
FILL_INT_COLUMN(table_column)
FILL_INT_COLUMN(start_ts)
FILL_INT_COLUMN(completion_ts)
@ -719,6 +726,8 @@ int ObImportTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const
LOG_WARN("failed to add column", K(ret));
} else if (OB_FAIL(dml.add_column("import_all", import_all))) {
LOG_WARN("failed to add column", K(ret));
} else if (OB_FAIL(dml.add_column(OB_STR_COMMENT, result_.get_comment()))) {
LOG_WARN("failed to add column", K(ret));
}
FILL_INT_COLUMN(initiator_tenant_id)
FILL_INT_COLUMN(initiator_job_id)
@ -737,8 +746,6 @@ int ObImportTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const
if (OB_SUCC(ret) && status_.is_finish()) {
if (OB_FAIL(dml.add_column(OB_STR_RESULT, result_.get_result_str()))) {
LOG_WARN("failed to add column", K(ret));
} else if (OB_FAIL(dml.add_column(OB_STR_COMMENT, result_.get_comment()))) {
LOG_WARN("failed to add column", K(ret));
}
}
@ -848,6 +855,7 @@ int ObImportTableJob::assign(const ObImportTableJob &that)
set_total_bytes(that.get_total_bytes());
set_finished_bytes(that.get_finished_bytes());
set_failed_bytes(that.get_failed_bytes());
set_result(that.get_result());
}
return ret;
}
@ -1383,3 +1391,4 @@ int ObRecoverTableJob::fill_dml(share::ObDMLSqlSplicer &dml) const
#undef FILL_INT_COLUMN
#undef FILL_UINT_COLUMN
#undef FILL_STR_COLUMN
#undef FILL_HEX_STR_COLUMN