modify column statistic when physical restore
This commit is contained in:
parent
ab066ef724
commit
9d65777404
@ -1609,6 +1609,8 @@ int ObRestoreScheduler::modify_schema(const ObPhysicalRestoreJob &job_info)
|
||||
LOG_WARN("fail to convert parameters", K(ret), K(job_info));
|
||||
} else if (need_log_nop && OB_FAIL(log_nop_operation(job_info))) {
|
||||
LOG_WARN("fail to log nop operation", KR(ret), K(job_info));
|
||||
} else if (OB_FAIL(convert_column_statistic(job_info.tenant_id_))) {
|
||||
LOG_WARN("failed to convert column statistic", K(ret), K(job_info));
|
||||
} else {
|
||||
// reset __all_restore_progress
|
||||
ObPhysicalRestoreTableOperator restore_op;
|
||||
@ -2576,6 +2578,19 @@ int ObRestoreScheduler::log_nop_operation(const ObPhysicalRestoreJob &job_info)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRestoreScheduler::convert_column_statistic(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObColumnStatisticOperator op;
|
||||
const int64_t version = 1;
|
||||
if (OB_FAIL(op.init(sql_proxy_))) {
|
||||
LOG_WARN("fail init", K(ret));
|
||||
} else if (OB_FAIL(op.update_column_statistic_version(tenant_id, version))) {
|
||||
LOG_WARN("fail to get jobs", K(ret), K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// not reentrant
|
||||
int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_info)
|
||||
{
|
||||
|
@ -117,6 +117,7 @@ private:
|
||||
int update_index_status(const common::ObIArray<uint64_t>& index_ids, share::schema::ObIndexStatus index_status);
|
||||
int convert_parameters(const share::ObPhysicalRestoreJob& job_info);
|
||||
int log_nop_operation(const share::ObPhysicalRestoreJob& job_info);
|
||||
int convert_column_statistic(const uint64_t tenant_id);
|
||||
/*------------------------*/
|
||||
|
||||
/* filter schema */
|
||||
|
@ -1276,3 +1276,237 @@ int ObPhysicalRestoreTableOperator::get_restore_progress_statistic(
|
||||
LOG_TRACE("[RESTORE] get restore progress", KR(ret), K(job), K(statistic));
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObColumnStatisticRowKey::ObColumnStatisticRowKey()
|
||||
: tenant_id_(OB_INVALID_ID), table_id_(OB_INVALID_ID), partition_id_(-1), column_id_(-1)
|
||||
{}
|
||||
|
||||
void ObColumnStatisticRowKey::reuse()
|
||||
{
|
||||
tenant_id_ = 0;
|
||||
table_id_ = 0;
|
||||
partition_id_ = 0;
|
||||
column_id_ = 0;
|
||||
}
|
||||
|
||||
void ObColumnStatisticRowKey::reset()
|
||||
{
|
||||
tenant_id_ = OB_INVALID_ID;
|
||||
table_id_ = OB_INVALID_ID;
|
||||
partition_id_ = -1;
|
||||
column_id_ = -1;
|
||||
}
|
||||
|
||||
ObColumnStatistic::ObColumnStatistic()
|
||||
: row_key_(), num_distinct_(), num_null_(), llc_bitmap_size_(), version_(), last_rebuild_version_()
|
||||
{}
|
||||
|
||||
/* ObColumnStatisticOperator */
|
||||
|
||||
ObColumnStatisticOperator::ObColumnStatisticOperator() : is_inited_(false), sql_client_(NULL)
|
||||
{}
|
||||
|
||||
int ObColumnStatisticOperator::init(common::ObISQLClient *sql_client)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("physical restore table operator init twice", K(ret));
|
||||
} else if (OB_ISNULL(sql_client)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid arg", K(ret));
|
||||
} else {
|
||||
sql_client_ = sql_client;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::update_column_statistic_version(const uint64_t tenant_id, const int64_t version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObColumnStatisticRowKey> row_key_list;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("do not init", K(ret));
|
||||
} else if (OB_INVALID_ID == tenant_id || version < 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", KR(ret), K(tenant_id), K(version));
|
||||
} else if (OB_FAIL(get_batch_end_key_for_update_(tenant_id, row_key_list))) {
|
||||
LOG_WARN("failed to get batch end key for remove", KR(ret), K(tenant_id));
|
||||
} else if (1 == row_key_list.count()) {
|
||||
// do nothing
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < row_key_list.count() - 1; ++i) {
|
||||
const ObColumnStatisticRowKey &left_row_key = row_key_list.at(i);
|
||||
const ObColumnStatisticRowKey &right_row_key = row_key_list.at(i + 1);
|
||||
if (OB_FAIL(batch_update_column_statistic_version_(tenant_id, version, left_row_key, right_row_key))) {
|
||||
LOG_WARN(
|
||||
"failed to batch remove pg tasks", KR(ret), K(tenant_id), K(version), K(left_row_key), K(right_row_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::get_next_end_key_for_update_(
|
||||
const uint64_t tenant_id, const ObColumnStatisticRowKey &prev_row_key, ObColumnStatisticRowKey &next_row_key)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
next_row_key.reset();
|
||||
ObSqlString sql;
|
||||
ObArray<ObColumnStatistic> stat_list;
|
||||
const int64_t BATCH_SIZE = 1024;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("do not init", K(ret));
|
||||
} else if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", KR(ret));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s", OB_ALL_COLUMN_STATISTIC_TNAME))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt(" WHERE (tenant_id, table_id, partition_id, column_id) > (%lu, %ld, %ld, %ld)",
|
||||
prev_row_key.tenant_id_,
|
||||
prev_row_key.table_id_,
|
||||
prev_row_key.partition_id_,
|
||||
prev_row_key.column_id_))) {
|
||||
LOG_WARN("failed to append sql", KR(ret), K(prev_row_key));
|
||||
} else if (OB_FAIL(sql.append_fmt(" ORDER BY tenant_id, table_id, partition_id, column_id"
|
||||
" LIMIT %ld ",
|
||||
BATCH_SIZE))) {
|
||||
LOG_WARN("failed to append sql", KR(ret));
|
||||
} else if (OB_FAIL(get_column_statistic_items_(tenant_id, sql, stat_list))) {
|
||||
LOG_WARN("failed to get pg backup task", K(ret), K(tenant_id), K(sql));
|
||||
} else if (stat_list.empty()) {
|
||||
ret = OB_ITER_END;
|
||||
LOG_WARN("no next end row key", KR(ret), K(sql), K(tenant_id));
|
||||
} else {
|
||||
const ObColumnStatistic &last_item = stat_list.at(stat_list.count() - 1);
|
||||
next_row_key.tenant_id_ = last_item.row_key_.tenant_id_;
|
||||
next_row_key.table_id_ = last_item.row_key_.table_id_;
|
||||
next_row_key.partition_id_ = last_item.row_key_.partition_id_;
|
||||
next_row_key.column_id_ = last_item.row_key_.column_id_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::get_batch_end_key_for_update_(
|
||||
const uint64_t tenant_id, common::ObIArray<ObColumnStatisticRowKey> &row_key_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
row_key_list.reset();
|
||||
ObColumnStatisticRowKey prev_row_key;
|
||||
ObColumnStatisticRowKey next_row_key;
|
||||
prev_row_key.reuse();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("do not init", K(ret));
|
||||
} else if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", KR(ret));
|
||||
} else if (OB_FAIL(row_key_list.push_back(prev_row_key))) {
|
||||
LOG_WARN("failed to push back", KR(ret), K(prev_row_key));
|
||||
} else {
|
||||
do {
|
||||
next_row_key.reset();
|
||||
if (OB_FAIL(get_next_end_key_for_update_(tenant_id, prev_row_key, next_row_key))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next end key for remove", KR(ret), K(prev_row_key));
|
||||
}
|
||||
} else if (OB_FAIL(row_key_list.push_back(next_row_key))) {
|
||||
LOG_WARN("failed to push back", KR(ret), K(next_row_key));
|
||||
} else {
|
||||
prev_row_key = next_row_key;
|
||||
}
|
||||
} while (OB_SUCC(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::batch_update_column_statistic_version_(const uint64_t tenant_id, const int64_t version,
|
||||
const ObColumnStatisticRowKey &left_row_key, const ObColumnStatisticRowKey &right_row_key)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_rows = 0;
|
||||
ObSqlString sql;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("do not init", K(ret));
|
||||
} else if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get invalid args", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET version = %ld WHERE ", OB_ALL_COLUMN_STATISTIC_TNAME, version))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql.append_fmt("(tenant_id, table_id, partition_id, column_id) > (%lu, %ld, %ld, %ld)",
|
||||
left_row_key.tenant_id_,
|
||||
left_row_key.table_id_,
|
||||
left_row_key.partition_id_,
|
||||
left_row_key.column_id_))) {
|
||||
LOG_WARN("failed to append sql", KR(ret), K(left_row_key));
|
||||
} else if (OB_FAIL(sql.append_fmt(" AND "))) {
|
||||
LOG_WARN("failed to append sql", KR(ret));
|
||||
} else if (OB_FAIL(sql.append_fmt("(tenant_id, table_id, partition_id, column_id) <= (%lu, %ld, %ld, %ld)",
|
||||
right_row_key.tenant_id_,
|
||||
right_row_key.table_id_,
|
||||
right_row_key.partition_id_,
|
||||
right_row_key.column_id_))) {
|
||||
LOG_WARN("failed to append sql", KR(ret), K(right_row_key));
|
||||
} else if (OB_FAIL(sql_client_->write(tenant_id, sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("failed to execute sql", KR(ret), K(sql));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::get_column_statistic_items_(
|
||||
const uint64_t tenant_id, const common::ObSqlString &sql, common::ObIArray<ObColumnStatistic> &stat_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res)
|
||||
{
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
if (OB_UNLIKELY(!sql.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", KR(ret), K(sql));
|
||||
} else if (OB_FAIL(sql_client_->read(res, tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("failed to execute sql", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", KR(ret));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
ObColumnStatistic item;
|
||||
if (OB_FAIL(result->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next row", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(extract_stat_item_(result, item))) {
|
||||
LOG_WARN("failed to extract item", KR(ret), K(item));
|
||||
} else if (OB_FAIL(stat_list.push_back(item))) {
|
||||
LOG_WARN("failed to push back item", KR(ret), K(item));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObColumnStatisticOperator::extract_stat_item_(sqlclient::ObMySQLResult *result, ObColumnStatistic &item)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(result)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("extract task item get invalid argument", KR(ret), KP(result));
|
||||
} else {
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", item.row_key_.tenant_id_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "table_id", item.row_key_.table_id_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "partition_id", item.row_key_.partition_id_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "column_id", item.row_key_.column_id_, int64_t);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -108,6 +108,52 @@ int ObPhysicalRestoreTableOperator::update_restore_option(
|
||||
return ret;
|
||||
}
|
||||
|
||||
struct ObColumnStatisticRowKey {
|
||||
ObColumnStatisticRowKey();
|
||||
void reset();
|
||||
void reuse();
|
||||
TO_STRING_KV(K_(tenant_id), K_(table_id), K_(partition_id), K_(column_id));
|
||||
uint64_t tenant_id_;
|
||||
uint64_t table_id_;
|
||||
int64_t partition_id_;
|
||||
int64_t column_id_;
|
||||
};
|
||||
|
||||
struct ObColumnStatistic {
|
||||
ObColumnStatistic();
|
||||
TO_STRING_KV(K_(row_key), K_(version));
|
||||
ObColumnStatisticRowKey row_key_;
|
||||
int64_t num_distinct_;
|
||||
int64_t num_null_;
|
||||
int64_t llc_bitmap_size_;
|
||||
int64_t version_;
|
||||
int64_t last_rebuild_version_;
|
||||
};
|
||||
|
||||
// __all_column_statistic
|
||||
class ObColumnStatisticOperator {
|
||||
public:
|
||||
ObColumnStatisticOperator();
|
||||
virtual ~ObColumnStatisticOperator() = default;
|
||||
int init(common::ObISQLClient *sql_client);
|
||||
int update_column_statistic_version(const uint64_t tenant_id, const int64_t version);
|
||||
|
||||
private:
|
||||
int get_next_end_key_for_update_(
|
||||
const uint64_t tenant_id, const ObColumnStatisticRowKey &prev_row_key, ObColumnStatisticRowKey &next_row_key);
|
||||
int get_batch_end_key_for_update_(const uint64_t tenant_id, common::ObIArray<ObColumnStatisticRowKey> &row_key_list);
|
||||
int batch_update_column_statistic_version_(const uint64_t tenant_id, const int64_t version,
|
||||
const ObColumnStatisticRowKey &left_row_key, const ObColumnStatisticRowKey &right_row_key);
|
||||
int get_column_statistic_items_(
|
||||
const uint64_t tenant_id, const common::ObSqlString &sql, common::ObIArray<ObColumnStatistic> &stat_list);
|
||||
int extract_stat_item_(sqlclient::ObMySQLResult *result, ObColumnStatistic &item);
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
common::ObISQLClient *sql_client_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObColumnStatisticOperator);
|
||||
};
|
||||
|
||||
} // end namespace share
|
||||
} // end namespace oceanbase
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user