From 9d65777404c4fc5021e59c2c3d279895dd7764cf Mon Sep 17 00:00:00 2001
From: oceanoverflow
Date: Thu, 10 Mar 2022 10:20:07 +0800
Subject: [PATCH] modify column statistic when physical restore

---
 .../restore/ob_restore_scheduler.cpp          |  17 ++
 src/rootserver/restore/ob_restore_scheduler.h |   1 +
 .../ob_physical_restore_table_operator.cpp    | 253 ++++++++++++++++++
 .../ob_physical_restore_table_operator.h      |  48 ++++
 4 files changed, 319 insertions(+)

diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp
index c1794e320..e4f45f287 100644
--- a/src/rootserver/restore/ob_restore_scheduler.cpp
+++ b/src/rootserver/restore/ob_restore_scheduler.cpp
@@ -1609,6 +1609,8 @@ int ObRestoreScheduler::modify_schema(const ObPhysicalRestoreJob &job_info)
     LOG_WARN("fail to convert parameters", K(ret), K(job_info));
   } else if (need_log_nop && OB_FAIL(log_nop_operation(job_info))) {
     LOG_WARN("fail to log nop operation", KR(ret), K(job_info));
+  } else if (OB_FAIL(convert_column_statistic(job_info.tenant_id_))) {
+    LOG_WARN("failed to convert column statistic", K(ret), K(job_info));
   } else {
     // reset __all_restore_progress
     ObPhysicalRestoreTableOperator restore_op;
@@ -2576,6 +2578,21 @@ int ObRestoreScheduler::log_nop_operation(const ObPhysicalRestoreJob &job_info)
   return ret;
 }
 
