From 49fab102f5cea9153aef1b28c47067fa2eee7938 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 20 Sep 2024 07:02:25 +0000 Subject: [PATCH] [vector index] fix some error of post create hung --- deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h | 1 + src/observer/ob_srv_xlator_rootserver.cpp | 1 + src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 11 +- src/rootserver/ddl_task/ob_ddl_scheduler.h | 1 + src/rootserver/ddl_task/ob_ddl_task.cpp | 60 ++++++- src/rootserver/ddl_task/ob_ddl_task.h | 17 +- .../ddl_task/ob_drop_vec_index_task.cpp | 3 +- .../ddl_task/ob_vec_index_build_task.cpp | 68 ++++---- .../ddl_task/ob_vec_index_build_task.h | 24 ++- src/rootserver/ob_ddl_service.cpp | 16 +- src/rootserver/ob_index_builder.cpp | 155 ++++++++++++++++-- src/rootserver/ob_index_builder.h | 3 +- src/rootserver/ob_root_service.cpp | 26 +++ src/rootserver/ob_root_service.h | 1 + src/rootserver/ob_rs_rpc_processor.h | 1 + src/share/ob_common_rpc_proxy.h | 1 + src/share/ob_debug_sync_point.h | 1 + src/share/ob_rpc_struct.cpp | 6 +- src/share/ob_rpc_struct.h | 3 + src/sql/resolver/ddl/ob_ddl_resolver.cpp | 2 +- 20 files changed, 343 insertions(+), 58 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 37a6ebd3b..0bed244ef 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -327,6 +327,7 @@ PCODE_DEF(OB_DROP_DIRECTORY, 0x382) PCODE_DEF(OB_UPDATE_MAX_USED_TENANT_ID, 0x383) PCODE_DEF(OB_REFRESH_SCHEMA, 0x384) PCODE_DEF(OB_REBUILD_VEC_INDEX, 0x385) +PCODE_DEF(OB_DROP_INDEX_ON_FAILED, 0x386) // ob server //PCODE_DEF(OB_MIGRATE_OVER, 0x402) //PCODE_DEF(OB_CLEAR_REBUILD_ROOT_PARTITION, 0x403): not supported on 4.0 diff --git a/src/observer/ob_srv_xlator_rootserver.cpp b/src/observer/ob_srv_xlator_rootserver.cpp index 971a39a09..9507199a5 100644 --- a/src/observer/ob_srv_xlator_rootserver.cpp +++ b/src/observer/ob_srv_xlator_rootserver.cpp @@ -114,6 +114,7 @@ void oceanbase::observer::init_srv_xlator_for_rootserver(ObSrvRpcXlator *xlator) RPC_PROCESSOR(rootserver::ObRpcCreateAuxIndexP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcCreateIndexP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcDropIndexP, *gctx_.root_service_); + RPC_PROCESSOR(rootserver::ObRpcDropIndexOnFailedP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcCreateMLogP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcCreateTableLikeP, *gctx_.root_service_); RPC_PROCESSOR(rootserver::ObRpcExecuteBootstrapP, *gctx_.root_service_); diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index cf320a5b5..af1700322 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -1091,6 +1091,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m, param.consumer_group_id_, param.vec_vid_rowkey_schema_, param.vec_rowkey_vid_schema_, + param.vec_domain_index_schema_, param.vec_index_id_schema_, param.vec_snapshot_data_schema_, param.tenant_data_version_, @@ -1885,6 +1886,7 @@ int ObDDLScheduler::create_drop_vec_index_task( const int64_t consumer_group_id, const share::schema::ObTableSchema *vid_rowkey_schema, const share::schema::ObTableSchema *rowkey_vid_schema, + const share::schema::ObTableSchema *domain_index_schema, const share::schema::ObTableSchema *index_id_schema, const share::schema::ObTableSchema *snapshot_data_schema, const uint64_t tenant_data_version, @@ -1913,9 +1915,11 @@ int ObDDLScheduler::create_drop_vec_index_task( LOG_WARN("invalid argument", K(ret), KP(index_schema), K(schema_version)); } else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), index_schema->get_tenant_id(), task_id))) { LOG_WARN("fetch new task id failed", K(ret)); - } else if (OB_FAIL(index_schema->get_index_name(vec_domain_index_name))) { - LOG_WARN("fail to get vec index name", K(ret), KPC(index_schema)); } else { + if (OB_FAIL(ret) || OB_ISNULL(domain_index_schema)) { + } else if (OB_FAIL(domain_index_schema->get_index_name(vec_domain_index_name))) { + LOG_WARN("fail to get vid rowkey name", K(ret), KPC(domain_index_schema)); + } if (OB_FAIL(ret) || OB_ISNULL(vid_rowkey_schema)) { } else if (OB_FAIL(vid_rowkey_schema->get_index_name(vec_vid_rowkey_name))) { LOG_WARN("fail to get vid rowkey name", K(ret), KPC(vid_rowkey_schema)); @@ -1938,10 +1942,11 @@ int ObDDLScheduler::create_drop_vec_index_task( uint64_t vid_rowkey_table_id = OB_ISNULL(vid_rowkey_schema) ? OB_INVALID_ID : vid_rowkey_schema->get_table_id(); uint64_t rowkey_vid_table_id = OB_ISNULL(rowkey_vid_schema) ? OB_INVALID_ID : rowkey_vid_schema->get_table_id(); + uint64_t domain_index_table_id = OB_ISNULL(domain_index_schema) ? OB_INVALID_ID : domain_index_schema->get_table_id(); uint64_t index_id_table_id = OB_ISNULL(index_id_schema) ? OB_INVALID_ID : index_id_schema->get_table_id(); uint64_t snapshot_data_table_id = OB_ISNULL(snapshot_data_schema) ? OB_INVALID_ID : snapshot_data_schema->get_table_id(); - const ObVecIndexDDLChildTaskInfo domain_index(vec_domain_index_name, index_schema->get_table_id(), init_task_id); + const ObVecIndexDDLChildTaskInfo domain_index(vec_domain_index_name, domain_index_table_id, init_task_id); const ObVecIndexDDLChildTaskInfo vid_rowkey(vec_vid_rowkey_name, vid_rowkey_table_id, init_task_id); const ObVecIndexDDLChildTaskInfo rowkey_vid(vec_rowkey_vid_name, rowkey_vid_table_id, init_task_id); const ObVecIndexDDLChildTaskInfo index_id(vec_index_id_name, index_id_table_id, init_task_id); diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index 78e6b4e5d..a7f835142 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -508,6 +508,7 @@ private: const int64_t consumer_group_id, const share::schema::ObTableSchema *vid_rowkey_schema_, const share::schema::ObTableSchema *rowkey_vid_schema_, + const share::schema::ObTableSchema *domain_index_schema, const share::schema::ObTableSchema *delta_buffer_schema_, const share::schema::ObTableSchema *index_snapshot_data_schema_, const uint64_t tenant_data_version, diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 654a55a3e..da551cb7e 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -43,6 +43,7 @@ #include "observer/ob_server_struct.h" #include "share/ob_ddl_sim_point.h" #include "rootserver/ddl_task/ob_rebuild_index_task.h" +#include "rootserver/ddl_task/ob_vec_index_build_task.h" const bool OB_DDL_TASK_ENABLE_TRACING = false; @@ -214,7 +215,7 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam() consumer_group_id_(0), parent_task_id_(0), task_id_(0), type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr), aux_rowkey_doc_schema_(nullptr), aux_doc_rowkey_schema_(nullptr), aux_doc_word_schema_(nullptr), - vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr), + vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_domain_index_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr), tenant_data_version_(0), ddl_need_retry_at_executor_(false), is_pre_split_(false) { } @@ -236,7 +237,7 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id, parent_task_id_(parent_task_id), task_id_(task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema), ddl_arg_(ddl_arg), allocator_(allocator), aux_rowkey_doc_schema_(nullptr), aux_doc_rowkey_schema_(nullptr), aux_doc_word_schema_(nullptr), - vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr), + vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_domain_index_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr), tenant_data_version_(0), ddl_need_retry_at_executor_(ddl_need_retry_at_executor), is_pre_split_(false) { @@ -2987,6 +2988,61 @@ int ObDDLTaskRecordOperator::update_status_and_message( return ret; } + +int ObDDLTaskRecordOperator::update_parent_task_message( + const int64_t tenant_id, + const int64_t parent_task_id, + const ObTableSchema &index_schema, + const uint64_t target_id, + ObDDLUpateParentTaskIDType update_type, + ObIAllocator &allocator, + common::ObISQLClient &proxy) +{ + int ret = OB_SUCCESS; + ObDDLTaskRecord task_record; + if (OB_INVALID_ID == tenant_id || OB_INVALID_ID == parent_task_id || OB_INVALID_ID == target_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(parent_task_id), K(target_id)); + } else if (OB_FAIL(get_ddl_task_record(tenant_id, parent_task_id, GCTX.root_service_->get_sql_proxy(), allocator, task_record))) { + LOG_WARN("fail to get ddl task record", K(ret), K(parent_task_id)); + } else { + if (task_record.ddl_type_ == DDL_CREATE_VEC_INDEX) { + SMART_VAR(ObVecIndexBuildTask, task) { + if (OB_FAIL(task.init(task_record))) { + LOG_WARN("fail to init ObVecIndexBuildTask", K(ret), K(task_record)); + } else if (UPDATE_CREATE_INDEX_ID == update_type) { + if (index_schema.is_vec_rowkey_vid_type()) { + task.set_rowkey_vid_aux_table_id(target_id); + task.set_rowkey_vid_task_submitted(true); + } else if (index_schema.is_vec_vid_rowkey_type()) { + task.set_vid_rowkey_aux_table_id(target_id); + task.set_vid_rowkey_task_submitted(true); + } else if (index_schema.is_vec_delta_buffer_type()) { + task.set_delta_buffer_table_id(target_id); + task.set_delta_buffer_task_submitted(true); + } else if (index_schema.is_vec_index_id_type()) { + task.set_index_id_table_id(target_id); + task.set_index_id_task_submitted(true); + } else if (index_schema.is_vec_index_snapshot_data_type()) { + task.set_index_snapshot_data_table_id(target_id); + task.set_index_snapshot_data_task_submitted(true); + } + } else if (UPDATE_DROP_INDEX_TASK_ID == update_type) { + task.set_drop_index_task_id(target_id); + task.set_drop_index_task_submitted(true); + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(task.update_task_message(proxy))) { + LOG_WARN("fail to update task message", K(ret), K(parent_task_id)); + } + } else { + // TODO: other ddl type need to be update parent task message, now skip. + } + } + return ret; +} + int ObDDLTaskRecordOperator::update_ret_code_and_message( common::ObISQLClient &proxy, const uint64_t tenant_id, diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index acd0351cd..1f8057caa 100755 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -159,6 +159,11 @@ public: int64_t task_id_; }; +enum ObDDLUpateParentTaskIDType +{ + UPDATE_CREATE_INDEX_ID = 0, + UPDATE_DROP_INDEX_TASK_ID, +}; struct ObVecIndexDDLChildTaskInfo final { @@ -240,7 +245,7 @@ public: TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), K_(task_id), K_(type), KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg), K_(tenant_data_version), K_(sub_task_trace_id), KPC_(aux_rowkey_doc_schema), KPC_(aux_doc_rowkey_schema), KPC_(aux_doc_word_schema), - K_(vec_rowkey_vid_schema), K_(vec_vid_rowkey_schema), K_(vec_index_id_schema), K_(vec_snapshot_data_schema), + K_(vec_rowkey_vid_schema), K_(vec_vid_rowkey_schema), K_(vec_domain_index_schema), K_(vec_index_id_schema), K_(vec_snapshot_data_schema), K_(ddl_need_retry_at_executor), K_(is_pre_split)); public: int32_t sub_task_trace_id_; @@ -261,6 +266,7 @@ public: const ObTableSchema *aux_doc_word_schema_; const ObTableSchema *vec_rowkey_vid_schema_; const ObTableSchema *vec_vid_rowkey_schema_; + const ObTableSchema *vec_domain_index_schema_; const ObTableSchema *vec_index_id_schema_; const ObTableSchema *vec_snapshot_data_schema_; uint64_t tenant_data_version_; @@ -315,6 +321,15 @@ public: const int ret_code, ObString &message); + static int update_parent_task_message( + const int64_t tenant_id, + const int64_t parent_task_id, + const ObTableSchema &index_schema, + const uint64_t target_id, + ObDDLUpateParentTaskIDType update_type, + ObIAllocator &allocator, + common::ObISQLClient &proxy); + static int get_schedule_info_for_update( common::ObISQLClient &proxy, const uint64_t tenant_id, diff --git a/src/rootserver/ddl_task/ob_drop_vec_index_task.cpp b/src/rootserver/ddl_task/ob_drop_vec_index_task.cpp index 2a9587add..cab122fd4 100644 --- a/src/rootserver/ddl_task/ob_drop_vec_index_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_vec_index_task.cpp @@ -72,7 +72,6 @@ int ObDropVecIndexTask::init( if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || task_id <= 0 || OB_INVALID_ID == data_table_id - || !domain_index.is_valid() || schema_version <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(task_id), K(data_table_id), K(rowkey_vid), @@ -99,7 +98,7 @@ int ObDropVecIndexTask::init( set_gmt_create(ObTimeUtility::current_time()); tenant_id_ = tenant_id; object_id_ = data_table_id; - target_object_id_ = domain_index.table_id_; + target_object_id_ = data_table_id; // not use this id schema_version_ = schema_version; task_id_ = task_id; parent_task_id_ = 0; // no parent task diff --git a/src/rootserver/ddl_task/ob_vec_index_build_task.cpp b/src/rootserver/ddl_task/ob_vec_index_build_task.cpp index 63d2c66c8..930e70397 100644 --- a/src/rootserver/ddl_task/ob_vec_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_vec_index_build_task.cpp @@ -601,8 +601,6 @@ int ObVecIndexBuildTask::prepare_aux_table(const ObIndexType index_type, obrpc::ObRpcProxy::myaddr_, OB_VEC_INDEX_BUILD_CHILD_TASK_NUM))) { LOG_WARN("fail to prepare_aux_table", K(ret), K(index_type)); - } else if (OB_FAIL(update_task_message())) { - LOG_WARN("fail to update task message", K(ret), K(index_type)); } } // samart var return ret; @@ -1677,6 +1675,7 @@ int ObVecIndexBuildTask::ChangeTaskStatusFn::operator()(common::hash::HashMapPai if (OB_ENTRY_NOT_EXIST == ret) { // ongoing child task ret = OB_SUCCESS; + not_finished_cnt_++; ObMySQLTransaction trans; if (OB_FAIL(trans.start(&rt_service_->get_sql_proxy(), dest_tenant_id_))) { @@ -1714,7 +1713,8 @@ int ObVecIndexBuildTask::clean_on_failed() LOG_WARN("task status not match", K(ret), K(task_status_)); } else { // 1. cancel ongoing build index task - ChangeTaskStatusFn change_statu_fn(dependent_task_result_map_, dst_tenant_id_, root_service_); + int64_t not_finished_cnt = 0; + ChangeTaskStatusFn change_statu_fn(dependent_task_result_map_, dst_tenant_id_, root_service_, not_finished_cnt); if (OB_FAIL(dependent_task_result_map_.foreach_refactored(change_statu_fn))) { if (OB_ITER_END != ret) { LOG_WARN("foreach refactored failed", K(ret), K(dst_tenant_id_)); @@ -1724,6 +1724,8 @@ int ObVecIndexBuildTask::clean_on_failed() } // 2. drop already built index if (OB_FAIL(ret)) { + } else if (not_finished_cnt > 0) { + LOG_INFO("child task not finished, not submit drop vec index task.", K(not_finished_cnt)); } else if (!drop_index_task_submitted_) { if (OB_FAIL(submit_drop_vec_index_task())) { LOG_WARN("failed to drop vec index", K(ret)); @@ -1753,6 +1755,9 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task() const ObTableSchema *index_table_schema = nullptr; const ObDatabaseSchema *database_schema = nullptr; const ObTableSchema *data_table_schema = nullptr; + + obrpc::ObDropIndexArg drop_index_arg; + obrpc::ObDropIndexRes drop_index_res; ObString index_name; ObSqlString drop_index_sql; bool is_index_exist = true; @@ -1764,20 +1769,28 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task() LOG_WARN("should not be null", K(ret)); } else if (OB_FAIL(schema_service.get_tenant_schema_guard(tenant_id_, schema_guard))) { LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id_)); - } else if (OB_FAIL(schema_guard.check_table_exist(tenant_id_, index_table_id, is_index_exist))) { - LOG_WARN("check table exist failed", K(ret), K_(tenant_id), K(index_table_id)); - } else if (!is_index_exist) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("vec index aux schema is nullptr, fail to roll back", K(ret), K(index_table_id), K(delta_buffer_table_id_), K(index_table_id_)); - } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, index_table_id, index_table_schema))) { - LOG_WARN("get index schema failed", K(ret), K(tenant_id_), K(index_table_id)); + } else if (OB_INVALID_ID != rowkey_vid_aux_table_id_ && + OB_FAIL(drop_index_arg.index_ids_.push_back(rowkey_vid_aux_table_id_))) { + LOG_WARN("fail to push back rowkey_vid_aux_table_id_", K(ret)); + } else if (OB_INVALID_ID != vid_rowkey_aux_table_id_ && + OB_FAIL(drop_index_arg.index_ids_.push_back(vid_rowkey_aux_table_id_))) { + LOG_WARN("fail to push back vid_rowkey_aux_table_id_", K(ret)); + } else if (OB_INVALID_ID != delta_buffer_table_id_ && + OB_FAIL(drop_index_arg.index_ids_.push_back(delta_buffer_table_id_))) { + LOG_WARN("fail to push back delta_buffer_table_id_", K(ret), K(delta_buffer_table_id_)); + } else if (OB_INVALID_ID != index_id_table_id_ && + OB_FAIL(drop_index_arg.index_ids_.push_back(index_id_table_id_))) { + LOG_WARN("fail to push back index_id_table_id_", K(ret), K(index_id_table_id_)); + } else if (OB_INVALID_ID != index_snapshot_data_table_id_ && + OB_FAIL(drop_index_arg.index_ids_.push_back(index_snapshot_data_table_id_))) { + LOG_WARN("fail to push back index_snapshot_data_table_id_", K(ret)); + } else if (drop_index_arg.index_ids_.count() <= 0) { + LOG_INFO("no table need to be drop, skip", K(ret)); // no table exist, skip drop + } else if (schema_guard.get_table_schema(tenant_id_, drop_index_arg.index_ids_.at(0), index_table_schema)) { + LOG_WARN("fail to get table schema", K(ret), K(drop_index_arg.index_ids_.at(0))); } else if (OB_ISNULL(index_table_schema)) { - ret = OB_SCHEMA_ERROR; - LOG_WARN("index schema is null", K(ret), K(index_table_id)); - } else if (index_table_schema->is_in_recyclebin()) { - // the index has been dropped, just finish this task - } else if (OB_FAIL(index_table_schema->get_index_name(index_name))) { - LOG_WARN("get index name failed", KR(ret), K(index_table_schema->get_table_type()), KPC(index_table_schema)); + ret = OB_ERR_UNEXPECTED; + LOG_WARN("index schema is null", K(ret), KP(index_table_schema)); } else if (OB_FAIL(schema_guard.get_database_schema(tenant_id_, index_table_schema->get_database_id(), database_schema))) { LOG_WARN("get database schema failed", KR(ret), K(index_table_schema->get_database_id())); } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, index_table_schema->get_data_table_id(), data_table_schema))) { @@ -1787,32 +1800,27 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task() LOG_WARN("get null schema", KR(ret), KP(database_schema), KP(data_table_schema)); } else { int64_t ddl_rpc_timeout = 0; - obrpc::ObDropIndexArg drop_index_arg; - obrpc::ObDropIndexRes drop_index_res; drop_index_arg.is_inner_ = true; drop_index_arg.tenant_id_ = tenant_id_; drop_index_arg.exec_tenant_id_ = tenant_id_; drop_index_arg.index_table_id_ = index_table_id; - drop_index_arg.session_id_ = data_table_schema->get_session_id(); - drop_index_arg.index_name_ = index_name; - drop_index_arg.table_name_ = data_table_schema->get_table_name(); - drop_index_arg.database_name_ = database_schema->get_database_name_str(); + drop_index_arg.index_name_ = index_table_schema->get_table_name(); // not in used drop_index_arg.index_action_type_ = obrpc::ObIndexArg::DROP_INDEX; drop_index_arg.is_add_to_scheduler_ = true; - drop_index_arg.task_id_ = task_id_; - drop_index_arg.is_vec_inner_drop_ = has_aux_table ? true : false; // if want to drop only one index, is_vec_inner_drop_ should be false, else should be true. + drop_index_arg.task_id_ = task_id_; // parent task + drop_index_arg.session_id_ = data_table_schema->get_session_id(); + drop_index_arg.table_name_ = data_table_schema->get_table_name(); + drop_index_arg.database_name_ = database_schema->get_database_name_str(); + drop_index_arg.is_vec_inner_drop_ = true; // if want to drop only one index, is_vec_inner_drop_ should be false, else should be true. if (OB_FAIL(ObDDLUtil::get_ddl_rpc_timeout(index_table_schema->get_all_part_num() + data_table_schema->get_all_part_num(), ddl_rpc_timeout))) { LOG_WARN("failed to get ddl rpc timeout", KR(ret)); } else if (OB_FAIL(DDL_SIM(tenant_id_, task_id_, DROP_INDEX_RPC_FAILED))) { LOG_WARN("ddl sim failure", KR(ret), K(tenant_id_), K(task_id_)); - } else if (OB_FAIL(root_service_->get_common_rpc_proxy().timeout(ddl_rpc_timeout).drop_index(drop_index_arg, drop_index_res))) { + } else if (OB_FAIL(root_service_->get_common_rpc_proxy().timeout(ddl_rpc_timeout).drop_index_on_failed(drop_index_arg, drop_index_res))) { LOG_WARN("drop index failed", KR(ret), K(ddl_rpc_timeout)); } else { drop_index_task_submitted_ = true; drop_index_task_id_ = drop_index_res.task_id_; - if (OB_FAIL(update_task_message())) { - LOG_WARN("fail to update task message", K(ret)); - } LOG_INFO("success submit drop vec index task", K(ret), K(drop_index_task_id_)); } } @@ -1982,7 +1990,7 @@ int ObVecIndexBuildTask::cleanup_impl() return ret; } -int ObVecIndexBuildTask::update_task_message() +int ObVecIndexBuildTask::update_task_message(common::ObISQLClient &proxy) { int ret = OB_SUCCESS; char *buf = nullptr; @@ -1998,7 +2006,7 @@ int ObVecIndexBuildTask::update_task_message() LOG_WARN("failed to serialize params to message", KR(ret)); } else { msg.assign(buf, serialize_param_size); - if (OB_FAIL(ObDDLTaskRecordOperator::update_message(root_service_->get_sql_proxy(), tenant_id_, task_id_, msg))) { + if (OB_FAIL(ObDDLTaskRecordOperator::update_message(proxy, tenant_id_, task_id_, msg))) { LOG_WARN("failed to update message", KR(ret)); } } diff --git a/src/rootserver/ddl_task/ob_vec_index_build_task.h b/src/rootserver/ddl_task/ob_vec_index_build_task.h index e6a683957..c120c39c5 100644 --- a/src/rootserver/ddl_task/ob_vec_index_build_task.h +++ b/src/rootserver/ddl_task/ob_vec_index_build_task.h @@ -69,6 +69,22 @@ public: K(drop_index_task_submitted_), K(schema_version_), K(execution_id_), K(consumer_group_id_), K(trace_id_), K(parallelism_), K(create_index_arg_)); +public: + void set_rowkey_vid_aux_table_id(const uint64_t id) { rowkey_vid_aux_table_id_ = id; } + void set_vid_rowkey_aux_table_id(const uint64_t id) { vid_rowkey_aux_table_id_ = id; } + void set_delta_buffer_table_id(const uint64_t id) { delta_buffer_table_id_ = id; } + void set_index_id_table_id(const uint64_t id) { index_id_table_id_ = id; } + void set_index_snapshot_data_table_id(const uint64_t id) { index_snapshot_data_table_id_ = id; } + void set_drop_index_task_id(const uint64_t id) { drop_index_task_id_ = id; } + void set_rowkey_vid_task_submitted(const bool status) { rowkey_vid_task_submitted_ = status; } + void set_vid_rowkey_task_submitted(const bool status) { vid_rowkey_task_submitted_ = status; } + void set_delta_buffer_task_submitted(const bool status) { delta_buffer_task_submitted_ = status; } + void set_index_id_task_submitted(const bool status) { index_id_task_submitted_ = status; } + void set_index_snapshot_data_task_submitted(const bool status) { index_snapshot_data_task_submitted_ = status; } + void set_drop_index_task_submitted(const bool status) { drop_index_task_submitted_ = status; } + + int update_task_message(common::ObISQLClient &proxy); + private: int get_next_status(share::ObDDLTaskStatus &next_status); int prepare_aux_table(const ObIndexType index_type, @@ -85,6 +101,7 @@ private: int construct_delta_buffer_arg(obrpc::ObCreateIndexArg &arg); int construct_index_id_arg(obrpc::ObCreateIndexArg &arg); int construct_index_snapshot_data_arg(obrpc::ObCreateIndexArg &arg); + int record_index_table_id( const obrpc::ObCreateIndexArg *create_index_arg_, uint64_t &aux_table_id); @@ -108,16 +125,16 @@ private: const obrpc::ObCreateIndexArg &source_arg, obrpc::ObCreateIndexArg &dest_arg); int print_child_task_ids(char *buf, int64_t len); - int update_task_message(); private: struct ChangeTaskStatusFn final { public: - ChangeTaskStatusFn(common::hash::ObHashMap &dependent_task_result_map, const uint64_t tenant_id, ObRootService *root_service) : + ChangeTaskStatusFn(common::hash::ObHashMap &dependent_task_result_map, const uint64_t tenant_id, ObRootService *root_service, int64_t ¬_finished_cnt) : dependent_task_result_map_(dependent_task_result_map), rt_service_(root_service), - dest_tenant_id_(tenant_id) + dest_tenant_id_(tenant_id), + not_finished_cnt_(not_finished_cnt) {} public: ~ChangeTaskStatusFn() = default; @@ -126,6 +143,7 @@ private: common::hash::ObHashMap &dependent_task_result_map_; ObRootService *rt_service_; uint64_t dest_tenant_id_; + int64_t ¬_finished_cnt_; }; struct CheckTaskStatusFn final { diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index 7f1b8558c..fe53d61e3 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -6471,6 +6471,7 @@ int ObDDLService::create_aux_index_task_( if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler(). create_ddl_task(param, trans, task_record))) { if (OB_ENTRY_EXIST == ret) { + trans.reset_last_error(); ret = OB_SUCCESS; } else { LOG_WARN("submit create index ddl task failed", K(ret)); @@ -6578,6 +6579,11 @@ int ObDDLService::create_aux_index( } else if (FALSE_IT(result.ddl_task_id_ = task_record.task_id_)) { } } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id, + arg.task_id_, *idx_schema, result.aux_table_id_, ObDDLUpateParentTaskIDType::UPDATE_CREATE_INDEX_ID, allocator, trans))) { + LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(idx_schema)); + } } else { // 3. index scheme not exist, generate schema && create ddl task ObTableSchema index_schema; if (OB_FAIL(generate_aux_index_schema_(tenant_id, @@ -6592,6 +6598,9 @@ int ObDDLService::create_aux_index( LOG_WARN("failed to generate aux index schema", K(ret), K(create_index_arg)); } else if (FALSE_IT(result.schema_generated_ = true)) { } else if (FALSE_IT(result.aux_table_id_ = index_schema.get_table_id())) { + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id, + arg.task_id_, index_schema, result.aux_table_id_, ObDDLUpateParentTaskIDType::UPDATE_CREATE_INDEX_ID, allocator, trans))) { + LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(index_schema)); } else if (OB_FAIL(create_aux_index_task_(data_schema, &index_schema, create_index_arg, @@ -6615,9 +6624,12 @@ int ObDDLService::create_aux_index( if (OB_FAIL(publish_schema(tenant_id))) { LOG_WARN("fail to publish schema", K(ret), K(tenant_id)); } else if (OB_INVALID_ID == result.ddl_task_id_) { // no need to schedule - } else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler(). + } else { + DEBUG_SYNC(CREATE_AUX_INDEX_TABLE); + if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler(). schedule_ddl_task(task_record))) { - LOG_WARN("fail to schedule ddl task", K(ret), K(task_record)); + LOG_WARN("fail to schedule ddl task", K(ret), K(task_record)); + } } } } diff --git a/src/rootserver/ob_index_builder.cpp b/src/rootserver/ob_index_builder.cpp index 7d4a211d9..b417f0b9c 100644 --- a/src/rootserver/ob_index_builder.cpp +++ b/src/rootserver/ob_index_builder.cpp @@ -100,6 +100,129 @@ int ObIndexBuilder::create_index( return ret; } +int ObIndexBuilder::drop_index_on_failed(const ObDropIndexArg &arg, obrpc::ObDropIndexRes &res) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = arg.tenant_id_; + const bool is_index = false; + ObArenaAllocator allocator(ObModIds::OB_SCHEMA); + const ObTableSchema *data_table_schema = NULL; + ObSchemaGetterGuard schema_guard; + bool is_db_in_recyclebin = false; + uint64_t compat_version = 0; + if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) { + LOG_WARN("failed to get data version", KR(ret), K(tenant_id)); + } else if (compat_version < DATA_VERSION_4_3_3_0) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("drop index on failed before version 4.3.3.0 is not supported", KR(ret), K(compat_version)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "drop index on failed before version 4.3.3.0 is"); + } else if (OB_FALSE_IT(schema_guard.set_session_id(arg.session_id_))) { + } else if (!ddl_service_.is_inited()) { + ret = OB_INNER_STAT_ERROR; + LOG_WARN("ddl_service not init", K(ret), K(ddl_service_.is_inited())); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(arg), K(ret)); + } else if (OB_FAIL(ddl_service_.get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) { + LOG_WARN("get_schema_guard failed", K(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, arg.database_name_, + arg.table_name_, is_index, data_table_schema, arg.is_hidden_))) { + LOG_WARN("failed to get data table schema", K(arg), K(ret)); + } else if (NULL == data_table_schema) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("fail to drop index on failed, data table not exist", K(ret), K(arg)); + } else if (arg.is_in_recyclebin_) { + // internal delete index + } else if (data_table_schema->is_in_recyclebin()) { + ret = OB_ERR_OPERATION_ON_RECYCLE_OBJECT; + LOG_WARN("can not drop index of table in recyclebin.", K(ret), K(arg)); + } else if (OB_FAIL(schema_guard.check_database_in_recyclebin(tenant_id, + data_table_schema->get_database_id(), is_db_in_recyclebin))) { + LOG_WARN("check database in recyclebin failed", K(ret), K(tenant_id)); + } else if (is_db_in_recyclebin) { + ret = OB_ERR_OPERATION_ON_RECYCLE_OBJECT; + LOG_WARN("Can not drop index of db in recyclebin", K(ret), K(arg)); + } else if (!arg.is_add_to_scheduler_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("not add to scheduler to drop, not expected", K(ret), K(arg)); + } else if (arg.index_ids_.count() <= 0) { + res.task_id_ = -1; // no need to drop + LOG_INFO("target indexes to be drop is empty", K(ret)); + } else { + ObDDLOperator ddl_operator(ddl_service_.get_schema_service(), ddl_service_.get_sql_proxy()); + ObDDLSQLTransaction trans(&ddl_service_.get_schema_service()); + int64_t refreshed_schema_version = 0; + ObArenaAllocator allocator(lib::ObLabel("DdlTaskTmp")); + ObDDLTaskRecord task_record; + + bool has_index_task = false; + typedef common::ObSEArray TableSchemaArray; + SMART_VAR(TableSchemaArray, new_index_schemas) { + for (int64_t i = 0; OB_SUCC(ret) && i < arg.index_ids_.count(); ++i) { + bool is_index_exist = false; + const int64_t index_id = arg.index_ids_.at(i); + const share::schema::ObTableSchema *index_table_schema= nullptr; + if (OB_FAIL(schema_guard.check_table_exist(tenant_id, index_id, is_index_exist))) { + LOG_WARN("check table exist failed", K(ret), K(tenant_id), K(index_id)); + } else if (!is_index_exist) { // skip + LOG_INFO("vec index schema is nullptr", K(ret), K(index_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, index_id, index_table_schema))) { + LOG_WARN("fail to get index table schema", K(ret), K(tenant_id), K(index_id)); + } else if (OB_ISNULL(index_table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index table nullptr", K(ret), K(index_id)); + } else if (OB_FAIL(new_index_schemas.push_back(*index_table_schema))) { + LOG_WARN("fail to push vec rowkey vid table schema", K(ret), KPC(index_table_schema)); + } + } + if (OB_FAIL(ret)) { + } else if (new_index_schemas.count() <= 0) { + res.task_id_ = -1; // no need to drop + LOG_INFO("target indexes to be drop is empty", K(ret)); + } else if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) { + LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id)); + } else if (OB_FAIL(trans.start(&ddl_service_.get_sql_proxy(), tenant_id, refreshed_schema_version))) { + LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version)); + } else { + bool has_exist = false; + const ObTableSchema &new_index_schema = new_index_schemas.at(new_index_schemas.count() - 1); + if (OB_FAIL(submit_drop_index_task(trans, *data_table_schema, new_index_schemas, arg, allocator, has_exist, task_record))) { + LOG_WARN("submit drop index task failed", K(ret), K(task_record)); + } else if (has_exist) { + res.task_id_ = task_record.task_id_; + } else { + res.tenant_id_ = new_index_schema.get_tenant_id(); + res.schema_version_ = new_index_schema.get_schema_version(); + res.task_id_ = task_record.task_id_; + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id, + arg.task_id_, new_index_schemas.at(0), res.task_id_, + ObDDLUpateParentTaskIDType::UPDATE_DROP_INDEX_TASK_ID, allocator, trans))) { + LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(res.task_id_)); + } + } + } + if (trans.is_started()) { + int temp_ret = OB_SUCCESS; + if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) { + LOG_WARN_RET(temp_ret, "trans end failed", "is_commit", OB_SUCCESS == ret, K(temp_ret)); + ret = (OB_SUCC(ret)) ? temp_ret : ret; + } + } + if (OB_SUCC(ret)) { + int tmp_ret = OB_SUCCESS; + if (OB_FAIL(ddl_service_.publish_schema(tenant_id))) { + LOG_WARN("fail to publish schema", K(ret), K(tenant_id)); + } else if (OB_TMP_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) { + LOG_WARN("fail to schedule ddl task", K(tmp_ret), K(task_record)); + } + } + } + LOG_INFO("finish drop index on failed", K(ret), K(arg)); + return ret; +} + int ObIndexBuilder::drop_index(const ObDropIndexArg &arg, obrpc::ObDropIndexRes &res) { int ret = OB_SUCCESS; @@ -508,29 +631,31 @@ int ObIndexBuilder::submit_build_index_task( return ret; } +// if index_schemas has delta_buffer_table, than index_ith = domain_index_ith, ohterwide, index_ith = 0; int ObIndexBuilder::recognize_vec_index_schemas( const common::ObIArray &index_schemas, const bool is_vec_inner_drop, int64_t &index_ith, int64_t &rowkey_vid_ith, int64_t &vid_rowkey_ith, + int64_t &domain_index_ith, int64_t &index_id_ith, int64_t &snapshot_data_ith) { int ret = OB_SUCCESS; - index_ith = -1; + index_ith = 0; rowkey_vid_ith = -1; vid_rowkey_ith = -1; + domain_index_ith = -1; index_id_ith = -1; snapshot_data_ith = -1; const int64_t VEC_DOMAIN_INDEX_TABLE_COUNT = 1; // delta_buffer_table const int64_t VEC_INDEX_TABLE_COUNT = 5; - if (OB_UNLIKELY(VEC_DOMAIN_INDEX_TABLE_COUNT != index_schemas.count() && - !is_vec_inner_drop && (VEC_INDEX_TABLE_COUNT != index_schemas.count()))) { + if (OB_UNLIKELY(!is_vec_inner_drop && + VEC_DOMAIN_INDEX_TABLE_COUNT != index_schemas.count() && + VEC_INDEX_TABLE_COUNT != index_schemas.count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(index_schemas)); - } else if (index_schemas.count() == 1) { - index_ith = 0; } else { for (int64_t i = 0; OB_SUCC(ret) && i < index_schemas.count(); ++i) { if (index_schemas.at(i).is_vec_rowkey_vid_type()) { @@ -547,6 +672,14 @@ int ObIndexBuilder::recognize_vec_index_schemas( } else { vid_rowkey_ith = i; } + } else if (index_schemas.at(i).is_vec_delta_buffer_type()) { + if (OB_UNLIKELY(-1 != domain_index_ith)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpeted error, there are multiple vid rowkey tables", K(ret), K(index_schemas)); + } else { + domain_index_ith = i; + index_ith = domain_index_ith; // if has domain index, index_ith = domain_index_ith + } } else if (index_schemas.at(i).is_vec_index_id_type()) { if (OB_UNLIKELY(-1 != index_id_ith)) { ret = OB_ERR_UNEXPECTED; @@ -561,11 +694,9 @@ int ObIndexBuilder::recognize_vec_index_schemas( } else { snapshot_data_ith = i; } - } else if (OB_UNLIKELY(-1 != index_ith)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpeted error, there are multiple user index tables", K(ret), K(index_schemas)); } else { - index_ith = i; + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected drop vec index schema", K(ret), K(index_schemas.at(i))); } } } @@ -692,6 +823,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans, int64_t aux_multivalue_ith = -1; int64_t vec_rowkey_vid_ith = -1; int64_t vec_vid_rowkey_ith = -1; + int64_t vec_domain_index_ith = -1; int64_t vec_index_id_ith = -1; int64_t vec_snapshot_data_ith = -1; @@ -710,7 +842,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans, aux_rowkey_doc_ith, aux_doc_rowkey_ith))) { LOG_WARN("fail to recognize index and aux table from schema array", K(ret)); } else if (index_schemas.at(0).is_vec_index() && OB_FAIL(recognize_vec_index_schemas(index_schemas, arg.is_vec_inner_drop_, index_ith, vec_rowkey_vid_ith, - vec_vid_rowkey_ith, vec_index_id_ith, vec_snapshot_data_ith))) { + vec_vid_rowkey_ith, vec_domain_index_ith, vec_index_id_ith, vec_snapshot_data_ith))) { LOG_WARN("fail to recognize index and aux table from schema array", K(ret)); } else if (OB_ISNULL(GCTX.root_service_)) { ret = OB_ERR_UNEXPECTED; @@ -720,7 +852,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans, LOG_WARN("unexpected error, invalid array index", K(ret), K(index_ith)); } else { const ObTableSchema &index_schema = index_schemas.at(index_ith); - const bool is_drop_vec_task = (!arg.is_inner_ || arg.is_vec_inner_drop_) && index_schema.is_vec_delta_buffer_type(); // delta_buffer_table + const bool is_drop_vec_task = (!arg.is_inner_ && index_schema.is_vec_delta_buffer_type()) || arg.is_vec_inner_drop_; // inner drop or user drop const bool is_drop_fts_task = !arg.is_inner_ && index_schema.is_fts_index_aux(); const bool is_drop_multivalue_task = !arg.is_inner_ && index_schema.is_multivalue_index_aux(); const bool is_drop_fts_or_multivalue_task = is_drop_fts_task || is_drop_multivalue_task; @@ -813,6 +945,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans, &arg); param.vec_vid_rowkey_schema_ = vec_vid_rowkey_ith == -1 ? nullptr : &(index_schemas.at(vec_vid_rowkey_ith)); param.vec_rowkey_vid_schema_ = vec_rowkey_vid_ith == -1 ? nullptr : &(index_schemas.at(vec_rowkey_vid_ith)); + param.vec_domain_index_schema_ = vec_domain_index_ith == -1 ? nullptr : &(index_schemas.at(vec_domain_index_ith)); param.vec_index_id_schema_ = vec_index_id_ith == -1 ? nullptr : &(index_schemas.at(vec_index_id_ith)); param.vec_snapshot_data_schema_ = vec_snapshot_data_ith == -1 ? nullptr : &(index_schemas.at(vec_snapshot_data_ith)); if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, trans, task_record))) { diff --git a/src/rootserver/ob_index_builder.h b/src/rootserver/ob_index_builder.h index 7ada85c49..89d2d05f7 100644 --- a/src/rootserver/ob_index_builder.h +++ b/src/rootserver/ob_index_builder.h @@ -58,7 +58,6 @@ public: int create_index(const obrpc::ObCreateIndexArg &arg, obrpc::ObAlterTableRes &res); int drop_index(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res); - // Check and update local index status. // if not all index table updated return OB_EAGAIN. int do_create_index( @@ -109,6 +108,7 @@ public: const uint64_t tenant_data_version, common::ObIAllocator &allocator, ObDDLTaskRecord &task_record); + int drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res); private: int recognize_vec_index_schemas( const common::ObIArray &index_schemas, @@ -116,6 +116,7 @@ private: int64_t &index_ith, int64_t &rowkey_vid_ith, int64_t &vid_rowkey_ith, + int64_t &domain_index_ith, int64_t &index_id_ith, int64_t &snapshot_data_ith); int recognize_fts_index_schemas( diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index ab262e9f9..590b08cb4 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -4987,6 +4987,32 @@ int ObRootService::alter_tablegroup(const obrpc::ObAlterTablegroupArg &arg) return ret; } +int ObRootService::drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res) +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (!arg.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(arg)); + } else { + ObIndexBuilder index_builder(ddl_service_); + if (OB_FAIL(index_builder.drop_index_on_failed(arg, res))) { + LOG_WARN("index_builder drop_index_on_failed failed", K(ret), K(arg)); + } + } + ROOTSERVICE_EVENT_ADD("ddl scheduler", "drop index on failed", + "tenant_id", res.tenant_id_, + "ret", ret, + "trace_id", *ObCurTraceId::get_trace_id(), + "task_id", res.task_id_, + "table_id", arg.index_table_id_, + "schema_version", res.schema_version_); + LOG_INFO("finish drop index on fail ddl", K(ret), K(arg), "ddl_event_info", ObDDLEventInfo()); + return ret; +} + int ObRootService::drop_index(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index c260f3256..a20883438 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -562,6 +562,7 @@ public: int create_restore_point(const obrpc::ObCreateRestorePointArg &arg); int drop_restore_point(const obrpc::ObDropRestorePointArg &arg); + int drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res); //for inner table monitor, purge in fixed time int purge_expire_recycle_objects(const obrpc::ObPurgeRecycleBinArg &arg, obrpc::Int64 &affected_rows); diff --git a/src/rootserver/ob_rs_rpc_processor.h b/src/rootserver/ob_rs_rpc_processor.h index b04dd70dd..9d12978cf 100644 --- a/src/rootserver/ob_rs_rpc_processor.h +++ b/src/rootserver/ob_rs_rpc_processor.h @@ -344,6 +344,7 @@ DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_TRUNCATE_TABLE_V2, ObRpcTruncateTableV2P, DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_AUX_INDEX, ObRpcCreateAuxIndexP, create_aux_index(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_INDEX, ObRpcCreateIndexP, create_index(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_INDEX, ObRpcDropIndexP, drop_index(arg_, result_)); +DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_INDEX_ON_FAILED, ObRpcDropIndexOnFailedP, drop_index_on_failed(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_REBUILD_VEC_INDEX, ObRpcRebuildVecIndexP, rebuild_vec_index(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_MLOG, ObRpcCreateMLogP, create_mlog(arg_, result_)); DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TABLE_LIKE, ObRpcCreateTableLikeP, create_table_like(arg_)); diff --git a/src/share/ob_common_rpc_proxy.h b/src/share/ob_common_rpc_proxy.h index 2edaf8e5d..7e864f9c4 100644 --- a/src/share/ob_common_rpc_proxy.h +++ b/src/share/ob_common_rpc_proxy.h @@ -79,6 +79,7 @@ public: RPC_S(PRD create_aux_index, obrpc::OB_CREATE_AUX_INDEX, (obrpc::ObCreateAuxIndexArg), obrpc::ObCreateAuxIndexRes); RPC_S(PRD create_index, obrpc::OB_CREATE_INDEX, (ObCreateIndexArg), ObAlterTableRes); RPC_S(PRD drop_index, obrpc::OB_DROP_INDEX, (ObDropIndexArg), ObDropIndexRes); + RPC_S(PRD drop_index_on_failed, obrpc::OB_DROP_INDEX_ON_FAILED, (ObDropIndexArg), ObDropIndexRes); RPC_S(PRD rebuild_vec_index, obrpc::OB_REBUILD_VEC_INDEX, (ObRebuildIndexArg), ObAlterTableRes); RPC_S(PRD create_mlog, obrpc::OB_CREATE_MLOG, (ObCreateMLogArg), ObCreateMLogRes); RPC_S(PRD flashback_index, obrpc::OB_FLASHBACK_INDEX, (ObFlashBackIndexArg)); diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index e24309f2b..7c2259907 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -635,6 +635,7 @@ class ObString; ACT(BUILD_VECTOR_INDEX_PREPARE_ROWKEY_VID,)\ ACT(BUILD_VECTOR_INDEX_PREPARE_AUX_INDEX,)\ ACT(BUILD_VECTOR_INDEX_PREPARE_VID_ROWKEY,)\ + ACT(CREATE_AUX_INDEX_TABLE,)\ ACT(BEFORE_SEND_ALTER_TABLE,)\ ACT(BEFOR_EXEC_REBUILD_TASK,)\ ACT(BEFORE_CREATE_HIDDEN_TABLE_IN_LOAD,)\ diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 562d660c5..6ebb3b17d 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -3486,7 +3486,8 @@ DEF_TO_STRING(ObDropIndexArg) { K_(is_hidden), K_(is_inner), K_(is_vec_inner_drop), - K_(only_set_status)); + K_(only_set_status), + K_(index_ids)); J_OBJ_END(); return pos; } @@ -3502,7 +3503,8 @@ OB_SERIALIZE_MEMBER((ObDropIndexArg, ObIndexArg), is_hidden_, is_inner_, is_vec_inner_drop_, - only_set_status_); + only_set_status_, + index_ids_); OB_SERIALIZE_MEMBER(ObDropIndexRes, tenant_id_, index_table_id_, schema_version_, task_id_); diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 43829e7cb..6ae004d9c 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -1400,6 +1400,7 @@ public: is_inner_ = false; is_vec_inner_drop_ = false; only_set_status_ = false; + index_ids_.reset(); } virtual ~ObDropIndexArg() {} void reset() @@ -1412,6 +1413,7 @@ public: is_inner_ = false; is_vec_inner_drop_ = false; only_set_status_ = false; + index_ids_.reset(); } bool is_valid() const { return ObIndexArg::is_valid(); } uint64_t index_table_id_; @@ -1421,6 +1423,7 @@ public: bool is_inner_; bool is_vec_inner_drop_; bool only_set_status_; + common::ObSEArray index_ids_; DECLARE_VIRTUAL_TO_STRING; }; diff --git a/src/sql/resolver/ddl/ob_ddl_resolver.cpp b/src/sql/resolver/ddl/ob_ddl_resolver.cpp index 438cc6e0f..5759a7aae 100644 --- a/src/sql/resolver/ddl/ob_ddl_resolver.cpp +++ b/src/sql/resolver/ddl/ob_ddl_resolver.cpp @@ -7767,7 +7767,7 @@ int ObDDLResolver::resolve_vec_index_constraint( } else if (!is_vector_memory_valid) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support vector index when ob_vector_memory_limit_percentage is 0", K(ret)); - LOG_USER_ERROR(OB_NOT_SUPPORTED, "when ob_vector_memory_limit_percentage = 0 or memsotre_limit >= 85, vector index is"); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "when ob_vector_memory_limit_percentage = 0 or memstore_limit >= 85, vector index is"); } else { index_keyname_ = VEC_KEY; ParseNode *option_node = NULL;