diff --git a/src/share/vector_index/ob_plugin_vector_index_adaptor.cpp b/src/share/vector_index/ob_plugin_vector_index_adaptor.cpp index 032c5b4e9..2534e05b9 100644 --- a/src/share/vector_index/ob_plugin_vector_index_adaptor.cpp +++ b/src/share/vector_index/ob_plugin_vector_index_adaptor.cpp @@ -244,9 +244,11 @@ static int try_free_memdata_resource(ObVectorIndexRecordType type, return ret; } -ObPluginVectorIndexAdaptor::ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity) +ObPluginVectorIndexAdaptor::ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, + lib::MemoryContext &entity, + uint64_t tenant_id) : create_type_(CreateTypeMax), type_(VIAT_MAX), - algo_data_(nullptr), incr_data_(nullptr), snap_data_(nullptr), vbitmap_data_(nullptr), + algo_data_(nullptr), incr_data_(nullptr), snap_data_(nullptr), vbitmap_data_(nullptr), tenant_id_(tenant_id), snapshot_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)), inc_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)), vbitmap_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)), @@ -586,7 +588,7 @@ int ObPluginVectorIndexAdaptor::init_mem_data(ObVectorIndexRecordType type) } else if (type == VIRT_INC) { TCWLockGuard lock_guard(incr_data_->mem_data_rwlock_); if (!incr_data_->is_inited()) { - if (OB_FAIL(incr_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_))) { + if (OB_FAIL(incr_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_, tenant_id_))) { LOG_WARN("failed to init incr data mem ctx.", K(ret)); } else if (OB_FAIL(obvectorutil::create_index(incr_data_->index_, obvectorlib::HNSW_TYPE, @@ -641,7 +643,7 @@ int ObPluginVectorIndexAdaptor::init_mem_data(ObVectorIndexRecordType type) } else if (type == VIRT_SNAP) { TCWLockGuard lock_guard(snap_data_->mem_data_rwlock_); if (!snap_data_->is_inited()) { - if (OB_FAIL(snap_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_))) { + if (OB_FAIL(snap_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_, tenant_id_))) { LOG_WARN("failed to init incr data mem ctx.", K(ret)); } else if (OB_FAIL(obvectorutil::create_index(snap_data_->index_, obvectorlib::HNSW_TYPE, @@ -748,7 +750,7 @@ int ObPluginVectorIndexAdaptor::insert_rows(blocksstable::ObDatumRow *rows, { INIT_SUCC(ret); int64_t dim = 0; - ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); + ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_); if (OB_ISNULL(rows)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get rows null.", K(ret)); @@ -983,7 +985,7 @@ int ObPluginVectorIndexAdaptor::complete_delta_buffer_table_data(ObVectorQueryAd float *vectors = nullptr; uint64_t *vids = nullptr; int count = 0; - ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); + ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_); if (OB_ISNULL(ctx)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get invalid ctx.", K(ret)); @@ -1552,7 +1554,7 @@ int ObPluginVectorIndexAdaptor::query_result(ObVectorQueryAdaptorResultContext * } } else if (ctx->flag_ == PVQP_SECOND) { - ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); + ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_); if (OB_ISNULL(query_cond->row_iter_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get snapshot table iter null.", K(ret), KP(query_cond)); @@ -1583,11 +1585,12 @@ int ObPluginVectorIndexAdaptor::query_result(ObVectorQueryAdaptorResultContext * } int ObPluginVectorIndexAdaptor::cast_roaringbitmap_to_stdmap(const roaring::api::roaring64_bitmap_t *bitmap, - std::map &mymap) + std::map &mymap, + uint64_t tenant_id) { INIT_SUCC(ret); uint64_t bitmap_cnt = roaring64_bitmap_get_cardinality(bitmap); - ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); + ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id); uint64_t *buf = nullptr; if (bitmap_cnt == 0) { @@ -1967,8 +1970,8 @@ int ObPluginVectorIndexAdaptor::check_vsag_mem_used() mem_check_cnt_++; if (mem_check_cnt_ % 10 == 0) { mem_check_cnt_ %= 10; - if (OB_FAIL(ObPluginVectorIndexHelper::get_vector_memory_limit_size(MTL_ID(), mem_size))) { - LOG_WARN("failed to get vector mem limit size.", K(ret), K(MTL_ID())); + if (OB_FAIL(ObPluginVectorIndexHelper::get_vector_memory_limit_size(tenant_id_, mem_size))) { + LOG_WARN("failed to get vector mem limit size.", K(ret), K(tenant_id_)); } else if (ATOMIC_LOAD(all_vsag_use_mem_) > mem_size) { ret = OB_ERR_VSAG_MEM_LIMIT_EXCEEDED; LOG_USER_ERROR(OB_ERR_VSAG_MEM_LIMIT_EXCEEDED, (int)mem_size >> 20); @@ -2074,11 +2077,13 @@ void *ObVsagMemContext::Reallocate(void* p, size_t size) return new_ptr; } -int ObVsagMemContext::init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem) +int ObVsagMemContext::init(lib::MemoryContext &parent_mem_context, + uint64_t *all_vsag_use_mem, + uint64_t tenant_id) { INIT_SUCC(ret); lib::ContextParam param; - ObMemAttr attr(MTL_ID(), "VIndexVsagADP"); + ObMemAttr attr(tenant_id, "VIndexVsagADP"); SET_IGNORE_MEM_VERSION(attr); param.set_mem_attr(attr) .set_page_size(OB_MALLOC_MIDDLE_BLOCK_SIZE) diff --git a/src/share/vector_index/ob_plugin_vector_index_adaptor.h b/src/share/vector_index/ob_plugin_vector_index_adaptor.h index e59e9a589..8a6849df3 100644 --- a/src/share/vector_index/ob_plugin_vector_index_adaptor.h +++ b/src/share/vector_index/ob_plugin_vector_index_adaptor.h @@ -321,7 +321,7 @@ class ObPluginVectorIndexAdaptor { public: friend class ObVsagMemContext; - ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity); + ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity, uint64_t tenant_id); ~ObPluginVectorIndexAdaptor(); int init(ObString init_str, int64_t dim, lib::MemoryContext &parent_mem_ctx, uint64_t *all_vsag_use_mem); @@ -329,6 +329,7 @@ public: int init(lib::MemoryContext &parent_mem_ctx, uint64_t *all_vsag_use_mem); int set_param(ObString init_str, int64_t dim); int get_index_type() { return type_; }; + uint64_t get_tenant_id() {return tenant_id_; }; // -- start 调试使用 void init_incr_tablet() {inc_tablet_id_ = ObTabletID(common::ObTabletID::MIN_VALID_TABLET_ID); } @@ -422,7 +423,8 @@ public: ObVectorIndexAlgorithmType &type, void *¶m); static int cast_roaringbitmap_to_stdmap(const roaring::api::roaring64_bitmap_t *bitmap, - std::map &mymap); + std::map &mymap, + uint64_t tenant_id); int check_vsag_mem_used(); uint64_t get_all_vsag_mem_used() { return ATOMIC_LOAD(all_vsag_use_mem_); @@ -471,7 +473,8 @@ public: ObAdapterCreateType &get_create_type() { return create_type_; }; void set_create_type(ObAdapterCreateType type) { create_type_ = type; }; - TO_STRING_KV(K_(create_type), K_(type), KP_(algo_data), KP_(incr_data), KP_(snap_data), KP_(vbitmap_data), + TO_STRING_KV(K_(create_type), K_(type), KP_(algo_data), + KP_(incr_data), KP_(snap_data), KP_(vbitmap_data), K_(tenant_id), K_(data_tablet_id),K_(rowkey_vid_tablet_id), K_(vid_rowkey_tablet_id), K_(inc_tablet_id), K_(vbitmap_tablet_id), K_(snapshot_tablet_id), K_(data_table_id), K_(rowkey_vid_table_id), K_(vid_rowkey_table_id), @@ -518,6 +521,8 @@ private: ObVectorIndexMemData *snap_data_; ObVectorIndexMemData *vbitmap_data_; + uint64_t tenant_id_; + ObTabletID snapshot_tablet_id_; ObTabletID inc_tablet_id_; ObTabletID vbitmap_tablet_id_; @@ -611,7 +616,7 @@ public: all_vsag_use_mem_ = nullptr; } } - int init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem); + int init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem, uint64_t tenant_id); bool is_inited() { return OB_NOT_NULL(mem_context_); } std::string Name() override { diff --git a/src/share/vector_index/ob_plugin_vector_index_scheduler.cpp b/src/share/vector_index/ob_plugin_vector_index_scheduler.cpp index 57811f3b2..194f59bbc 100644 --- a/src/share/vector_index/ob_plugin_vector_index_scheduler.cpp +++ b/src/share/vector_index/ob_plugin_vector_index_scheduler.cpp @@ -78,6 +78,8 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters() ObPluginVectorIndexAdaptor *adapter = iter->second; ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema; + ObTabletID tablet_id = iter->first; + ObTabletHandle tablet_handle; if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id_, schema_guard))) { LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id_)); } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, adapter->get_vbitmap_table_id(), table_schema))) { @@ -94,11 +96,22 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters() LOG_WARN("push back table id failed", K(delete_tablet_id_array.count()), K(adapter->get_snap_tablet_id()), KR(ret)); } + } else if (OB_FAIL(ls_->get_tablet_svr()->get_tablet(tablet_id, tablet_handle))) { + if (OB_TABLET_NOT_EXIST != ret) { + LOG_WARN("fail to get tablet", K(ret), K(tablet_id)); + } else { + ret = OB_SUCCESS; // not found, moved from this ls + if (OB_FAIL(delete_tablet_id_array.push_back(tablet_id))) { + LOG_WARN("push back table id failed", + K(delete_tablet_id_array.count()), K(adapter->get_inc_tablet_id()), KR(ret)); + } + } } } - - LOG_INFO("try erase complete vector index adapter", - K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); // debug, remove later + if (delete_tablet_id_array.count() > 0) { + LOG_INFO("try erase complete vector index adapter", + K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); + } for (int64_t i = 0; OB_SUCC(ret) && i < delete_tablet_id_array.count(); i++) { if (OB_FAIL(index_ls_mgr->erase_complete_adapter(delete_tablet_id_array.at(i)))) { @@ -140,8 +153,10 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters() } } - LOG_INFO("try erase partial vector index adapter", - K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); // debug, remove later + if (delete_tablet_id_array.count() > 0) { + LOG_INFO("try erase partial vector index adapter", + K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); + } for (int64_t i = 0; OB_SUCC(ret) && i < delete_tablet_id_array.count(); i++) { if (OB_FAIL(index_ls_mgr->erase_partial_adapter(delete_tablet_id_array.at(i)))) { @@ -162,7 +177,6 @@ bool ObPluginVectorIndexLoadScheduler::check_can_do_work() { bool bret = true; int ret = OB_SUCCESS; - int64_t tenant_id = MTL_ID(); uint64_t tenant_data_version = 0; bool is_oracle_mode = false; @@ -170,15 +184,15 @@ bool ObPluginVectorIndexLoadScheduler::check_can_do_work() LOG_WARN("fail to check oracle mode", KR(ret), K_(tenant_id)); } else if (is_oracle_mode) { bret = false; - LOG_DEBUG("vector index not support oracle mode", K(tenant_id)); - } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) { + LOG_DEBUG("vector index not support oracle mode", K_(tenant_id)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tenant_data_version))) { bret = false; LOG_WARN("get tenant data version failed", K(ret)); } else if (tenant_data_version < DATA_VERSION_4_3_3_0) { bret = false; LOG_DEBUG("vector index can not work with data version less than 4_3_3", K(tenant_data_version)); - } else if (is_user_tenant(tenant_id)) { - if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id), tenant_data_version))) { + } else if (is_user_tenant(tenant_id_)) { + if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), tenant_data_version))) { bret = false; LOG_WARN("get tenant data version failed", K(ret)); } else if (tenant_data_version < DATA_VERSION_4_3_3_0) { @@ -209,10 +223,12 @@ int ObPluginVectorIndexLoadScheduler::check_schema_version() return ret; } -int ObPluginVectorIndexLoadScheduler::check_parital_index_adpter_exist(ObPluginVectorIndexMgr *mgr) +int ObPluginVectorIndexLoadScheduler::check_index_adpter_exist(ObPluginVectorIndexMgr *mgr) { int ret = OB_SUCCESS; - if (!mgr->get_partial_adapter_map().empty()) { + if (!mgr->get_partial_adapter_map().empty() || !mgr->get_complete_adapter_map().empty()) { + // partial map not empty, exist adapter create by dml/ddl data complement/query + // complete adapter not empty, also need check for transfer mark_tenant_need_check(); } return ret; @@ -386,7 +402,7 @@ int ObPluginVectorIndexLoadScheduler::execute_adapter_maintenance() ObSEArray table_id_array; ObVecIdxSharedTableInfoMap shared_table_info_map; - ObMemAttr memattr(MTL_ID(), "VecIdxInfo"); + ObMemAttr memattr(tenant_id_, "VecIdxInfo"); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tablet vector index scheduler not init", KR(ret)); @@ -836,7 +852,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_adapter_maintenance_task // if schema version change, or exist partial adapter(create by access) need do maintenance if (OB_FAIL(check_schema_version())) { LOG_WARN("fail to check schema version", KR(ret)); - } else if (OB_NOT_NULL(mgr) && OB_FAIL(check_parital_index_adpter_exist(mgr))) { + } else if (OB_NOT_NULL(mgr) && OB_FAIL(check_index_adpter_exist(mgr))) { LOG_WARN("fail to check exist paritial index adapter", KR(ret)); } else if (local_tenant_task_.need_check_) { if (OB_FAIL(execute_adapter_maintenance())) { @@ -965,7 +981,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_memdata_sync_task(ObPlug } else { // generate one task char *task_ctx_buf = - static_cast(mgr->get_task_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx))); + static_cast(mgr->get_processing_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx))); ObPluginVectorIndexTaskCtx* task_ctx = nullptr; if (OB_ISNULL(task_ctx_buf)) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -976,7 +992,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_memdata_sync_task(ObPlug } if (OB_FAIL(ret) && OB_NOT_NULL(task_ctx)) { task_ctx->~ObPluginVectorIndexTaskCtx(); - mgr->get_task_allocator().free(task_ctx); + mgr->get_processing_allocator().free(task_ctx); task_ctx = nullptr; } } @@ -1161,7 +1177,7 @@ int ObPluginVectorIndexLoadScheduler::handle_replay_result(ObVectorIndexSyncLog ObTabletID tablet_id = ls_log.get_tablet_id_array().at(i); uint64_t table_id = ls_log.get_table_id_array().at(i); char *task_ctx_buf = - static_cast(mgr->get_task_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx))); + static_cast(mgr->get_waiting_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx))); ObPluginVectorIndexTaskCtx* task_ctx = nullptr; if (OB_ISNULL(task_ctx_buf)) { @@ -1180,7 +1196,7 @@ int ObPluginVectorIndexLoadScheduler::handle_replay_result(ObVectorIndexSyncLog } if (OB_FAIL(ret) && OB_NOT_NULL(task_ctx)) { task_ctx->~ObPluginVectorIndexTaskCtx(); - mgr->get_task_allocator().free(task_ctx); // not really free + mgr->get_waiting_allocator().free(task_ctx); // not really free task_ctx = nullptr; } } @@ -1381,6 +1397,7 @@ int ObVectorIndexTask::init(ObPluginVectorIndexLoadScheduler *schedular, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), KP(schedular), KP(mgr), KP(task_ctx)); } else { + allocator_.set_tenant_id(mgr->get_tenant_id()); ls_id_ = mgr->get_ls_id(); vec_idx_scheduler_ = schedular; vec_idx_mgr_ = mgr; @@ -1411,7 +1428,8 @@ int ObVectorIndexTask::process() while(!need_stop && OB_SUCC(ret)) { // need set context? should set attr in constructor lib::ContextParam param; - param.set_mem_attr(MTL_ID(), "VecIdxTaskCtx", ObCtxIds::DEFAULT_CTX_ID) // 这里是dag的MTL + // use dag mtl id for param refer to TTLtask + param.set_mem_attr(MTL_ID(), "VecIdxTaskCtx", ObCtxIds::DEFAULT_CTX_ID) .set_properties(lib::USE_TL_PAGE_OPTIONAL); CREATE_WITH_TEMP_CONTEXT(param) { if (OB_FAIL(process_one())) { diff --git a/src/share/vector_index/ob_plugin_vector_index_scheduler.h b/src/share/vector_index/ob_plugin_vector_index_scheduler.h index ed5ee35bd..5fd949057 100644 --- a/src/share/vector_index/ob_plugin_vector_index_scheduler.h +++ b/src/share/vector_index/ob_plugin_vector_index_scheduler.h @@ -250,7 +250,7 @@ public: bool &is_vector_index_table, bool &is_shared_index_table); void clean_deprecated_adapters(); - int check_parital_index_adpter_exist(ObPluginVectorIndexMgr *mgr); + int check_index_adpter_exist(ObPluginVectorIndexMgr *mgr); int log_tablets_need_memdata_sync(ObPluginVectorIndexMgr *mgr); int execute_all_memdata_sync_task(ObPluginVectorIndexMgr *mgr); diff --git a/src/share/vector_index/ob_plugin_vector_index_service.cpp b/src/share/vector_index/ob_plugin_vector_index_service.cpp index a3c194367..a2a4b3b4b 100644 --- a/src/share/vector_index/ob_plugin_vector_index_service.cpp +++ b/src/share/vector_index/ob_plugin_vector_index_service.cpp @@ -36,12 +36,13 @@ void ObPluginVectorIndexMgr::destroy() release_all_adapters(); partial_index_adpt_map_.destroy(); complete_index_adpt_map_.destroy(); - first_mem_sync_map_.destroy(); - second_mem_sync_map_.destroy(); + // elements memory in adpt map will be released by allocator in service, refine later; - // elements memory in mem_sync_map should be released here, they are alloc by ob_malloc; - // should use 2 allocator to avoid accumulation - task_allocator_.reset(); + // elements memory in mem_sync_map should be released here + first_mem_sync_map_.destroy(); + first_task_allocator_.reset(); + second_mem_sync_map_.destroy(); + second_task_allocator_.reset(); } } @@ -74,13 +75,13 @@ int ObPluginVectorIndexMgr::init(uint64_t tenant_id, { int ret = OB_SUCCESS; int64_t hash_capacity = common::hash::cal_next_prime(DEFAULT_ADAPTER_HASH_SIZE); - if (OB_FAIL(complete_index_adpt_map_.create(hash_capacity, "VecIdxAdpt"))) { + if (OB_FAIL(complete_index_adpt_map_.create(hash_capacity, "VecIdxAdptMap", "VecIdxAdptMap", tenant_id))) { LOG_WARN("fail to create full index adapter map", K(ls_id), KR(ret)); - } else if (OB_FAIL(partial_index_adpt_map_.create(hash_capacity, "VecIdxAdpt"))) { + } else if (OB_FAIL(partial_index_adpt_map_.create(hash_capacity, "VecIdxAdptMap", "VecIdxAdptMap", tenant_id))) { LOG_WARN("fail to create partial index adapter map", K(ls_id), KR(ret)); - } else if (OB_FAIL(first_mem_sync_map_.create(hash_capacity, "VecIdxAdpt", "VecIdxAdpt"))) { + } else if (OB_FAIL(first_mem_sync_map_.create(hash_capacity, "VecIdxTaskMap", "VecIdxTaskMap", tenant_id))) { LOG_WARN("fail to create first mem sync set", K(ls_id), KR(ret)); - } else if (OB_FAIL(second_mem_sync_map_.create(hash_capacity, "VecIdxAdpt", "VecIdxAdpt"))) { + } else if (OB_FAIL(second_mem_sync_map_.create(hash_capacity, "VecIdxTaskMap", "VecIdxTaskMap", tenant_id))) { LOG_WARN("fail to create second mem sync set", K(ls_id), KR(ret)); } else { ls_tablet_task_ctx_.task_id_ = 0; @@ -225,7 +226,7 @@ int ObPluginVectorIndexMgr::create_partial_adapter(ObTabletID idx_tablet_id, ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory for vector index adapter", KR(ret)); } else { - tmp_vec_idx_adpt = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_); + tmp_vec_idx_adpt = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_); ObVectorIndexRecordType record_type = ObPluginVectorIndexUtils::index_type_to_record_type(type); if (record_type >= VIRT_MAX) { ret = OB_ERR_UNEXPECTED; @@ -395,6 +396,7 @@ int ObPluginVectorIndexMgr::check_need_mem_data_sync_task(bool &need_sync) if (get_processing_map().size() > 0) { if (ls_tablet_task_ctx_.all_finished_) { // is false get_processing_map().reuse(); + get_processing_allocator().reset(); // release task ctx ls_tablet_task_ctx_.all_finished_ = false; LOG_INFO("release processing set to waiting set", @@ -652,7 +654,7 @@ int ObPluginVectorIndexService::acquire_vector_index_mgr(ObLSID ls_id, ObPluginV ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memeory for new vector index mgr", KR(ret)); } else { - ObPluginVectorIndexMgr *new_ls_index_mgr = new(mgr_buff)ObPluginVectorIndexMgr(memory_context_); + ObPluginVectorIndexMgr *new_ls_index_mgr = new(mgr_buff)ObPluginVectorIndexMgr(memory_context_, tenant_id_); if (OB_FAIL(new_ls_index_mgr->init(tenant_id_, ls_id, memory_context_, &all_vsag_use_mem_))) { LOG_WARN("failed to init ls vector index mgr", K(ls_id), KR(ret)); } else if (OB_FAIL(get_ls_index_mgr_map().set_refactored(ls_id, new_ls_index_mgr))) { @@ -750,7 +752,7 @@ int ObPluginVectorIndexService::init(const uint64_t tenant_id, ObLSService *ls_service) { int ret = OB_SUCCESS; - lib::ObMemAttr mem_attr(MTL_ID(), "VecIdxSrv"); + lib::ObMemAttr mem_attr(tenant_id, "VecIdxSrv"); if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("init twice", KR(ret), K(tenant_id)); @@ -759,14 +761,17 @@ int ObPluginVectorIndexService::init(const uint64_t tenant_id, || OB_ISNULL(ls_service)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument to init ObPluginVectorIndexService", KR(ret), K(tenant_id)); - } else if (OB_FAIL(index_ls_mgr_map_.create(common::hash::cal_next_prime(DEFAULT_LS_HASH_SIZE), "VecIdxLSMgr"))) { + } else if (OB_FAIL(index_ls_mgr_map_.create(common::hash::cal_next_prime(DEFAULT_LS_HASH_SIZE), + "VecIdxLSMgr", + "VecIdxLSMgr", + tenant_id))) { LOG_WARN("create ls mgr ", KR(ret), K(tenant_id)); - } else if (FALSE_IT(alloc_.set_tenant_id(MTL_ID()))) { + } else if (FALSE_IT(alloc_.set_tenant_id(tenant_id))) { } else if (OB_FAIL(allocator_.init(&alloc_, OB_MALLOC_MIDDLE_BLOCK_SIZE, mem_attr))) { LOG_WARN("ObTenantSrs allocator init failed.", K(ret)); } else { lib::ContextParam param; - param.set_mem_attr(MTL_ID()) + param.set_mem_attr(tenant_id) .set_properties(lib::ADD_CHILD_THREAD_SAFE | lib::ALLOC_THREAD_SAFE | lib::RETURN_MALLOC_DEFAULT) .set_page_size(OB_MALLOC_MIDDLE_BLOCK_SIZE) .set_label("VectorIndexVsag") @@ -820,7 +825,7 @@ void ObPluginVectorIndexService::wait() } } -// ToDo: debug functions, remove later virtual-table ready +// debug functions void ObPluginVectorIndexMgr::dump_all_inst() { int ret = OB_SUCCESS; @@ -897,7 +902,7 @@ int ObPluginVectorIndexMgr::replace_with_complete_adapter(ObVectorIndexAdapterCa ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory for vector index adapter", KR(ret)); } else { - new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_); + new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_); new_adapter->set_create_type(CreateTypeComplete); if (OB_FAIL(new_adapter->merge_parital_index_adapter(inc_adapter_guard.get_adatper()))) { LOG_WARN("failed to merge inc index adapter", KPC(inc_adapter_guard.get_adatper()), KR(ret)); @@ -981,7 +986,7 @@ int ObPluginVectorIndexMgr::replace_with_full_partial_adapter(ObVectorIndexAcqui ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory for vector index adapter", KR(ret)); } else { - new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_); + new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_); new_adapter->set_create_type(CreateTypeFullPartial); if (OB_FAIL(new_adapter->set_tablet_id(VIRT_INC, ctx.inc_tablet_id_))) { LOG_WARN("failed to set inc tablet id", K(ctx), KR(ret)); diff --git a/src/share/vector_index/ob_plugin_vector_index_service.h b/src/share/vector_index/ob_plugin_vector_index_service.h index 466d5097f..044775e0b 100644 --- a/src/share/vector_index/ob_plugin_vector_index_service.h +++ b/src/share/vector_index/ob_plugin_vector_index_service.h @@ -73,7 +73,7 @@ typedef common::hash::ObHashMap class ObPluginVectorIndexMgr { public: - ObPluginVectorIndexMgr(lib::MemoryContext &memory_context) + ObPluginVectorIndexMgr(lib::MemoryContext &memory_context, uint64_t tenant_id) : is_inited_(false), need_check_(false), ls_id_(), @@ -81,13 +81,14 @@ public: partial_index_adpt_map_(), adapter_map_rwlock_(), ls_tablet_task_ctx_(), - tenant_id_(0), + tenant_id_(tenant_id), interval_factor_(0), vector_index_service_(nullptr), processing_first_mem_sync_(true), first_mem_sync_map_(), second_mem_sync_map_(), - task_allocator_(ObMemAttr(MTL_ID(), "VecIdxTask")), + first_task_allocator_(ObMemAttr(tenant_id, "VecIdxTask")), + second_task_allocator_(ObMemAttr(tenant_id, "VecIdxTask")), memory_context_(memory_context), all_vsag_use_mem_(nullptr) {} @@ -151,10 +152,10 @@ public: VectorIndexMemSyncMap &get_processing_map() { return processing_first_mem_sync_ ? first_mem_sync_map_ : second_mem_sync_map_; } VectorIndexMemSyncMap &get_waiting_map() { return processing_first_mem_sync_ ? second_mem_sync_map_ : first_mem_sync_map_; } + ObIAllocator &get_processing_allocator() { return processing_first_mem_sync_ ? first_task_allocator_ : second_task_allocator_; } + ObIAllocator &get_waiting_allocator() { return processing_first_mem_sync_ ? second_task_allocator_ : first_task_allocator_; } void switch_processing_map() { processing_first_mem_sync_ = !processing_first_mem_sync_; } - ObIAllocator &get_task_allocator() { return task_allocator_; } - // debug interface void dump_all_inst(); // for virtual table @@ -197,11 +198,12 @@ private: int64_t local_schema_version_; // detect schema change - // pingpong map for follower receive memdata sync task from log + // pingpong map/allocator for follower receive memdata sync task from log bool processing_first_mem_sync_; VectorIndexMemSyncMap first_mem_sync_map_; VectorIndexMemSyncMap second_mem_sync_map_; - ObArenaAllocator task_allocator_; + ObArenaAllocator first_task_allocator_; + ObArenaAllocator second_task_allocator_; lib::MemoryContext &memory_context_; uint64_t *all_vsag_use_mem_; }; diff --git a/src/share/vector_index/ob_plugin_vector_index_utils.cpp b/src/share/vector_index/ob_plugin_vector_index_utils.cpp index 08ed976dd..1b60f59cc 100644 --- a/src/share/vector_index/ob_plugin_vector_index_utils.cpp +++ b/src/share/vector_index/ob_plugin_vector_index_utils.cpp @@ -192,14 +192,13 @@ int ObPluginVectorIndexUtils::read_object_from_vid_rowkey_table_iter(ObObj *inpu int ObPluginVectorIndexUtils::get_vec_column_id( ObSEArray &vector_column_ids, uint64_t incr_index_table_id, - uint64_t data_table_id) + uint64_t data_table_id, + uint64_t tenant_id) { INIT_SUCC(ret); ObSchemaGetterGuard schema_guard; const ObTableSchema *delta_buffer_schema = nullptr; const ObTableSchema *table_schema = nullptr; - uint64_t tenant_id = MTL_ID(); - ObMultiVersionSchemaService *schema_service = MTL(schema::ObTenantSchemaService*)->get_schema_service(); if (OB_ISNULL(schema_service)) { ret = OB_ERR_UNEXPECTED; @@ -520,52 +519,54 @@ int ObPluginVectorIndexUtils::refresh_memdata(ObLSID &ls_id, LOG_WARN("fail to test read local data.", K(ret), K(ls_id), K(INDEX_TYPE_IS_NOT)); } #endif - - common::ObNewRowIterator *delta_buf_iter = nullptr; - ObAccessService *tsc_service = MTL(ObAccessService *); - storage::ObTableScanParam inc_scan_param; - schema::ObTableParam inc_table_param(allocator); - ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); - if (OB_ISNULL(adapter)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid adapter", K(ret), KPC(adapter)); - } else if (OB_FAIL(read_local_tablet(ls_id, - adapter, - target_scn, - INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL, - allocator, - inc_scan_param, - inc_table_param, - delta_buf_iter))) { - LOG_WARN("fail to read local tablet", KR(ret), K(ls_id), K(INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL), KPC(adapter)); } else { - ObVectorQueryAdaptorResultContext ada_ctx(&allocator, &tmp_allocator); - if (OB_FAIL(adapter->check_delta_buffer_table_readnext_status(&ada_ctx, delta_buf_iter, target_scn))) { - LOG_WARN("fail to check_delta_buffer_table_readnext_status.", K(ret)); - } else if (OB_FAIL(try_sync_vbitmap_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { - LOG_WARN("failed to sync vbitmap", KR(ret)); - } else if (ada_ctx.get_status() == PVQ_COM_DATA) { - if (OB_FAIL(read_vector_info(adapter, allocator, ls_id, target_scn, ada_ctx))) { - LOG_WARN("failed to read vector_info", KR(ret)); - } else if (OB_FAIL(adapter->complete_delta_buffer_table_data(&ada_ctx))) { - LOG_WARN("failed to complete delta buffer", KR(ret)); - } else if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { - LOG_WARN("failed to refresh mem snapshots", KR(ret)); + MTL_SWITCH(adapter->get_tenant_id()) { + common::ObNewRowIterator *delta_buf_iter = nullptr; + ObAccessService *tsc_service = MTL(ObAccessService *); + storage::ObTableScanParam inc_scan_param; + schema::ObTableParam inc_table_param(allocator); + if (OB_FAIL(read_local_tablet(ls_id, + adapter, + target_scn, + INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL, + allocator, + inc_scan_param, + inc_table_param, + delta_buf_iter))) { + LOG_WARN("fail to read local tablet", KR(ret), K(ls_id), K(INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL), KPC(adapter)); + } else { + ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, adapter->get_tenant_id()); + ObVectorQueryAdaptorResultContext ada_ctx(&allocator, &tmp_allocator); + if (OB_FAIL(adapter->check_delta_buffer_table_readnext_status(&ada_ctx, delta_buf_iter, target_scn))) { + LOG_WARN("fail to check_delta_buffer_table_readnext_status.", K(ret)); + } else if (OB_FAIL(try_sync_vbitmap_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { + LOG_WARN("failed to sync vbitmap", KR(ret)); + } else if (ada_ctx.get_status() == PVQ_COM_DATA) { + if (OB_FAIL(read_vector_info(adapter, allocator, ls_id, target_scn, ada_ctx))) { + LOG_WARN("failed to read vector_info", KR(ret)); + } else if (OB_FAIL(adapter->complete_delta_buffer_table_data(&ada_ctx))) { + LOG_WARN("failed to complete delta buffer", KR(ret)); + } else if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { + LOG_WARN("failed to refresh mem snapshots", KR(ret)); + } + } else if (ada_ctx.get_status() == PVQ_LACK_SCN) { + if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { + LOG_WARN("failed to refresh mem snapshots without refresh incr", KR(ret)); + } + } } - } else if (ada_ctx.get_status() == PVQ_LACK_SCN) { - if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) { - LOG_WARN("failed to refresh mem snapshots without refresh incr", KR(ret)); - } - } - } - if (OB_NOT_NULL(delta_buf_iter) && OB_NOT_NULL(tsc_service)) { - int tmp_ret = tsc_service->revert_scan_iter(delta_buf_iter); - if (tmp_ret != OB_SUCCESS) { - LOG_WARN("revert delta_buf_iter failed", K(ret)); + if (OB_NOT_NULL(delta_buf_iter) && OB_NOT_NULL(tsc_service)) { + int tmp_ret = tsc_service->revert_scan_iter(delta_buf_iter); + if (tmp_ret != OB_SUCCESS) { + LOG_WARN("revert delta_buf_iter failed", K(ret)); + } + delta_buf_iter = nullptr; + } } - delta_buf_iter = nullptr; } return ret; } @@ -644,7 +645,6 @@ int ObPluginVectorIndexUtils::read_local_tablet(ObLSID &ls_id, target_scn))) { LOG_WARN("fail to get tablet handle", KR(ret), K(tablet_id)); } else { - uint64_t tenant_id = MTL_ID(); scan_param.ls_id_ = ls_id; scan_param.tablet_id_ = tablet_id; scan_param.schema_version_ = tablet_handle.get_obj()->get_tablet_meta().max_sync_storage_schema_version_; @@ -687,7 +687,7 @@ int ObPluginVectorIndexUtils::read_local_tablet(ObLSID &ls_id, } } else { // vid_rowkey table or data table, get rowkey while complete - if (OB_FAIL(get_shared_table_rowkey_colum_count(type, table_id, col_cnt))) { + if (OB_FAIL(get_shared_table_rowkey_colum_count(type, adapter->get_tenant_id(), table_id, col_cnt))) { LOG_WARN("fail to get index aux table colum count", KR(ret), K(type)); } else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObObj) * col_cnt * 2))) { ret = OB_ERR_UNEXPECTED; @@ -763,7 +763,11 @@ int ObPluginVectorIndexUtils::init_common_scan_param(storage::ObTableScanParam& uint32 col_cnt = 0; if (is_vec_index(type) || type == INDEX_TYPE_IS_NOT){ - if (OB_FAIL(get_special_index_aux_table_column_count(type, table_id, col_cnt, scan_param))) { + if(OB_ISNULL(adapter)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get null adapter", KR(ret), K(type)); + } else if (OB_FAIL(get_special_index_aux_table_column_count(type, adapter->get_tenant_id(), + table_id, col_cnt, scan_param))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected index type", KR(ret), K(type)); } @@ -819,9 +823,18 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param, ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = NULL; ObSEArray column_ids; - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID())); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + uint64_t tenant_id = 0; + if (OB_ISNULL(adapter)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get null adapter", KR(ret), K(inc_table_id), K(data_table_id), K(table_id), K(type)); + } else { + tenant_id = adapter->get_tenant_id(); + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { LOG_WARN("fail to get schema", KR(ret), KR(table_id)); } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine @@ -829,7 +842,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param, } else if (is_vec_delta_buffer_type(type)) { ObArray tmp_column_ids; const ObTableSchema *data_table_schema = NULL; - if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) { + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) { LOG_WARN("fail to get schema", KR(ret), KR(data_table_id)); } else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine @@ -877,7 +890,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param, // different with other index, refer to ObTscCgService::extract_vec_ir_access_columns ObArray tmp_column_ids; const ObTableSchema *data_table_schema = NULL; - if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) { + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) { LOG_WARN("fail to get schema", KR(ret), KR(data_table_id)); } else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine @@ -961,7 +974,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param, } else if (is_vec_index_snapshot_data_type(type)) { const ObTableSchema *data_table_schema = NULL; ObSEArray tmp_column_ids; - if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) { + if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) { LOG_WARN("fail to get schema", KR(ret), KR(data_table_id)); } else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine @@ -1007,7 +1020,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param, } } } else if (type == INDEX_TYPE_IS_NOT) { - if (OB_FAIL(get_vec_column_id(column_ids, inc_table_id, table_id))) { + if (OB_FAIL(get_vec_column_id(column_ids, inc_table_id, table_id, tenant_id))) { LOG_WARN("failed to get vec column id.", K(ret)); } else { if (OB_FAIL(table_param->convert(*table_schema, column_ids, sql::ObStoragePushdownFlag()))) { @@ -1044,6 +1057,7 @@ int ObPluginVectorIndexUtils::get_non_shared_index_aux_table_colum_count(schema: int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count( schema::ObIndexType type, + uint64_t tenant_id, uint64_t table_id, uint32 &col_cnt, storage::ObTableScanParam& scan_param) @@ -1052,9 +1066,9 @@ int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count( ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = NULL; ObSEArray column_ids; - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID())); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { LOG_WARN("fail to get schema", KR(ret), K(table_id)); } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine @@ -1067,7 +1081,7 @@ int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count( col_cnt = column_ids.count(); } if (OB_SUCC(ret)) { - LOG_INFO("get_special_index_aux_table_column_count", K(type), K(col_cnt), K(column_ids)); // remove after debug; + LOG_INFO("get_special_index_aux_table_column_count", K(type), K(col_cnt), K(column_ids), K(tenant_id)); // remove after debug; } return ret; } @@ -1099,6 +1113,7 @@ int ObPluginVectorIndexUtils::get_non_shared_index_aux_table_rowkey_colum_count( } int ObPluginVectorIndexUtils::get_shared_table_rowkey_colum_count(schema::ObIndexType type, + uint64_t tenant_id, uint64_t table_id, uint32 &col_cnt) { @@ -1106,9 +1121,9 @@ int ObPluginVectorIndexUtils::get_shared_table_rowkey_colum_count(schema::ObInde ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = NULL; ObSEArray column_ids; - if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) { - LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID())); - } else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) { + if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { LOG_WARN("fail to get schema", KR(ret), KR(table_id)); } else if (OB_ISNULL(table_schema)) { ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine diff --git a/src/share/vector_index/ob_plugin_vector_index_utils.h b/src/share/vector_index/ob_plugin_vector_index_utils.h index 6bbd083ea..01650d898 100644 --- a/src/share/vector_index/ob_plugin_vector_index_utils.h +++ b/src/share/vector_index/ob_plugin_vector_index_utils.h @@ -86,7 +86,8 @@ public: bool &get_data); static int get_vec_column_id (ObSEArray &vector_column_ids, uint64_t incr_index_table_id, - uint64_t data_table_id); + uint64_t data_table_id, + uint64_t tenant_id); static int read_vector_info(ObPluginVectorIndexAdaptor *adapter, ObIAllocator &allocator, ObLSID &ls_id, @@ -122,12 +123,15 @@ private: ObPluginVectorIndexAdaptor *adapter); static int get_non_shared_index_aux_table_colum_count(schema::ObIndexType type, uint32 &col_cnt); static int get_non_shared_index_aux_table_rowkey_colum_count(schema::ObIndexType type, uint32 &col_cnt); - static int get_special_index_aux_table_column_count( - schema::ObIndexType type, - uint64_t table_id, - uint32 &col_cnt, - storage::ObTableScanParam& scan_param); - static int get_shared_table_rowkey_colum_count(schema::ObIndexType type, uint64_t table_id, uint32 &col_cnt); + static int get_special_index_aux_table_column_count(schema::ObIndexType type, + uint64_t tenant_id, + uint64_t table_id, + uint32 &col_cnt, + storage::ObTableScanParam& scan_param); + static int get_shared_table_rowkey_colum_count(schema::ObIndexType type, + uint64_t tenant_id, + uint64_t table_id, + uint32 &col_cnt); static int try_sync_snapshot_memdata(ObLSID &ls_id, ObPluginVectorIndexAdaptor *adapter, SCN &target_scn,