+// Sets the version of the restored tenant's column statistic rows to 1 through
+// ObColumnStatisticOperator.
+int ObRestoreScheduler::convert_column_statistic(const uint64_t tenant_id)
+{
+  int ret = OB_SUCCESS;
+  ObColumnStatisticOperator op;
+  const int64_t version = 1;
+  if (OB_FAIL(op.init(sql_proxy_))) {
+    LOG_WARN("fail init", K(ret));
+  } else if (OB_FAIL(op.update_column_statistic_version(tenant_id, version))) {
+    LOG_WARN("fail to update column statistic version", K(ret), K(tenant_id));
+  }
+  return ret;
+}
+
 // not reentrant
 int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_info)
 {
diff --git a/src/rootserver/restore/ob_restore_scheduler.h b/src/rootserver/restore/ob_restore_scheduler.h
index ad1772381..c26a10cd8 100644
--- a/src/rootserver/restore/ob_restore_scheduler.h
+++ b/src/rootserver/restore/ob_restore_scheduler.h
@@ -117,6 +117,7 @@ private:
   int update_index_status(const common::ObIArray<uint64_t>& index_ids, share::schema::ObIndexStatus index_status);
   int convert_parameters(const share::ObPhysicalRestoreJob& job_info);
   int log_nop_operation(const share::ObPhysicalRestoreJob& job_info);
+  int convert_column_statistic(const uint64_t tenant_id);
   /*------------------------*/
 
   /* filter schema */
diff --git a/src/share/backup/ob_physical_restore_table_operator.cpp b/src/share/backup/ob_physical_restore_table_operator.cpp
index 9d0c24eb2..d4241d6b9 100644
--- a/src/share/backup/ob_physical_restore_table_operator.cpp
+++ b/src/share/backup/ob_physical_restore_table_operator.cpp
@@ -1276,3 +1276,256 @@ int ObPhysicalRestoreTableOperator::get_restore_progress_statistic(
   LOG_TRACE("[RESTORE] get restore progress", KR(ret), K(job), K(statistic));
   return ret;
 }
+
+ObColumnStatisticRowKey::ObColumnStatisticRowKey()
+    : tenant_id_(OB_INVALID_ID), table_id_(OB_INVALID_ID), partition_id_(-1), column_id_(-1)
+{}
+
+// Zeroes every key field; get_batch_end_key_for_update_() uses the zeroed key as its first boundary.
+void ObColumnStatisticRowKey::reuse()
+{
+  tenant_id_ = 0;
+  table_id_ = 0;
+  partition_id_ = 0;
+  column_id_ = 0;
+}
+
+// Restores the invalid state produced by the default constructor.
+void ObColumnStatisticRowKey::reset()
+{
+  tenant_id_ = OB_INVALID_ID;
+  table_id_ = OB_INVALID_ID;
+  partition_id_ = -1;
+  column_id_ = -1;
+}
+
+// Default-constructs the row key (invalid ids) and zero-initializes the numeric fields.
+ObColumnStatistic::ObColumnStatistic()
+    : row_key_(), num_distinct_(), num_null_(), llc_bitmap_size_(), version_(), last_rebuild_version_()
+{}
+
+/* ObColumnStatisticOperator */
+
+ObColumnStatisticOperator::ObColumnStatisticOperator() : is_inited_(false), sql_client_(NULL)
+{}
+
+// Binds the operator to `sql_client`.
+// Returns OB_INIT_TWICE when already initialized and OB_INVALID_ARGUMENT for a NULL client.
+int ObColumnStatisticOperator::init(common::ObISQLClient *sql_client)
+{
+  int ret = OB_SUCCESS;
+  if (IS_INIT) {
+    ret = OB_INIT_TWICE;
+    LOG_WARN("column statistic operator init twice", K(ret));
+  } else if (OB_ISNULL(sql_client)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid arg", K(ret));
+  } else {
+    sql_client_ = sql_client;
+    is_inited_ = true;
+  }
+  return ret;
+}
+
+// Sets `version` on the rows of OB_ALL_COLUMN_STATISTIC_TNAME, one bounded UPDATE per batch.
+// Batch boundaries come from get_batch_end_key_for_update_(); each pair of neighbouring
+// boundaries describes one (left, right] key range.
+int ObColumnStatisticOperator::update_column_statistic_version(const uint64_t tenant_id, const int64_t version)
+{
+  int ret = OB_SUCCESS;
+  ObArray<ObColumnStatisticRowKey> row_key_list;
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("do not init", K(ret));
+  } else if (OB_INVALID_ID == tenant_id || version < 0) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid args", KR(ret), K(tenant_id), K(version));
+  } else if (OB_FAIL(get_batch_end_key_for_update_(tenant_id, row_key_list))) {
+    LOG_WARN("failed to get batch end key for update", KR(ret), K(tenant_id));
+  } else if (1 == row_key_list.count()) {
+    // only the start boundary exists, so there is no key range to update
+  } else {
+    for (int64_t i = 0; OB_SUCC(ret) && i < row_key_list.count() - 1; ++i) {
+      const ObColumnStatisticRowKey &left_row_key = row_key_list.at(i);
+      const ObColumnStatisticRowKey &right_row_key = row_key_list.at(i + 1);
+      if (OB_FAIL(batch_update_column_statistic_version_(tenant_id, version, left_row_key, right_row_key))) {
+        LOG_WARN(
+            "failed to batch update column statistic version", KR(ret), K(tenant_id), K(version), K(left_row_key), K(right_row_key));
+      }
+    }
+  }
+  return ret;
+}
+
+// Reads up to BATCH_SIZE row keys that sort strictly after `prev_row_key` and stores the last
+// one in `next_row_key`. Only the key columns are selected because extract_stat_item_() reads
+// nothing else. Returns OB_ITER_END when no row follows `prev_row_key`.
+int ObColumnStatisticOperator::get_next_end_key_for_update_(
+    const uint64_t tenant_id, const ObColumnStatisticRowKey &prev_row_key, ObColumnStatisticRowKey &next_row_key)
+{
+  int ret = OB_SUCCESS;
+  next_row_key.reset();
+  ObSqlString sql;
+  ObArray<ObColumnStatistic> stat_list;
+  const int64_t BATCH_SIZE = 1024;
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("do not init", K(ret));
+  } else if (OB_INVALID_ID == tenant_id) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid args", KR(ret));
+  } else if (OB_FAIL(sql.assign_fmt("SELECT tenant_id, table_id, partition_id, column_id FROM %s", OB_ALL_COLUMN_STATISTIC_TNAME))) {
+    LOG_WARN("failed to assign sql", KR(ret));
+  } else if (OB_FAIL(sql.append_fmt(" WHERE (tenant_id, table_id, partition_id, column_id) > (%lu, %lu, %ld, %ld)",
+                 prev_row_key.tenant_id_,
+                 prev_row_key.table_id_,
+                 prev_row_key.partition_id_,
+                 prev_row_key.column_id_))) {
+    LOG_WARN("failed to append sql", KR(ret), K(prev_row_key));
+  } else if (OB_FAIL(sql.append_fmt(" ORDER BY tenant_id, table_id, partition_id, column_id"
+                                    " LIMIT %ld ",
+                 BATCH_SIZE))) {
+    LOG_WARN("failed to append sql", KR(ret));
+  } else if (OB_FAIL(get_column_statistic_items_(tenant_id, sql, stat_list))) {
+    LOG_WARN("failed to get column statistic items", K(ret), K(tenant_id), K(sql));
+  } else if (stat_list.empty()) {
+    ret = OB_ITER_END;
+    LOG_WARN("no next end row key", KR(ret), K(sql), K(tenant_id));
+  } else {
+    const ObColumnStatistic &last_item = stat_list.at(stat_list.count() - 1);
+    next_row_key.tenant_id_ = last_item.row_key_.tenant_id_;
+    next_row_key.table_id_ = last_item.row_key_.table_id_;
+    next_row_key.partition_id_ = last_item.row_key_.partition_id_;
+    next_row_key.column_id_ = last_item.row_key_.column_id_;
+  }
+  return ret;
+}
+
+// Fills `row_key_list` with the batch boundaries: the zeroed start key, then the end key of
+// each batch in ascending key order.
+int ObColumnStatisticOperator::get_batch_end_key_for_update_(
+    const uint64_t tenant_id, common::ObIArray<ObColumnStatisticRowKey> &row_key_list)
+{
+  int ret = OB_SUCCESS;
+  row_key_list.reset();
+  ObColumnStatisticRowKey prev_row_key;
+  ObColumnStatisticRowKey next_row_key;
+  prev_row_key.reuse();
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("do not init", K(ret));
+  } else if (OB_INVALID_ID == tenant_id) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid args", KR(ret));
+  } else if (OB_FAIL(row_key_list.push_back(prev_row_key))) {
+    LOG_WARN("failed to push back", KR(ret), K(prev_row_key));
+  } else {
+    do {
+      next_row_key.reset();
+      if (OB_FAIL(get_next_end_key_for_update_(tenant_id, prev_row_key, next_row_key))) {
+        if (OB_ITER_END == ret) {
+          ret = OB_SUCCESS;
+          break;
+        } else {
+          LOG_WARN("failed to get next end key for update", KR(ret), K(prev_row_key));
+        }
+      } else if (OB_FAIL(row_key_list.push_back(next_row_key))) {
+        LOG_WARN("failed to push back", KR(ret), K(next_row_key));
+      } else {
+        prev_row_key = next_row_key;
+      }
+    } while (OB_SUCC(ret));
+  }
+  return ret;
+}
+
+// Issues one UPDATE that sets `version` on the rows whose key lies in (left_row_key, right_row_key].
+int ObColumnStatisticOperator::batch_update_column_statistic_version_(const uint64_t tenant_id, const int64_t version,
+    const ObColumnStatisticRowKey &left_row_key, const ObColumnStatisticRowKey &right_row_key)
+{
+  int ret = OB_SUCCESS;
+  int64_t affected_rows = 0;
+  ObSqlString sql;
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("do not init", K(ret));
+  } else if (OB_INVALID_ID == tenant_id) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid args", KR(ret), K(tenant_id));
+  } else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET version = %ld WHERE ", OB_ALL_COLUMN_STATISTIC_TNAME, version))) {
+    LOG_WARN("failed to assign sql", KR(ret), K(tenant_id));
+  } else if (OB_FAIL(sql.append_fmt("(tenant_id, table_id, partition_id, column_id) > (%lu, %lu, %ld, %ld)",
+                 left_row_key.tenant_id_,
+                 left_row_key.table_id_,
+                 left_row_key.partition_id_,
+                 left_row_key.column_id_))) {
+    LOG_WARN("failed to append sql", KR(ret), K(left_row_key));
+  } else if (OB_FAIL(sql.append_fmt(" AND "))) {
+    LOG_WARN("failed to append sql", KR(ret));
+  } else if (OB_FAIL(sql.append_fmt("(tenant_id, table_id, partition_id, column_id) <= (%lu, %lu, %ld, %ld)",
+                 right_row_key.tenant_id_,
+                 right_row_key.table_id_,
+                 right_row_key.partition_id_,
+                 right_row_key.column_id_))) {
+    LOG_WARN("failed to append sql", KR(ret), K(right_row_key));
+  } else if (OB_FAIL(sql_client_->write(tenant_id, sql.ptr(), affected_rows))) {
+    LOG_WARN("failed to execute sql", KR(ret), K(sql));
+  }
+  return ret;
+}
+
+// Executes `sql` for `tenant_id` and appends one ObColumnStatistic per result row to `stat_list`.
+int ObColumnStatisticOperator::get_column_statistic_items_(
+    const uint64_t tenant_id, const common::ObSqlString &sql, common::ObIArray<ObColumnStatistic> &stat_list)
+{
+  int ret = OB_SUCCESS;
+  SMART_VAR(ObMySQLProxy::MySQLResult, res)
+  {
+    sqlclient::ObMySQLResult *result = NULL;
+    if (IS_NOT_INIT) {
+      ret = OB_NOT_INIT;
+      LOG_WARN("do not init", K(ret));
+    } else if (OB_UNLIKELY(!sql.is_valid())) {
+      ret = OB_INVALID_ARGUMENT;
+      LOG_WARN("invalid arguments", KR(ret), K(sql));
+    } else if (OB_FAIL(sql_client_->read(res, tenant_id, sql.ptr()))) {
+      LOG_WARN("failed to execute sql", KR(ret), K(tenant_id), K(sql));
+    } else if (OB_ISNULL(result = res.get_result())) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("error unexpected, query result must not be NULL", KR(ret));
+    } else {
+      while (OB_SUCC(ret)) {
+        ObColumnStatistic item;
+        if (OB_FAIL(result->next())) {
+          if (OB_ITER_END == ret) {
+            ret = OB_SUCCESS;
+            break;
+          } else {
+            LOG_WARN("failed to get next row", KR(ret));
+          }
+        } else if (OB_FAIL(extract_stat_item_(result, item))) {
+          LOG_WARN("failed to extract item", KR(ret), K(item));
+        } else if (OB_FAIL(stat_list.push_back(item))) {
+          LOG_WARN("failed to push back item", KR(ret), K(item));
+        }
+      }
+    }
+  }
+  return ret;
+}
+
+// Copies the four row-key columns of the current result row into `item.row_key_`.
+int ObColumnStatisticOperator::extract_stat_item_(sqlclient::ObMySQLResult *result, ObColumnStatistic &item)
+{
+  int ret = OB_SUCCESS;
+  if (OB_ISNULL(result)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("extract stat item get invalid argument", KR(ret), KP(result));
+  } else {
+    EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", item.row_key_.tenant_id_, uint64_t);
+    EXTRACT_INT_FIELD_MYSQL(*result, "table_id", item.row_key_.table_id_, uint64_t);
+    EXTRACT_INT_FIELD_MYSQL(*result, "partition_id", item.row_key_.partition_id_, int64_t);
+    EXTRACT_INT_FIELD_MYSQL(*result, "column_id", item.row_key_.column_id_, int64_t);
+  }
+  return ret;
+}
diff --git a/src/share/backup/ob_physical_restore_table_operator.h b/src/share/backup/ob_physical_restore_table_operator.h
index f761a1908..27431474e 100644
--- a/src/share/backup/ob_physical_restore_table_operator.h
+++ b/src/share/backup/ob_physical_restore_table_operator.h
@@ -108,6 +108,54 @@ int ObPhysicalRestoreTableOperator::update_restore_option(
   return ret;
 }
 
