Copy table redefinition indexes one by one
This commit is contained in:
@ -266,8 +266,14 @@ int ObColumnRedefinitionTask::copy_table_indexes()
|
|||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
const int64_t MAX_ACTIVE_TASK_CNT = 1;
|
||||||
|
int64_t active_task_cnt = 0;
|
||||||
// check if has rebuild index
|
// check if has rebuild index
|
||||||
if (has_rebuild_index_) {
|
if (has_rebuild_index_) {
|
||||||
|
} else if (OB_FAIL(ObDDLTaskRecordOperator::get_create_index_task_cnt(GCTX.root_service_->get_sql_proxy(), tenant_id_, object_id_, active_task_cnt))) {
|
||||||
|
LOG_WARN("failed to check index task cnt", K(ret));
|
||||||
|
} else if (active_task_cnt >= MAX_ACTIVE_TASK_CNT) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
} else {
|
} else {
|
||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObTableSchema *table_schema = nullptr;
|
const ObTableSchema *table_schema = nullptr;
|
||||||
@ -332,6 +338,8 @@ int ObColumnRedefinitionTask::copy_table_indexes()
|
|||||||
// index status is final
|
// index status is final
|
||||||
need_rebuild_index = false;
|
need_rebuild_index = false;
|
||||||
LOG_INFO("index status is final", K(ret), K(task_id_), K(index_id), K(need_rebuild_index));
|
LOG_INFO("index status is final", K(ret), K(task_id_), K(index_id), K(need_rebuild_index));
|
||||||
|
} else if (active_task_cnt >= MAX_ACTIVE_TASK_CNT) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
} else {
|
} else {
|
||||||
create_index_arg.index_type_ = index_schema->get_index_type();
|
create_index_arg.index_type_ = index_schema->get_index_type();
|
||||||
ObCreateDDLTaskParam param(tenant_id_,
|
ObCreateDDLTaskParam param(tenant_id_,
|
||||||
@ -340,7 +348,7 @@ int ObColumnRedefinitionTask::copy_table_indexes()
|
|||||||
index_schema,
|
index_schema,
|
||||||
0/*object_id*/,
|
0/*object_id*/,
|
||||||
index_schema->get_schema_version(),
|
index_schema->get_schema_version(),
|
||||||
parallelism_ / index_ids.count()/*parallelism*/,
|
parallelism_,
|
||||||
consumer_group_id_,
|
consumer_group_id_,
|
||||||
&allocator_,
|
&allocator_,
|
||||||
&create_index_arg,
|
&create_index_arg,
|
||||||
@ -350,9 +358,11 @@ int ObColumnRedefinitionTask::copy_table_indexes()
|
|||||||
task_record))) {
|
task_record))) {
|
||||||
if (OB_ENTRY_EXIST == ret) {
|
if (OB_ENTRY_EXIST == ret) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
active_task_cnt += 1;
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("submit ddl task failed", K(ret));
|
LOG_WARN("submit ddl task failed", K(ret));
|
||||||
}
|
}
|
||||||
|
} else if (FALSE_IT(active_task_cnt += 1)) {
|
||||||
} else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) {
|
} else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) {
|
||||||
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2854,6 +2854,39 @@ int ObDDLTaskRecordOperator::check_has_index_task(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObDDLTaskRecordOperator::get_create_index_task_cnt(
|
||||||
|
common::ObISQLClient &proxy,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t data_table_id,
|
||||||
|
int64_t &task_cnt)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id
|
||||||
|
|| OB_INVALID_ID == data_table_id)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid arg", K(ret), K(tenant_id), K(data_table_id));
|
||||||
|
} else {
|
||||||
|
ObSqlString sql_string;
|
||||||
|
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
|
sqlclient::ObMySQLResult *result = NULL;
|
||||||
|
if (OB_FAIL(sql_string.assign_fmt("SELECT COUNT(*) as cnt FROM %s WHERE object_id = %lu AND ddl_type = %d",
|
||||||
|
OB_ALL_DDL_TASK_STATUS_TNAME, data_table_id, ObDDLType::DDL_CREATE_INDEX))) {
|
||||||
|
LOG_WARN("assign sql string failed", K(ret));
|
||||||
|
} else if (OB_FAIL(proxy.read(res, tenant_id, sql_string.ptr()))) {
|
||||||
|
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("fail to get sql result", K(ret), KP(result));
|
||||||
|
} else if (OB_FAIL(result->next())) {
|
||||||
|
LOG_WARN("result next failed", K(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
EXTRACT_INT_FIELD_MYSQL(*result, "cnt", task_cnt, int64_t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObDDLTaskRecordOperator::get_task_record(const uint64_t tenant_id,
|
int ObDDLTaskRecordOperator::get_task_record(const uint64_t tenant_id,
|
||||||
const ObSqlString &sql_string,
|
const ObSqlString &sql_string,
|
||||||
common::ObMySQLProxy &proxy,
|
common::ObMySQLProxy &proxy,
|
||||||
|
|||||||
@ -262,6 +262,12 @@ public:
|
|||||||
const uint64_t index_table_id,
|
const uint64_t index_table_id,
|
||||||
bool &has_index_task);
|
bool &has_index_task);
|
||||||
|
|
||||||
|
static int get_create_index_task_cnt(
|
||||||
|
common::ObISQLClient &proxy,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t data_table_id,
|
||||||
|
int64_t &task_cnt);
|
||||||
|
|
||||||
static int insert_record(
|
static int insert_record(
|
||||||
common::ObISQLClient &proxy,
|
common::ObISQLClient &proxy,
|
||||||
ObDDLTaskRecord &record);
|
ObDDLTaskRecord &record);
|
||||||
|
|||||||
@ -401,8 +401,14 @@ int ObTableRedefinitionTask::copy_table_indexes()
|
|||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
const int64_t MAX_ACTIVE_TASK_CNT = 1;
|
||||||
|
int64_t active_task_cnt = 0;
|
||||||
// check if has rebuild index
|
// check if has rebuild index
|
||||||
if (has_rebuild_index_) {
|
if (has_rebuild_index_) {
|
||||||
|
} else if (OB_FAIL(ObDDLTaskRecordOperator::get_create_index_task_cnt(GCTX.root_service_->get_sql_proxy(), tenant_id_, object_id_, active_task_cnt))) {
|
||||||
|
LOG_WARN("failed to check index task cnt", K(ret));
|
||||||
|
} else if (active_task_cnt >= MAX_ACTIVE_TASK_CNT) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
} else {
|
} else {
|
||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObTableSchema *table_schema = nullptr;
|
const ObTableSchema *table_schema = nullptr;
|
||||||
@ -467,6 +473,8 @@ int ObTableRedefinitionTask::copy_table_indexes()
|
|||||||
// index status is final
|
// index status is final
|
||||||
need_rebuild_index = false;
|
need_rebuild_index = false;
|
||||||
LOG_INFO("index status is final", K(ret), K(task_id_), K(index_id), K(need_rebuild_index));
|
LOG_INFO("index status is final", K(ret), K(task_id_), K(index_id), K(need_rebuild_index));
|
||||||
|
} else if (active_task_cnt >= MAX_ACTIVE_TASK_CNT) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
} else {
|
} else {
|
||||||
create_index_arg.index_type_ = index_schema->get_index_type();
|
create_index_arg.index_type_ = index_schema->get_index_type();
|
||||||
ObCreateDDLTaskParam param(tenant_id_,
|
ObCreateDDLTaskParam param(tenant_id_,
|
||||||
@ -475,7 +483,7 @@ int ObTableRedefinitionTask::copy_table_indexes()
|
|||||||
index_schema,
|
index_schema,
|
||||||
0/*object_id*/,
|
0/*object_id*/,
|
||||||
index_schema->get_schema_version(),
|
index_schema->get_schema_version(),
|
||||||
parallelism_ / index_ids.count()/*parallelism*/,
|
parallelism_,
|
||||||
consumer_group_id_,
|
consumer_group_id_,
|
||||||
&allocator_,
|
&allocator_,
|
||||||
&create_index_arg,
|
&create_index_arg,
|
||||||
@ -483,9 +491,11 @@ int ObTableRedefinitionTask::copy_table_indexes()
|
|||||||
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, *GCTX.sql_proxy_, task_record))) {
|
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, *GCTX.sql_proxy_, task_record))) {
|
||||||
if (OB_ENTRY_EXIST == ret) {
|
if (OB_ENTRY_EXIST == ret) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
active_task_cnt += 1;
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("submit ddl task failed", K(ret));
|
LOG_WARN("submit ddl task failed", K(ret));
|
||||||
}
|
}
|
||||||
|
} else if (FALSE_IT(active_task_cnt += 1)) {
|
||||||
} else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) {
|
} else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) {
|
||||||
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user