[vector index] fix some error of post create hung
This commit is contained in:
parent
0eefcec064
commit
49fab102f5
@ -327,6 +327,7 @@ PCODE_DEF(OB_DROP_DIRECTORY, 0x382)
|
||||
PCODE_DEF(OB_UPDATE_MAX_USED_TENANT_ID, 0x383)
|
||||
PCODE_DEF(OB_REFRESH_SCHEMA, 0x384)
|
||||
PCODE_DEF(OB_REBUILD_VEC_INDEX, 0x385)
|
||||
PCODE_DEF(OB_DROP_INDEX_ON_FAILED, 0x386)
|
||||
// ob server
|
||||
//PCODE_DEF(OB_MIGRATE_OVER, 0x402)
|
||||
//PCODE_DEF(OB_CLEAR_REBUILD_ROOT_PARTITION, 0x403): not supported on 4.0
|
||||
|
@ -114,6 +114,7 @@ void oceanbase::observer::init_srv_xlator_for_rootserver(ObSrvRpcXlator *xlator)
|
||||
RPC_PROCESSOR(rootserver::ObRpcCreateAuxIndexP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcCreateIndexP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcDropIndexP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcDropIndexOnFailedP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcCreateMLogP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcCreateTableLikeP, *gctx_.root_service_);
|
||||
RPC_PROCESSOR(rootserver::ObRpcExecuteBootstrapP, *gctx_.root_service_);
|
||||
|
@ -1091,6 +1091,7 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m,
|
||||
param.consumer_group_id_,
|
||||
param.vec_vid_rowkey_schema_,
|
||||
param.vec_rowkey_vid_schema_,
|
||||
param.vec_domain_index_schema_,
|
||||
param.vec_index_id_schema_,
|
||||
param.vec_snapshot_data_schema_,
|
||||
param.tenant_data_version_,
|
||||
@ -1885,6 +1886,7 @@ int ObDDLScheduler::create_drop_vec_index_task(
|
||||
const int64_t consumer_group_id,
|
||||
const share::schema::ObTableSchema *vid_rowkey_schema,
|
||||
const share::schema::ObTableSchema *rowkey_vid_schema,
|
||||
const share::schema::ObTableSchema *domain_index_schema,
|
||||
const share::schema::ObTableSchema *index_id_schema,
|
||||
const share::schema::ObTableSchema *snapshot_data_schema,
|
||||
const uint64_t tenant_data_version,
|
||||
@ -1913,9 +1915,11 @@ int ObDDLScheduler::create_drop_vec_index_task(
|
||||
LOG_WARN("invalid argument", K(ret), KP(index_schema), K(schema_version));
|
||||
} else if (OB_FAIL(ObDDLTask::fetch_new_task_id(root_service_->get_sql_proxy(), index_schema->get_tenant_id(), task_id))) {
|
||||
LOG_WARN("fetch new task id failed", K(ret));
|
||||
} else if (OB_FAIL(index_schema->get_index_name(vec_domain_index_name))) {
|
||||
LOG_WARN("fail to get vec index name", K(ret), KPC(index_schema));
|
||||
} else {
|
||||
if (OB_FAIL(ret) || OB_ISNULL(domain_index_schema)) {
|
||||
} else if (OB_FAIL(domain_index_schema->get_index_name(vec_domain_index_name))) {
|
||||
LOG_WARN("fail to get vid rowkey name", K(ret), KPC(domain_index_schema));
|
||||
}
|
||||
if (OB_FAIL(ret) || OB_ISNULL(vid_rowkey_schema)) {
|
||||
} else if (OB_FAIL(vid_rowkey_schema->get_index_name(vec_vid_rowkey_name))) {
|
||||
LOG_WARN("fail to get vid rowkey name", K(ret), KPC(vid_rowkey_schema));
|
||||
@ -1938,10 +1942,11 @@ int ObDDLScheduler::create_drop_vec_index_task(
|
||||
|
||||
uint64_t vid_rowkey_table_id = OB_ISNULL(vid_rowkey_schema) ? OB_INVALID_ID : vid_rowkey_schema->get_table_id();
|
||||
uint64_t rowkey_vid_table_id = OB_ISNULL(rowkey_vid_schema) ? OB_INVALID_ID : rowkey_vid_schema->get_table_id();
|
||||
uint64_t domain_index_table_id = OB_ISNULL(domain_index_schema) ? OB_INVALID_ID : domain_index_schema->get_table_id();
|
||||
uint64_t index_id_table_id = OB_ISNULL(index_id_schema) ? OB_INVALID_ID : index_id_schema->get_table_id();
|
||||
uint64_t snapshot_data_table_id = OB_ISNULL(snapshot_data_schema) ? OB_INVALID_ID : snapshot_data_schema->get_table_id();
|
||||
|
||||
const ObVecIndexDDLChildTaskInfo domain_index(vec_domain_index_name, index_schema->get_table_id(), init_task_id);
|
||||
const ObVecIndexDDLChildTaskInfo domain_index(vec_domain_index_name, domain_index_table_id, init_task_id);
|
||||
const ObVecIndexDDLChildTaskInfo vid_rowkey(vec_vid_rowkey_name, vid_rowkey_table_id, init_task_id);
|
||||
const ObVecIndexDDLChildTaskInfo rowkey_vid(vec_rowkey_vid_name, rowkey_vid_table_id, init_task_id);
|
||||
const ObVecIndexDDLChildTaskInfo index_id(vec_index_id_name, index_id_table_id, init_task_id);
|
||||
|
@ -508,6 +508,7 @@ private:
|
||||
const int64_t consumer_group_id,
|
||||
const share::schema::ObTableSchema *vid_rowkey_schema_,
|
||||
const share::schema::ObTableSchema *rowkey_vid_schema_,
|
||||
const share::schema::ObTableSchema *domain_index_schema,
|
||||
const share::schema::ObTableSchema *delta_buffer_schema_,
|
||||
const share::schema::ObTableSchema *index_snapshot_data_schema_,
|
||||
const uint64_t tenant_data_version,
|
||||
|
@ -43,6 +43,7 @@
|
||||
#include "observer/ob_server_struct.h"
|
||||
#include "share/ob_ddl_sim_point.h"
|
||||
#include "rootserver/ddl_task/ob_rebuild_index_task.h"
|
||||
#include "rootserver/ddl_task/ob_vec_index_build_task.h"
|
||||
|
||||
const bool OB_DDL_TASK_ENABLE_TRACING = false;
|
||||
|
||||
@ -214,7 +215,7 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam()
|
||||
consumer_group_id_(0), parent_task_id_(0), task_id_(0), type_(DDL_INVALID), src_table_schema_(nullptr),
|
||||
dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr),
|
||||
aux_rowkey_doc_schema_(nullptr), aux_doc_rowkey_schema_(nullptr), aux_doc_word_schema_(nullptr),
|
||||
vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr),
|
||||
vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_domain_index_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr),
|
||||
tenant_data_version_(0), ddl_need_retry_at_executor_(false), is_pre_split_(false)
|
||||
{
|
||||
}
|
||||
@ -236,7 +237,7 @@ ObCreateDDLTaskParam::ObCreateDDLTaskParam(const uint64_t tenant_id,
|
||||
parent_task_id_(parent_task_id), task_id_(task_id), type_(type), src_table_schema_(src_table_schema), dest_table_schema_(dest_table_schema),
|
||||
ddl_arg_(ddl_arg), allocator_(allocator), aux_rowkey_doc_schema_(nullptr), aux_doc_rowkey_schema_(nullptr),
|
||||
aux_doc_word_schema_(nullptr),
|
||||
vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr),
|
||||
vec_rowkey_vid_schema_(nullptr), vec_vid_rowkey_schema_(nullptr), vec_domain_index_schema_(nullptr), vec_index_id_schema_(nullptr), vec_snapshot_data_schema_(nullptr),
|
||||
tenant_data_version_(0),
|
||||
ddl_need_retry_at_executor_(ddl_need_retry_at_executor), is_pre_split_(false)
|
||||
{
|
||||
@ -2987,6 +2988,61 @@ int ObDDLTaskRecordOperator::update_status_and_message(
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObDDLTaskRecordOperator::update_parent_task_message(
|
||||
const int64_t tenant_id,
|
||||
const int64_t parent_task_id,
|
||||
const ObTableSchema &index_schema,
|
||||
const uint64_t target_id,
|
||||
ObDDLUpateParentTaskIDType update_type,
|
||||
ObIAllocator &allocator,
|
||||
common::ObISQLClient &proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDDLTaskRecord task_record;
|
||||
if (OB_INVALID_ID == tenant_id || OB_INVALID_ID == parent_task_id || OB_INVALID_ID == target_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(parent_task_id), K(target_id));
|
||||
} else if (OB_FAIL(get_ddl_task_record(tenant_id, parent_task_id, GCTX.root_service_->get_sql_proxy(), allocator, task_record))) {
|
||||
LOG_WARN("fail to get ddl task record", K(ret), K(parent_task_id));
|
||||
} else {
|
||||
if (task_record.ddl_type_ == DDL_CREATE_VEC_INDEX) {
|
||||
SMART_VAR(ObVecIndexBuildTask, task) {
|
||||
if (OB_FAIL(task.init(task_record))) {
|
||||
LOG_WARN("fail to init ObVecIndexBuildTask", K(ret), K(task_record));
|
||||
} else if (UPDATE_CREATE_INDEX_ID == update_type) {
|
||||
if (index_schema.is_vec_rowkey_vid_type()) {
|
||||
task.set_rowkey_vid_aux_table_id(target_id);
|
||||
task.set_rowkey_vid_task_submitted(true);
|
||||
} else if (index_schema.is_vec_vid_rowkey_type()) {
|
||||
task.set_vid_rowkey_aux_table_id(target_id);
|
||||
task.set_vid_rowkey_task_submitted(true);
|
||||
} else if (index_schema.is_vec_delta_buffer_type()) {
|
||||
task.set_delta_buffer_table_id(target_id);
|
||||
task.set_delta_buffer_task_submitted(true);
|
||||
} else if (index_schema.is_vec_index_id_type()) {
|
||||
task.set_index_id_table_id(target_id);
|
||||
task.set_index_id_task_submitted(true);
|
||||
} else if (index_schema.is_vec_index_snapshot_data_type()) {
|
||||
task.set_index_snapshot_data_table_id(target_id);
|
||||
task.set_index_snapshot_data_task_submitted(true);
|
||||
}
|
||||
} else if (UPDATE_DROP_INDEX_TASK_ID == update_type) {
|
||||
task.set_drop_index_task_id(target_id);
|
||||
task.set_drop_index_task_submitted(true);
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(task.update_task_message(proxy))) {
|
||||
LOG_WARN("fail to update task message", K(ret), K(parent_task_id));
|
||||
}
|
||||
} else {
|
||||
// TODO: other ddl type need to be update parent task message, now skip.
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::update_ret_code_and_message(
|
||||
common::ObISQLClient &proxy,
|
||||
const uint64_t tenant_id,
|
||||
|
@ -159,6 +159,11 @@ public:
|
||||
int64_t task_id_;
|
||||
};
|
||||
|
||||
enum ObDDLUpateParentTaskIDType
|
||||
{
|
||||
UPDATE_CREATE_INDEX_ID = 0,
|
||||
UPDATE_DROP_INDEX_TASK_ID,
|
||||
};
|
||||
|
||||
struct ObVecIndexDDLChildTaskInfo final
|
||||
{
|
||||
@ -240,7 +245,7 @@ public:
|
||||
TO_STRING_KV(K_(tenant_id), K_(object_id), K_(schema_version), K_(parallelism), K_(consumer_group_id), K_(parent_task_id), K_(task_id),
|
||||
K_(type), KPC_(src_table_schema), KPC_(dest_table_schema), KPC_(ddl_arg), K_(tenant_data_version),
|
||||
K_(sub_task_trace_id), KPC_(aux_rowkey_doc_schema), KPC_(aux_doc_rowkey_schema), KPC_(aux_doc_word_schema),
|
||||
K_(vec_rowkey_vid_schema), K_(vec_vid_rowkey_schema), K_(vec_index_id_schema), K_(vec_snapshot_data_schema),
|
||||
K_(vec_rowkey_vid_schema), K_(vec_vid_rowkey_schema), K_(vec_domain_index_schema), K_(vec_index_id_schema), K_(vec_snapshot_data_schema),
|
||||
K_(ddl_need_retry_at_executor), K_(is_pre_split));
|
||||
public:
|
||||
int32_t sub_task_trace_id_;
|
||||
@ -261,6 +266,7 @@ public:
|
||||
const ObTableSchema *aux_doc_word_schema_;
|
||||
const ObTableSchema *vec_rowkey_vid_schema_;
|
||||
const ObTableSchema *vec_vid_rowkey_schema_;
|
||||
const ObTableSchema *vec_domain_index_schema_;
|
||||
const ObTableSchema *vec_index_id_schema_;
|
||||
const ObTableSchema *vec_snapshot_data_schema_;
|
||||
uint64_t tenant_data_version_;
|
||||
@ -315,6 +321,15 @@ public:
|
||||
const int ret_code,
|
||||
ObString &message);
|
||||
|
||||
static int update_parent_task_message(
|
||||
const int64_t tenant_id,
|
||||
const int64_t parent_task_id,
|
||||
const ObTableSchema &index_schema,
|
||||
const uint64_t target_id,
|
||||
ObDDLUpateParentTaskIDType update_type,
|
||||
ObIAllocator &allocator,
|
||||
common::ObISQLClient &proxy);
|
||||
|
||||
static int get_schedule_info_for_update(
|
||||
common::ObISQLClient &proxy,
|
||||
const uint64_t tenant_id,
|
||||
|
@ -72,7 +72,6 @@ int ObDropVecIndexTask::init(
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id
|
||||
|| task_id <= 0
|
||||
|| OB_INVALID_ID == data_table_id
|
||||
|| !domain_index.is_valid()
|
||||
|| schema_version <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(tenant_id), K(task_id), K(data_table_id), K(rowkey_vid),
|
||||
@ -99,7 +98,7 @@ int ObDropVecIndexTask::init(
|
||||
set_gmt_create(ObTimeUtility::current_time());
|
||||
tenant_id_ = tenant_id;
|
||||
object_id_ = data_table_id;
|
||||
target_object_id_ = domain_index.table_id_;
|
||||
target_object_id_ = data_table_id; // not use this id
|
||||
schema_version_ = schema_version;
|
||||
task_id_ = task_id;
|
||||
parent_task_id_ = 0; // no parent task
|
||||
|
@ -601,8 +601,6 @@ int ObVecIndexBuildTask::prepare_aux_table(const ObIndexType index_type,
|
||||
obrpc::ObRpcProxy::myaddr_,
|
||||
OB_VEC_INDEX_BUILD_CHILD_TASK_NUM))) {
|
||||
LOG_WARN("fail to prepare_aux_table", K(ret), K(index_type));
|
||||
} else if (OB_FAIL(update_task_message())) {
|
||||
LOG_WARN("fail to update task message", K(ret), K(index_type));
|
||||
}
|
||||
} // samart var
|
||||
return ret;
|
||||
@ -1677,6 +1675,7 @@ int ObVecIndexBuildTask::ChangeTaskStatusFn::operator()(common::hash::HashMapPai
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
// ongoing child task
|
||||
ret = OB_SUCCESS;
|
||||
not_finished_cnt_++;
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(trans.start(&rt_service_->get_sql_proxy(),
|
||||
dest_tenant_id_))) {
|
||||
@ -1714,7 +1713,8 @@ int ObVecIndexBuildTask::clean_on_failed()
|
||||
LOG_WARN("task status not match", K(ret), K(task_status_));
|
||||
} else {
|
||||
// 1. cancel ongoing build index task
|
||||
ChangeTaskStatusFn change_statu_fn(dependent_task_result_map_, dst_tenant_id_, root_service_);
|
||||
int64_t not_finished_cnt = 0;
|
||||
ChangeTaskStatusFn change_statu_fn(dependent_task_result_map_, dst_tenant_id_, root_service_, not_finished_cnt);
|
||||
if (OB_FAIL(dependent_task_result_map_.foreach_refactored(change_statu_fn))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("foreach refactored failed", K(ret), K(dst_tenant_id_));
|
||||
@ -1724,6 +1724,8 @@ int ObVecIndexBuildTask::clean_on_failed()
|
||||
}
|
||||
// 2. drop already built index
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (not_finished_cnt > 0) {
|
||||
LOG_INFO("child task not finished, not submit drop vec index task.", K(not_finished_cnt));
|
||||
} else if (!drop_index_task_submitted_) {
|
||||
if (OB_FAIL(submit_drop_vec_index_task())) {
|
||||
LOG_WARN("failed to drop vec index", K(ret));
|
||||
@ -1753,6 +1755,9 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task()
|
||||
const ObTableSchema *index_table_schema = nullptr;
|
||||
const ObDatabaseSchema *database_schema = nullptr;
|
||||
const ObTableSchema *data_table_schema = nullptr;
|
||||
|
||||
obrpc::ObDropIndexArg drop_index_arg;
|
||||
obrpc::ObDropIndexRes drop_index_res;
|
||||
ObString index_name;
|
||||
ObSqlString drop_index_sql;
|
||||
bool is_index_exist = true;
|
||||
@ -1764,20 +1769,28 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task()
|
||||
LOG_WARN("should not be null", K(ret));
|
||||
} else if (OB_FAIL(schema_service.get_tenant_schema_guard(tenant_id_, schema_guard))) {
|
||||
LOG_WARN("get tenant schema guard failed", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(schema_guard.check_table_exist(tenant_id_, index_table_id, is_index_exist))) {
|
||||
LOG_WARN("check table exist failed", K(ret), K_(tenant_id), K(index_table_id));
|
||||
} else if (!is_index_exist) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("vec index aux schema is nullptr, fail to roll back", K(ret), K(index_table_id), K(delta_buffer_table_id_), K(index_table_id_));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, index_table_id, index_table_schema))) {
|
||||
LOG_WARN("get index schema failed", K(ret), K(tenant_id_), K(index_table_id));
|
||||
} else if (OB_INVALID_ID != rowkey_vid_aux_table_id_ &&
|
||||
OB_FAIL(drop_index_arg.index_ids_.push_back(rowkey_vid_aux_table_id_))) {
|
||||
LOG_WARN("fail to push back rowkey_vid_aux_table_id_", K(ret));
|
||||
} else if (OB_INVALID_ID != vid_rowkey_aux_table_id_ &&
|
||||
OB_FAIL(drop_index_arg.index_ids_.push_back(vid_rowkey_aux_table_id_))) {
|
||||
LOG_WARN("fail to push back vid_rowkey_aux_table_id_", K(ret));
|
||||
} else if (OB_INVALID_ID != delta_buffer_table_id_ &&
|
||||
OB_FAIL(drop_index_arg.index_ids_.push_back(delta_buffer_table_id_))) {
|
||||
LOG_WARN("fail to push back delta_buffer_table_id_", K(ret), K(delta_buffer_table_id_));
|
||||
} else if (OB_INVALID_ID != index_id_table_id_ &&
|
||||
OB_FAIL(drop_index_arg.index_ids_.push_back(index_id_table_id_))) {
|
||||
LOG_WARN("fail to push back index_id_table_id_", K(ret), K(index_id_table_id_));
|
||||
} else if (OB_INVALID_ID != index_snapshot_data_table_id_ &&
|
||||
OB_FAIL(drop_index_arg.index_ids_.push_back(index_snapshot_data_table_id_))) {
|
||||
LOG_WARN("fail to push back index_snapshot_data_table_id_", K(ret));
|
||||
} else if (drop_index_arg.index_ids_.count() <= 0) {
|
||||
LOG_INFO("no table need to be drop, skip", K(ret)); // no table exist, skip drop
|
||||
} else if (schema_guard.get_table_schema(tenant_id_, drop_index_arg.index_ids_.at(0), index_table_schema)) {
|
||||
LOG_WARN("fail to get table schema", K(ret), K(drop_index_arg.index_ids_.at(0)));
|
||||
} else if (OB_ISNULL(index_table_schema)) {
|
||||
ret = OB_SCHEMA_ERROR;
|
||||
LOG_WARN("index schema is null", K(ret), K(index_table_id));
|
||||
} else if (index_table_schema->is_in_recyclebin()) {
|
||||
// the index has been dropped, just finish this task
|
||||
} else if (OB_FAIL(index_table_schema->get_index_name(index_name))) {
|
||||
LOG_WARN("get index name failed", KR(ret), K(index_table_schema->get_table_type()), KPC(index_table_schema));
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("index schema is null", K(ret), KP(index_table_schema));
|
||||
} else if (OB_FAIL(schema_guard.get_database_schema(tenant_id_, index_table_schema->get_database_id(), database_schema))) {
|
||||
LOG_WARN("get database schema failed", KR(ret), K(index_table_schema->get_database_id()));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, index_table_schema->get_data_table_id(), data_table_schema))) {
|
||||
@ -1787,32 +1800,27 @@ int ObVecIndexBuildTask::submit_drop_vec_index_task()
|
||||
LOG_WARN("get null schema", KR(ret), KP(database_schema), KP(data_table_schema));
|
||||
} else {
|
||||
int64_t ddl_rpc_timeout = 0;
|
||||
obrpc::ObDropIndexArg drop_index_arg;
|
||||
obrpc::ObDropIndexRes drop_index_res;
|
||||
drop_index_arg.is_inner_ = true;
|
||||
drop_index_arg.tenant_id_ = tenant_id_;
|
||||
drop_index_arg.exec_tenant_id_ = tenant_id_;
|
||||
drop_index_arg.index_table_id_ = index_table_id;
|
||||
drop_index_arg.session_id_ = data_table_schema->get_session_id();
|
||||
drop_index_arg.index_name_ = index_name;
|
||||
drop_index_arg.table_name_ = data_table_schema->get_table_name();
|
||||
drop_index_arg.database_name_ = database_schema->get_database_name_str();
|
||||
drop_index_arg.index_name_ = index_table_schema->get_table_name(); // not in used
|
||||
drop_index_arg.index_action_type_ = obrpc::ObIndexArg::DROP_INDEX;
|
||||
drop_index_arg.is_add_to_scheduler_ = true;
|
||||
drop_index_arg.task_id_ = task_id_;
|
||||
drop_index_arg.is_vec_inner_drop_ = has_aux_table ? true : false; // if want to drop only one index, is_vec_inner_drop_ should be false, else should be true.
|
||||
drop_index_arg.task_id_ = task_id_; // parent task
|
||||
drop_index_arg.session_id_ = data_table_schema->get_session_id();
|
||||
drop_index_arg.table_name_ = data_table_schema->get_table_name();
|
||||
drop_index_arg.database_name_ = database_schema->get_database_name_str();
|
||||
drop_index_arg.is_vec_inner_drop_ = true; // if want to drop only one index, is_vec_inner_drop_ should be false, else should be true.
|
||||
if (OB_FAIL(ObDDLUtil::get_ddl_rpc_timeout(index_table_schema->get_all_part_num() + data_table_schema->get_all_part_num(), ddl_rpc_timeout))) {
|
||||
LOG_WARN("failed to get ddl rpc timeout", KR(ret));
|
||||
} else if (OB_FAIL(DDL_SIM(tenant_id_, task_id_, DROP_INDEX_RPC_FAILED))) {
|
||||
LOG_WARN("ddl sim failure", KR(ret), K(tenant_id_), K(task_id_));
|
||||
} else if (OB_FAIL(root_service_->get_common_rpc_proxy().timeout(ddl_rpc_timeout).drop_index(drop_index_arg, drop_index_res))) {
|
||||
} else if (OB_FAIL(root_service_->get_common_rpc_proxy().timeout(ddl_rpc_timeout).drop_index_on_failed(drop_index_arg, drop_index_res))) {
|
||||
LOG_WARN("drop index failed", KR(ret), K(ddl_rpc_timeout));
|
||||
} else {
|
||||
drop_index_task_submitted_ = true;
|
||||
drop_index_task_id_ = drop_index_res.task_id_;
|
||||
if (OB_FAIL(update_task_message())) {
|
||||
LOG_WARN("fail to update task message", K(ret));
|
||||
}
|
||||
LOG_INFO("success submit drop vec index task", K(ret), K(drop_index_task_id_));
|
||||
}
|
||||
}
|
||||
@ -1982,7 +1990,7 @@ int ObVecIndexBuildTask::cleanup_impl()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObVecIndexBuildTask::update_task_message()
|
||||
int ObVecIndexBuildTask::update_task_message(common::ObISQLClient &proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char *buf = nullptr;
|
||||
@ -1998,7 +2006,7 @@ int ObVecIndexBuildTask::update_task_message()
|
||||
LOG_WARN("failed to serialize params to message", KR(ret));
|
||||
} else {
|
||||
msg.assign(buf, serialize_param_size);
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(root_service_->get_sql_proxy(), tenant_id_, task_id_, msg))) {
|
||||
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(proxy, tenant_id_, task_id_, msg))) {
|
||||
LOG_WARN("failed to update message", KR(ret));
|
||||
}
|
||||
}
|
||||
|
@ -69,6 +69,22 @@ public:
|
||||
K(drop_index_task_submitted_), K(schema_version_), K(execution_id_),
|
||||
K(consumer_group_id_), K(trace_id_), K(parallelism_), K(create_index_arg_));
|
||||
|
||||
public:
|
||||
void set_rowkey_vid_aux_table_id(const uint64_t id) { rowkey_vid_aux_table_id_ = id; }
|
||||
void set_vid_rowkey_aux_table_id(const uint64_t id) { vid_rowkey_aux_table_id_ = id; }
|
||||
void set_delta_buffer_table_id(const uint64_t id) { delta_buffer_table_id_ = id; }
|
||||
void set_index_id_table_id(const uint64_t id) { index_id_table_id_ = id; }
|
||||
void set_index_snapshot_data_table_id(const uint64_t id) { index_snapshot_data_table_id_ = id; }
|
||||
void set_drop_index_task_id(const uint64_t id) { drop_index_task_id_ = id; }
|
||||
void set_rowkey_vid_task_submitted(const bool status) { rowkey_vid_task_submitted_ = status; }
|
||||
void set_vid_rowkey_task_submitted(const bool status) { vid_rowkey_task_submitted_ = status; }
|
||||
void set_delta_buffer_task_submitted(const bool status) { delta_buffer_task_submitted_ = status; }
|
||||
void set_index_id_task_submitted(const bool status) { index_id_task_submitted_ = status; }
|
||||
void set_index_snapshot_data_task_submitted(const bool status) { index_snapshot_data_task_submitted_ = status; }
|
||||
void set_drop_index_task_submitted(const bool status) { drop_index_task_submitted_ = status; }
|
||||
|
||||
int update_task_message(common::ObISQLClient &proxy);
|
||||
|
||||
private:
|
||||
int get_next_status(share::ObDDLTaskStatus &next_status);
|
||||
int prepare_aux_table(const ObIndexType index_type,
|
||||
@ -85,6 +101,7 @@ private:
|
||||
int construct_delta_buffer_arg(obrpc::ObCreateIndexArg &arg);
|
||||
int construct_index_id_arg(obrpc::ObCreateIndexArg &arg);
|
||||
int construct_index_snapshot_data_arg(obrpc::ObCreateIndexArg &arg);
|
||||
|
||||
int record_index_table_id(
|
||||
const obrpc::ObCreateIndexArg *create_index_arg_,
|
||||
uint64_t &aux_table_id);
|
||||
@ -108,16 +125,16 @@ private:
|
||||
const obrpc::ObCreateIndexArg &source_arg,
|
||||
obrpc::ObCreateIndexArg &dest_arg);
|
||||
int print_child_task_ids(char *buf, int64_t len);
|
||||
int update_task_message();
|
||||
|
||||
private:
|
||||
struct ChangeTaskStatusFn final
|
||||
{
|
||||
public:
|
||||
ChangeTaskStatusFn(common::hash::ObHashMap<uint64_t, share::ObDomainDependTaskStatus> &dependent_task_result_map, const uint64_t tenant_id, ObRootService *root_service) :
|
||||
ChangeTaskStatusFn(common::hash::ObHashMap<uint64_t, share::ObDomainDependTaskStatus> &dependent_task_result_map, const uint64_t tenant_id, ObRootService *root_service, int64_t ¬_finished_cnt) :
|
||||
dependent_task_result_map_(dependent_task_result_map),
|
||||
rt_service_(root_service),
|
||||
dest_tenant_id_(tenant_id)
|
||||
dest_tenant_id_(tenant_id),
|
||||
not_finished_cnt_(not_finished_cnt)
|
||||
{}
|
||||
public:
|
||||
~ChangeTaskStatusFn() = default;
|
||||
@ -126,6 +143,7 @@ private:
|
||||
common::hash::ObHashMap<uint64_t, share::ObDomainDependTaskStatus> &dependent_task_result_map_;
|
||||
ObRootService *rt_service_;
|
||||
uint64_t dest_tenant_id_;
|
||||
int64_t ¬_finished_cnt_;
|
||||
};
|
||||
struct CheckTaskStatusFn final
|
||||
{
|
||||
|
@ -6471,6 +6471,7 @@ int ObDDLService::create_aux_index_task_(
|
||||
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().
|
||||
create_ddl_task(param, trans, task_record))) {
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
trans.reset_last_error();
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("submit create index ddl task failed", K(ret));
|
||||
@ -6578,6 +6579,11 @@ int ObDDLService::create_aux_index(
|
||||
} else if (FALSE_IT(result.ddl_task_id_ = task_record.task_id_)) {
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id,
|
||||
arg.task_id_, *idx_schema, result.aux_table_id_, ObDDLUpateParentTaskIDType::UPDATE_CREATE_INDEX_ID, allocator, trans))) {
|
||||
LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(idx_schema));
|
||||
}
|
||||
} else { // 3. index scheme not exist, generate schema && create ddl task
|
||||
ObTableSchema index_schema;
|
||||
if (OB_FAIL(generate_aux_index_schema_(tenant_id,
|
||||
@ -6592,6 +6598,9 @@ int ObDDLService::create_aux_index(
|
||||
LOG_WARN("failed to generate aux index schema", K(ret), K(create_index_arg));
|
||||
} else if (FALSE_IT(result.schema_generated_ = true)) {
|
||||
} else if (FALSE_IT(result.aux_table_id_ = index_schema.get_table_id())) {
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id,
|
||||
arg.task_id_, index_schema, result.aux_table_id_, ObDDLUpateParentTaskIDType::UPDATE_CREATE_INDEX_ID, allocator, trans))) {
|
||||
LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(index_schema));
|
||||
} else if (OB_FAIL(create_aux_index_task_(data_schema,
|
||||
&index_schema,
|
||||
create_index_arg,
|
||||
@ -6615,9 +6624,12 @@ int ObDDLService::create_aux_index(
|
||||
if (OB_FAIL(publish_schema(tenant_id))) {
|
||||
LOG_WARN("fail to publish schema", K(ret), K(tenant_id));
|
||||
} else if (OB_INVALID_ID == result.ddl_task_id_) { // no need to schedule
|
||||
} else if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().
|
||||
} else {
|
||||
DEBUG_SYNC(CREATE_AUX_INDEX_TABLE);
|
||||
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().
|
||||
schedule_ddl_task(task_record))) {
|
||||
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
||||
LOG_WARN("fail to schedule ddl task", K(ret), K(task_record));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -100,6 +100,129 @@ int ObIndexBuilder::create_index(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBuilder::drop_index_on_failed(const ObDropIndexArg &arg, obrpc::ObDropIndexRes &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t tenant_id = arg.tenant_id_;
|
||||
const bool is_index = false;
|
||||
ObArenaAllocator allocator(ObModIds::OB_SCHEMA);
|
||||
const ObTableSchema *data_table_schema = NULL;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
bool is_db_in_recyclebin = false;
|
||||
uint64_t compat_version = 0;
|
||||
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, compat_version))) {
|
||||
LOG_WARN("failed to get data version", KR(ret), K(tenant_id));
|
||||
} else if (compat_version < DATA_VERSION_4_3_3_0) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("drop index on failed before version 4.3.3.0 is not supported", KR(ret), K(compat_version));
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "drop index on failed before version 4.3.3.0 is");
|
||||
} else if (OB_FALSE_IT(schema_guard.set_session_id(arg.session_id_))) {
|
||||
} else if (!ddl_service_.is_inited()) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("ddl_service not init", K(ret), K(ddl_service_.is_inited()));
|
||||
} else if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", K(arg), K(ret));
|
||||
} else if (OB_FAIL(ddl_service_.get_tenant_schema_guard_with_version_in_inner_table(tenant_id, schema_guard))) {
|
||||
LOG_WARN("get_schema_guard failed", K(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, arg.database_name_,
|
||||
arg.table_name_, is_index, data_table_schema, arg.is_hidden_))) {
|
||||
LOG_WARN("failed to get data table schema", K(arg), K(ret));
|
||||
} else if (NULL == data_table_schema) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("fail to drop index on failed, data table not exist", K(ret), K(arg));
|
||||
} else if (arg.is_in_recyclebin_) {
|
||||
// internal delete index
|
||||
} else if (data_table_schema->is_in_recyclebin()) {
|
||||
ret = OB_ERR_OPERATION_ON_RECYCLE_OBJECT;
|
||||
LOG_WARN("can not drop index of table in recyclebin.", K(ret), K(arg));
|
||||
} else if (OB_FAIL(schema_guard.check_database_in_recyclebin(tenant_id,
|
||||
data_table_schema->get_database_id(), is_db_in_recyclebin))) {
|
||||
LOG_WARN("check database in recyclebin failed", K(ret), K(tenant_id));
|
||||
} else if (is_db_in_recyclebin) {
|
||||
ret = OB_ERR_OPERATION_ON_RECYCLE_OBJECT;
|
||||
LOG_WARN("Can not drop index of db in recyclebin", K(ret), K(arg));
|
||||
} else if (!arg.is_add_to_scheduler_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("not add to scheduler to drop, not expected", K(ret), K(arg));
|
||||
} else if (arg.index_ids_.count() <= 0) {
|
||||
res.task_id_ = -1; // no need to drop
|
||||
LOG_INFO("target indexes to be drop is empty", K(ret));
|
||||
} else {
|
||||
ObDDLOperator ddl_operator(ddl_service_.get_schema_service(), ddl_service_.get_sql_proxy());
|
||||
ObDDLSQLTransaction trans(&ddl_service_.get_schema_service());
|
||||
int64_t refreshed_schema_version = 0;
|
||||
ObArenaAllocator allocator(lib::ObLabel("DdlTaskTmp"));
|
||||
ObDDLTaskRecord task_record;
|
||||
|
||||
bool has_index_task = false;
|
||||
typedef common::ObSEArray<share::schema::ObTableSchema, 4> TableSchemaArray;
|
||||
SMART_VAR(TableSchemaArray, new_index_schemas) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.index_ids_.count(); ++i) {
|
||||
bool is_index_exist = false;
|
||||
const int64_t index_id = arg.index_ids_.at(i);
|
||||
const share::schema::ObTableSchema *index_table_schema= nullptr;
|
||||
if (OB_FAIL(schema_guard.check_table_exist(tenant_id, index_id, is_index_exist))) {
|
||||
LOG_WARN("check table exist failed", K(ret), K(tenant_id), K(index_id));
|
||||
} else if (!is_index_exist) { // skip
|
||||
LOG_INFO("vec index schema is nullptr", K(ret), K(index_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, index_id, index_table_schema))) {
|
||||
LOG_WARN("fail to get index table schema", K(ret), K(tenant_id), K(index_id));
|
||||
} else if (OB_ISNULL(index_table_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected index table nullptr", K(ret), K(index_id));
|
||||
} else if (OB_FAIL(new_index_schemas.push_back(*index_table_schema))) {
|
||||
LOG_WARN("fail to push vec rowkey vid table schema", K(ret), KPC(index_table_schema));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (new_index_schemas.count() <= 0) {
|
||||
res.task_id_ = -1; // no need to drop
|
||||
LOG_INFO("target indexes to be drop is empty", K(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_schema_version(tenant_id, refreshed_schema_version))) {
|
||||
LOG_WARN("failed to get tenant schema version", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(trans.start(&ddl_service_.get_sql_proxy(), tenant_id, refreshed_schema_version))) {
|
||||
LOG_WARN("start transaction failed", KR(ret), K(tenant_id), K(refreshed_schema_version));
|
||||
} else {
|
||||
bool has_exist = false;
|
||||
const ObTableSchema &new_index_schema = new_index_schemas.at(new_index_schemas.count() - 1);
|
||||
if (OB_FAIL(submit_drop_index_task(trans, *data_table_schema, new_index_schemas, arg, allocator, has_exist, task_record))) {
|
||||
LOG_WARN("submit drop index task failed", K(ret), K(task_record));
|
||||
} else if (has_exist) {
|
||||
res.task_id_ = task_record.task_id_;
|
||||
} else {
|
||||
res.tenant_id_ = new_index_schema.get_tenant_id();
|
||||
res.schema_version_ = new_index_schema.get_schema_version();
|
||||
res.task_id_ = task_record.task_id_;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_parent_task_message(tenant_id,
|
||||
arg.task_id_, new_index_schemas.at(0), res.task_id_,
|
||||
ObDDLUpateParentTaskIDType::UPDATE_DROP_INDEX_TASK_ID, allocator, trans))) {
|
||||
LOG_WARN("fail to update parent task message", K(ret), K(arg.task_id_), K(res.task_id_));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
int temp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (temp_ret = trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN_RET(temp_ret, "trans end failed", "is_commit", OB_SUCCESS == ret, K(temp_ret));
|
||||
ret = (OB_SUCC(ret)) ? temp_ret : ret;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ddl_service_.publish_schema(tenant_id))) {
|
||||
LOG_WARN("fail to publish schema", K(ret), K(tenant_id));
|
||||
} else if (OB_TMP_FAIL(GCTX.root_service_->get_ddl_task_scheduler().schedule_ddl_task(task_record))) {
|
||||
LOG_WARN("fail to schedule ddl task", K(tmp_ret), K(task_record));
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_INFO("finish drop index on failed", K(ret), K(arg));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBuilder::drop_index(const ObDropIndexArg &arg, obrpc::ObDropIndexRes &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -508,29 +631,31 @@ int ObIndexBuilder::submit_build_index_task(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// if index_schemas has delta_buffer_table, than index_ith = domain_index_ith, ohterwide, index_ith = 0;
|
||||
int ObIndexBuilder::recognize_vec_index_schemas(
|
||||
const common::ObIArray<share::schema::ObTableSchema> &index_schemas,
|
||||
const bool is_vec_inner_drop,
|
||||
int64_t &index_ith,
|
||||
int64_t &rowkey_vid_ith,
|
||||
int64_t &vid_rowkey_ith,
|
||||
int64_t &domain_index_ith,
|
||||
int64_t &index_id_ith,
|
||||
int64_t &snapshot_data_ith)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
index_ith = -1;
|
||||
index_ith = 0;
|
||||
rowkey_vid_ith = -1;
|
||||
vid_rowkey_ith = -1;
|
||||
domain_index_ith = -1;
|
||||
index_id_ith = -1;
|
||||
snapshot_data_ith = -1;
|
||||
const int64_t VEC_DOMAIN_INDEX_TABLE_COUNT = 1; // delta_buffer_table
|
||||
const int64_t VEC_INDEX_TABLE_COUNT = 5;
|
||||
if (OB_UNLIKELY(VEC_DOMAIN_INDEX_TABLE_COUNT != index_schemas.count() &&
|
||||
!is_vec_inner_drop && (VEC_INDEX_TABLE_COUNT != index_schemas.count()))) {
|
||||
if (OB_UNLIKELY(!is_vec_inner_drop &&
|
||||
VEC_DOMAIN_INDEX_TABLE_COUNT != index_schemas.count() &&
|
||||
VEC_INDEX_TABLE_COUNT != index_schemas.count())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(index_schemas));
|
||||
} else if (index_schemas.count() == 1) {
|
||||
index_ith = 0;
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < index_schemas.count(); ++i) {
|
||||
if (index_schemas.at(i).is_vec_rowkey_vid_type()) {
|
||||
@ -547,6 +672,14 @@ int ObIndexBuilder::recognize_vec_index_schemas(
|
||||
} else {
|
||||
vid_rowkey_ith = i;
|
||||
}
|
||||
} else if (index_schemas.at(i).is_vec_delta_buffer_type()) {
|
||||
if (OB_UNLIKELY(-1 != domain_index_ith)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpeted error, there are multiple vid rowkey tables", K(ret), K(index_schemas));
|
||||
} else {
|
||||
domain_index_ith = i;
|
||||
index_ith = domain_index_ith; // if has domain index, index_ith = domain_index_ith
|
||||
}
|
||||
} else if (index_schemas.at(i).is_vec_index_id_type()) {
|
||||
if (OB_UNLIKELY(-1 != index_id_ith)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -561,11 +694,9 @@ int ObIndexBuilder::recognize_vec_index_schemas(
|
||||
} else {
|
||||
snapshot_data_ith = i;
|
||||
}
|
||||
} else if (OB_UNLIKELY(-1 != index_ith)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpeted error, there are multiple user index tables", K(ret), K(index_schemas));
|
||||
} else {
|
||||
index_ith = i;
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected drop vec index schema", K(ret), K(index_schemas.at(i)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -692,6 +823,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans,
|
||||
int64_t aux_multivalue_ith = -1;
|
||||
int64_t vec_rowkey_vid_ith = -1;
|
||||
int64_t vec_vid_rowkey_ith = -1;
|
||||
int64_t vec_domain_index_ith = -1;
|
||||
int64_t vec_index_id_ith = -1;
|
||||
int64_t vec_snapshot_data_ith = -1;
|
||||
|
||||
@ -710,7 +842,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans,
|
||||
aux_rowkey_doc_ith, aux_doc_rowkey_ith))) {
|
||||
LOG_WARN("fail to recognize index and aux table from schema array", K(ret));
|
||||
} else if (index_schemas.at(0).is_vec_index() && OB_FAIL(recognize_vec_index_schemas(index_schemas, arg.is_vec_inner_drop_, index_ith, vec_rowkey_vid_ith,
|
||||
vec_vid_rowkey_ith, vec_index_id_ith, vec_snapshot_data_ith))) {
|
||||
vec_vid_rowkey_ith, vec_domain_index_ith, vec_index_id_ith, vec_snapshot_data_ith))) {
|
||||
LOG_WARN("fail to recognize index and aux table from schema array", K(ret));
|
||||
} else if (OB_ISNULL(GCTX.root_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -720,7 +852,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans,
|
||||
LOG_WARN("unexpected error, invalid array index", K(ret), K(index_ith));
|
||||
} else {
|
||||
const ObTableSchema &index_schema = index_schemas.at(index_ith);
|
||||
const bool is_drop_vec_task = (!arg.is_inner_ || arg.is_vec_inner_drop_) && index_schema.is_vec_delta_buffer_type(); // delta_buffer_table
|
||||
const bool is_drop_vec_task = (!arg.is_inner_ && index_schema.is_vec_delta_buffer_type()) || arg.is_vec_inner_drop_; // inner drop or user drop
|
||||
const bool is_drop_fts_task = !arg.is_inner_ && index_schema.is_fts_index_aux();
|
||||
const bool is_drop_multivalue_task = !arg.is_inner_ && index_schema.is_multivalue_index_aux();
|
||||
const bool is_drop_fts_or_multivalue_task = is_drop_fts_task || is_drop_multivalue_task;
|
||||
@ -813,6 +945,7 @@ int ObIndexBuilder::submit_drop_index_task(ObMySQLTransaction &trans,
|
||||
&arg);
|
||||
param.vec_vid_rowkey_schema_ = vec_vid_rowkey_ith == -1 ? nullptr : &(index_schemas.at(vec_vid_rowkey_ith));
|
||||
param.vec_rowkey_vid_schema_ = vec_rowkey_vid_ith == -1 ? nullptr : &(index_schemas.at(vec_rowkey_vid_ith));
|
||||
param.vec_domain_index_schema_ = vec_domain_index_ith == -1 ? nullptr : &(index_schemas.at(vec_domain_index_ith));
|
||||
param.vec_index_id_schema_ = vec_index_id_ith == -1 ? nullptr : &(index_schemas.at(vec_index_id_ith));
|
||||
param.vec_snapshot_data_schema_ = vec_snapshot_data_ith == -1 ? nullptr : &(index_schemas.at(vec_snapshot_data_ith));
|
||||
if (OB_FAIL(GCTX.root_service_->get_ddl_task_scheduler().create_ddl_task(param, trans, task_record))) {
|
||||
|
@ -58,7 +58,6 @@ public:
|
||||
int create_index(const obrpc::ObCreateIndexArg &arg,
|
||||
obrpc::ObAlterTableRes &res);
|
||||
int drop_index(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res);
|
||||
|
||||
// Check and update local index status.
|
||||
// if not all index table updated return OB_EAGAIN.
|
||||
int do_create_index(
|
||||
@ -109,6 +108,7 @@ public:
|
||||
const uint64_t tenant_data_version,
|
||||
common::ObIAllocator &allocator,
|
||||
ObDDLTaskRecord &task_record);
|
||||
int drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res);
|
||||
private:
|
||||
int recognize_vec_index_schemas(
|
||||
const common::ObIArray<share::schema::ObTableSchema> &index_schemas,
|
||||
@ -116,6 +116,7 @@ private:
|
||||
int64_t &index_ith,
|
||||
int64_t &rowkey_vid_ith,
|
||||
int64_t &vid_rowkey_ith,
|
||||
int64_t &domain_index_ith,
|
||||
int64_t &index_id_ith,
|
||||
int64_t &snapshot_data_ith);
|
||||
int recognize_fts_index_schemas(
|
||||
|
@ -4987,6 +4987,32 @@ int ObRootService::alter_tablegroup(const obrpc::ObAlterTablegroupArg &arg)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (!arg.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", K(ret), K(arg));
|
||||
} else {
|
||||
ObIndexBuilder index_builder(ddl_service_);
|
||||
if (OB_FAIL(index_builder.drop_index_on_failed(arg, res))) {
|
||||
LOG_WARN("index_builder drop_index_on_failed failed", K(ret), K(arg));
|
||||
}
|
||||
}
|
||||
ROOTSERVICE_EVENT_ADD("ddl scheduler", "drop index on failed",
|
||||
"tenant_id", res.tenant_id_,
|
||||
"ret", ret,
|
||||
"trace_id", *ObCurTraceId::get_trace_id(),
|
||||
"task_id", res.task_id_,
|
||||
"table_id", arg.index_table_id_,
|
||||
"schema_version", res.schema_version_);
|
||||
LOG_INFO("finish drop index on fail ddl", K(ret), K(arg), "ddl_event_info", ObDDLEventInfo());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRootService::drop_index(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -562,6 +562,7 @@ public:
|
||||
|
||||
int create_restore_point(const obrpc::ObCreateRestorePointArg &arg);
|
||||
int drop_restore_point(const obrpc::ObDropRestorePointArg &arg);
|
||||
int drop_index_on_failed(const obrpc::ObDropIndexArg &arg, obrpc::ObDropIndexRes &res);
|
||||
|
||||
//for inner table monitor, purge in fixed time
|
||||
int purge_expire_recycle_objects(const obrpc::ObPurgeRecycleBinArg &arg, obrpc::Int64 &affected_rows);
|
||||
|
@ -344,6 +344,7 @@ DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_TRUNCATE_TABLE_V2, ObRpcTruncateTableV2P,
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_AUX_INDEX, ObRpcCreateAuxIndexP, create_aux_index(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_INDEX, ObRpcCreateIndexP, create_index(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_INDEX, ObRpcDropIndexP, drop_index(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_DROP_INDEX_ON_FAILED, ObRpcDropIndexOnFailedP, drop_index_on_failed(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_REBUILD_VEC_INDEX, ObRpcRebuildVecIndexP, rebuild_vec_index(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_MLOG, ObRpcCreateMLogP, create_mlog(arg_, result_));
|
||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_CREATE_TABLE_LIKE, ObRpcCreateTableLikeP, create_table_like(arg_));
|
||||
|
@ -79,6 +79,7 @@ public:
|
||||
RPC_S(PRD create_aux_index, obrpc::OB_CREATE_AUX_INDEX, (obrpc::ObCreateAuxIndexArg), obrpc::ObCreateAuxIndexRes);
|
||||
RPC_S(PRD create_index, obrpc::OB_CREATE_INDEX, (ObCreateIndexArg), ObAlterTableRes);
|
||||
RPC_S(PRD drop_index, obrpc::OB_DROP_INDEX, (ObDropIndexArg), ObDropIndexRes);
|
||||
RPC_S(PRD drop_index_on_failed, obrpc::OB_DROP_INDEX_ON_FAILED, (ObDropIndexArg), ObDropIndexRes);
|
||||
RPC_S(PRD rebuild_vec_index, obrpc::OB_REBUILD_VEC_INDEX, (ObRebuildIndexArg), ObAlterTableRes);
|
||||
RPC_S(PRD create_mlog, obrpc::OB_CREATE_MLOG, (ObCreateMLogArg), ObCreateMLogRes);
|
||||
RPC_S(PRD flashback_index, obrpc::OB_FLASHBACK_INDEX, (ObFlashBackIndexArg));
|
||||
|
@ -635,6 +635,7 @@ class ObString;
|
||||
ACT(BUILD_VECTOR_INDEX_PREPARE_ROWKEY_VID,)\
|
||||
ACT(BUILD_VECTOR_INDEX_PREPARE_AUX_INDEX,)\
|
||||
ACT(BUILD_VECTOR_INDEX_PREPARE_VID_ROWKEY,)\
|
||||
ACT(CREATE_AUX_INDEX_TABLE,)\
|
||||
ACT(BEFORE_SEND_ALTER_TABLE,)\
|
||||
ACT(BEFOR_EXEC_REBUILD_TASK,)\
|
||||
ACT(BEFORE_CREATE_HIDDEN_TABLE_IN_LOAD,)\
|
||||
|
@ -3486,7 +3486,8 @@ DEF_TO_STRING(ObDropIndexArg) {
|
||||
K_(is_hidden),
|
||||
K_(is_inner),
|
||||
K_(is_vec_inner_drop),
|
||||
K_(only_set_status));
|
||||
K_(only_set_status),
|
||||
K_(index_ids));
|
||||
J_OBJ_END();
|
||||
return pos;
|
||||
}
|
||||
@ -3502,7 +3503,8 @@ OB_SERIALIZE_MEMBER((ObDropIndexArg, ObIndexArg),
|
||||
is_hidden_,
|
||||
is_inner_,
|
||||
is_vec_inner_drop_,
|
||||
only_set_status_);
|
||||
only_set_status_,
|
||||
index_ids_);
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDropIndexRes, tenant_id_, index_table_id_, schema_version_, task_id_);
|
||||
|
||||
|
@ -1400,6 +1400,7 @@ public:
|
||||
is_inner_ = false;
|
||||
is_vec_inner_drop_ = false;
|
||||
only_set_status_ = false;
|
||||
index_ids_.reset();
|
||||
}
|
||||
virtual ~ObDropIndexArg() {}
|
||||
void reset()
|
||||
@ -1412,6 +1413,7 @@ public:
|
||||
is_inner_ = false;
|
||||
is_vec_inner_drop_ = false;
|
||||
only_set_status_ = false;
|
||||
index_ids_.reset();
|
||||
}
|
||||
bool is_valid() const { return ObIndexArg::is_valid(); }
|
||||
uint64_t index_table_id_;
|
||||
@ -1421,6 +1423,7 @@ public:
|
||||
bool is_inner_;
|
||||
bool is_vec_inner_drop_;
|
||||
bool only_set_status_;
|
||||
common::ObSEArray<int64_t, 5> index_ids_;
|
||||
|
||||
DECLARE_VIRTUAL_TO_STRING;
|
||||
};
|
||||
|
@ -7767,7 +7767,7 @@ int ObDDLResolver::resolve_vec_index_constraint(
|
||||
} else if (!is_vector_memory_valid) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("not support vector index when ob_vector_memory_limit_percentage is 0", K(ret));
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "when ob_vector_memory_limit_percentage = 0 or memsotre_limit >= 85, vector index is");
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "when ob_vector_memory_limit_percentage = 0 or memstore_limit >= 85, vector index is");
|
||||
} else {
|
||||
index_keyname_ = VEC_KEY;
|
||||
ParseNode *option_node = NULL;
|
||||
|
Loading…
x
Reference in New Issue
Block a user