+// Row-key columns (tenant_id, table_id, partition_id, column_id) of one __all_column_statistic row.
+struct ObColumnStatisticRowKey {
+  ObColumnStatisticRowKey();
+  void reset();
+  void reuse();
+  TO_STRING_KV(K_(tenant_id), K_(table_id), K_(partition_id), K_(column_id));
+  uint64_t tenant_id_;
+  uint64_t table_id_;
+  int64_t partition_id_;
+  int64_t column_id_;
+};
+
+// One __all_column_statistic row; ObColumnStatisticOperator only fills row_key_ when reading.
+struct ObColumnStatistic {
+  ObColumnStatistic();
+  TO_STRING_KV(K_(row_key), K_(version));
+  ObColumnStatisticRowKey row_key_;
+  int64_t num_distinct_;
+  int64_t num_null_;
+  int64_t llc_bitmap_size_;
+  int64_t version_;
+  int64_t last_rebuild_version_;
+};
+
+// Batch updater for the version column of __all_column_statistic.
+class ObColumnStatisticOperator {
+public:
+  ObColumnStatisticOperator();
+  virtual ~ObColumnStatisticOperator() = default;
+  int init(common::ObISQLClient *sql_client);
+  int update_column_statistic_version(const uint64_t tenant_id, const int64_t version);
+
+private:
+  int get_next_end_key_for_update_(
+      const uint64_t tenant_id, const ObColumnStatisticRowKey &prev_row_key, ObColumnStatisticRowKey &next_row_key);
+  int get_batch_end_key_for_update_(const uint64_t tenant_id, common::ObIArray<ObColumnStatisticRowKey> &row_key_list);
+  int batch_update_column_statistic_version_(const uint64_t tenant_id, const int64_t version,
+      const ObColumnStatisticRowKey &left_row_key, const ObColumnStatisticRowKey &right_row_key);
+  int get_column_statistic_items_(
+      const uint64_t tenant_id, const common::ObSqlString &sql, common::ObIArray<ObColumnStatistic> &stat_list);
+  int extract_stat_item_(sqlclient::ObMySQLResult *result, ObColumnStatistic &item);
+
+private:
+  bool is_inited_;
+  common::ObISQLClient *sql_client_;
+  DISALLOW_COPY_AND_ASSIGN(ObColumnStatisticOperator);
+};
+
 }  // end namespace share
 }  // end namespace oceanbase
 