fix adaptors not removed properly in some cases

This commit is contained in:
obdev 2024-09-18 05:22:31 +00:00 committed by ob-robot
parent 8263f2ec96
commit 0f2c8934f8
8 changed files with 182 additions and 128 deletions

View File

@ -244,9 +244,11 @@ static int try_free_memdata_resource(ObVectorIndexRecordType type,
return ret;
}
ObPluginVectorIndexAdaptor::ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity)
ObPluginVectorIndexAdaptor::ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator,
lib::MemoryContext &entity,
uint64_t tenant_id)
: create_type_(CreateTypeMax), type_(VIAT_MAX),
algo_data_(nullptr), incr_data_(nullptr), snap_data_(nullptr), vbitmap_data_(nullptr),
algo_data_(nullptr), incr_data_(nullptr), snap_data_(nullptr), vbitmap_data_(nullptr), tenant_id_(tenant_id),
snapshot_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)),
inc_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)),
vbitmap_tablet_id_(ObTabletID(ObTabletID::INVALID_TABLET_ID)),
@ -586,7 +588,7 @@ int ObPluginVectorIndexAdaptor::init_mem_data(ObVectorIndexRecordType type)
} else if (type == VIRT_INC) {
TCWLockGuard lock_guard(incr_data_->mem_data_rwlock_);
if (!incr_data_->is_inited()) {
if (OB_FAIL(incr_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_))) {
if (OB_FAIL(incr_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_, tenant_id_))) {
LOG_WARN("failed to init incr data mem ctx.", K(ret));
} else if (OB_FAIL(obvectorutil::create_index(incr_data_->index_,
obvectorlib::HNSW_TYPE,
@ -641,7 +643,7 @@ int ObPluginVectorIndexAdaptor::init_mem_data(ObVectorIndexRecordType type)
} else if (type == VIRT_SNAP) {
TCWLockGuard lock_guard(snap_data_->mem_data_rwlock_);
if (!snap_data_->is_inited()) {
if (OB_FAIL(snap_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_))) {
if (OB_FAIL(snap_data_->mem_ctx_->init(parent_mem_ctx_, all_vsag_use_mem_, tenant_id_))) {
LOG_WARN("failed to init incr data mem ctx.", K(ret));
} else if (OB_FAIL(obvectorutil::create_index(snap_data_->index_,
obvectorlib::HNSW_TYPE,
@ -748,7 +750,7 @@ int ObPluginVectorIndexAdaptor::insert_rows(blocksstable::ObDatumRow *rows,
{
INIT_SUCC(ret);
int64_t dim = 0;
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_);
if (OB_ISNULL(rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get rows null.", K(ret));
@ -983,7 +985,7 @@ int ObPluginVectorIndexAdaptor::complete_delta_buffer_table_data(ObVectorQueryAd
float *vectors = nullptr;
uint64_t *vids = nullptr;
int count = 0;
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_);
if (OB_ISNULL(ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get invalid ctx.", K(ret));
@ -1552,7 +1554,7 @@ int ObPluginVectorIndexAdaptor::query_result(ObVectorQueryAdaptorResultContext *
}
} else if (ctx->flag_ == PVQP_SECOND) {
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id_);
if (OB_ISNULL(query_cond->row_iter_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get snapshot table iter null.", K(ret), KP(query_cond));
@ -1583,11 +1585,12 @@ int ObPluginVectorIndexAdaptor::query_result(ObVectorQueryAdaptorResultContext *
}
int ObPluginVectorIndexAdaptor::cast_roaringbitmap_to_stdmap(const roaring::api::roaring64_bitmap_t *bitmap,
std::map<int, bool> &mymap)
std::map<int, bool> &mymap,
uint64_t tenant_id)
{
INIT_SUCC(ret);
uint64_t bitmap_cnt = roaring64_bitmap_get_cardinality(bitmap);
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id);
uint64_t *buf = nullptr;
if (bitmap_cnt == 0) {
@ -1967,8 +1970,8 @@ int ObPluginVectorIndexAdaptor::check_vsag_mem_used()
mem_check_cnt_++;
if (mem_check_cnt_ % 10 == 0) {
mem_check_cnt_ %= 10;
if (OB_FAIL(ObPluginVectorIndexHelper::get_vector_memory_limit_size(MTL_ID(), mem_size))) {
LOG_WARN("failed to get vector mem limit size.", K(ret), K(MTL_ID()));
if (OB_FAIL(ObPluginVectorIndexHelper::get_vector_memory_limit_size(tenant_id_, mem_size))) {
LOG_WARN("failed to get vector mem limit size.", K(ret), K(tenant_id_));
} else if (ATOMIC_LOAD(all_vsag_use_mem_) > mem_size) {
ret = OB_ERR_VSAG_MEM_LIMIT_EXCEEDED;
LOG_USER_ERROR(OB_ERR_VSAG_MEM_LIMIT_EXCEEDED, (int)mem_size >> 20);
@ -2074,11 +2077,13 @@ void *ObVsagMemContext::Reallocate(void* p, size_t size)
return new_ptr;
}
int ObVsagMemContext::init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem)
int ObVsagMemContext::init(lib::MemoryContext &parent_mem_context,
uint64_t *all_vsag_use_mem,
uint64_t tenant_id)
{
INIT_SUCC(ret);
lib::ContextParam param;
ObMemAttr attr(MTL_ID(), "VIndexVsagADP");
ObMemAttr attr(tenant_id, "VIndexVsagADP");
SET_IGNORE_MEM_VERSION(attr);
param.set_mem_attr(attr)
.set_page_size(OB_MALLOC_MIDDLE_BLOCK_SIZE)

View File

@ -321,7 +321,7 @@ class ObPluginVectorIndexAdaptor
{
public:
friend class ObVsagMemContext;
ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity);
ObPluginVectorIndexAdaptor(common::ObIAllocator *allocator, lib::MemoryContext &entity, uint64_t tenant_id);
~ObPluginVectorIndexAdaptor();
int init(ObString init_str, int64_t dim, lib::MemoryContext &parent_mem_ctx, uint64_t *all_vsag_use_mem);
@ -329,6 +329,7 @@ public:
int init(lib::MemoryContext &parent_mem_ctx, uint64_t *all_vsag_use_mem);
int set_param(ObString init_str, int64_t dim);
int get_index_type() { return type_; };
uint64_t get_tenant_id() {return tenant_id_; };
// -- start 调试使用
void init_incr_tablet() {inc_tablet_id_ = ObTabletID(common::ObTabletID::MIN_VALID_TABLET_ID); }
@ -422,7 +423,8 @@ public:
ObVectorIndexAlgorithmType &type,
void *&param);
static int cast_roaringbitmap_to_stdmap(const roaring::api::roaring64_bitmap_t *bitmap,
std::map<int, bool> &mymap);
std::map<int, bool> &mymap,
uint64_t tenant_id);
int check_vsag_mem_used();
uint64_t get_all_vsag_mem_used() {
return ATOMIC_LOAD(all_vsag_use_mem_);
@ -471,7 +473,8 @@ public:
ObAdapterCreateType &get_create_type() { return create_type_; };
void set_create_type(ObAdapterCreateType type) { create_type_ = type; };
TO_STRING_KV(K_(create_type), K_(type), KP_(algo_data), KP_(incr_data), KP_(snap_data), KP_(vbitmap_data),
TO_STRING_KV(K_(create_type), K_(type), KP_(algo_data),
KP_(incr_data), KP_(snap_data), KP_(vbitmap_data), K_(tenant_id),
K_(data_tablet_id),K_(rowkey_vid_tablet_id), K_(vid_rowkey_tablet_id),
K_(inc_tablet_id), K_(vbitmap_tablet_id), K_(snapshot_tablet_id),
K_(data_table_id), K_(rowkey_vid_table_id), K_(vid_rowkey_table_id),
@ -518,6 +521,8 @@ private:
ObVectorIndexMemData *snap_data_;
ObVectorIndexMemData *vbitmap_data_;
uint64_t tenant_id_;
ObTabletID snapshot_tablet_id_;
ObTabletID inc_tablet_id_;
ObTabletID vbitmap_tablet_id_;
@ -611,7 +616,7 @@ public:
all_vsag_use_mem_ = nullptr;
}
}
int init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem);
int init(lib::MemoryContext &parent_mem_context, uint64_t *all_vsag_use_mem, uint64_t tenant_id);
bool is_inited() { return OB_NOT_NULL(mem_context_); }
std::string Name() override {

View File

@ -78,6 +78,8 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters()
ObPluginVectorIndexAdaptor *adapter = iter->second;
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema;
ObTabletID tablet_id = iter->first;
ObTabletHandle tablet_handle;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id_, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id_));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, adapter->get_vbitmap_table_id(), table_schema))) {
@ -94,11 +96,22 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters()
LOG_WARN("push back table id failed",
K(delete_tablet_id_array.count()), K(adapter->get_snap_tablet_id()), KR(ret));
}
} else if (OB_FAIL(ls_->get_tablet_svr()->get_tablet(tablet_id, tablet_handle))) {
if (OB_TABLET_NOT_EXIST != ret) {
LOG_WARN("fail to get tablet", K(ret), K(tablet_id));
} else {
ret = OB_SUCCESS; // not found, moved from this ls
if (OB_FAIL(delete_tablet_id_array.push_back(tablet_id))) {
LOG_WARN("push back table id failed",
K(delete_tablet_id_array.count()), K(adapter->get_inc_tablet_id()), KR(ret));
}
}
}
}
LOG_INFO("try erase complete vector index adapter",
K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); // debug, remove later
if (delete_tablet_id_array.count() > 0) {
LOG_INFO("try erase complete vector index adapter",
K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < delete_tablet_id_array.count(); i++) {
if (OB_FAIL(index_ls_mgr->erase_complete_adapter(delete_tablet_id_array.at(i)))) {
@ -140,8 +153,10 @@ void ObPluginVectorIndexLoadScheduler::clean_deprecated_adapters()
}
}
LOG_INFO("try erase partial vector index adapter",
K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count())); // debug, remove later
if (delete_tablet_id_array.count() > 0) {
LOG_INFO("try erase partial vector index adapter",
K(index_ls_mgr->get_ls_id()), K(delete_tablet_id_array.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < delete_tablet_id_array.count(); i++) {
if (OB_FAIL(index_ls_mgr->erase_partial_adapter(delete_tablet_id_array.at(i)))) {
@ -162,7 +177,6 @@ bool ObPluginVectorIndexLoadScheduler::check_can_do_work()
{
bool bret = true;
int ret = OB_SUCCESS;
int64_t tenant_id = MTL_ID();
uint64_t tenant_data_version = 0;
bool is_oracle_mode = false;
@ -170,15 +184,15 @@ bool ObPluginVectorIndexLoadScheduler::check_can_do_work()
LOG_WARN("fail to check oracle mode", KR(ret), K_(tenant_id));
} else if (is_oracle_mode) {
bret = false;
LOG_DEBUG("vector index not support oracle mode", K(tenant_id));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, tenant_data_version))) {
LOG_DEBUG("vector index not support oracle mode", K_(tenant_id));
} else if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tenant_data_version))) {
bret = false;
LOG_WARN("get tenant data version failed", K(ret));
} else if (tenant_data_version < DATA_VERSION_4_3_3_0) {
bret = false;
LOG_DEBUG("vector index can not work with data version less than 4_3_3", K(tenant_data_version));
} else if (is_user_tenant(tenant_id)) {
if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id), tenant_data_version))) {
} else if (is_user_tenant(tenant_id_)) {
if (OB_FAIL(GET_MIN_DATA_VERSION(gen_meta_tenant_id(tenant_id_), tenant_data_version))) {
bret = false;
LOG_WARN("get tenant data version failed", K(ret));
} else if (tenant_data_version < DATA_VERSION_4_3_3_0) {
@ -209,10 +223,12 @@ int ObPluginVectorIndexLoadScheduler::check_schema_version()
return ret;
}
int ObPluginVectorIndexLoadScheduler::check_parital_index_adpter_exist(ObPluginVectorIndexMgr *mgr)
int ObPluginVectorIndexLoadScheduler::check_index_adpter_exist(ObPluginVectorIndexMgr *mgr)
{
int ret = OB_SUCCESS;
if (!mgr->get_partial_adapter_map().empty()) {
if (!mgr->get_partial_adapter_map().empty() || !mgr->get_complete_adapter_map().empty()) {
// partial map not empty, exist adapter create by dml/ddl data complement/query
// complete adapter not empty, also need check for transfer
mark_tenant_need_check();
}
return ret;
@ -386,7 +402,7 @@ int ObPluginVectorIndexLoadScheduler::execute_adapter_maintenance()
ObSEArray<uint64_t, DEFAULT_TABLE_ARRAY_SIZE> table_id_array;
ObVecIdxSharedTableInfoMap shared_table_info_map;
ObMemAttr memattr(MTL_ID(), "VecIdxInfo");
ObMemAttr memattr(tenant_id_, "VecIdxInfo");
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("tablet vector index scheduler not init", KR(ret));
@ -836,7 +852,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_adapter_maintenance_task
// if schema version change, or exist partial adapter(create by access) need do maintenance
if (OB_FAIL(check_schema_version())) {
LOG_WARN("fail to check schema version", KR(ret));
} else if (OB_NOT_NULL(mgr) && OB_FAIL(check_parital_index_adpter_exist(mgr))) {
} else if (OB_NOT_NULL(mgr) && OB_FAIL(check_index_adpter_exist(mgr))) {
LOG_WARN("fail to check exist paritial index adapter", KR(ret));
} else if (local_tenant_task_.need_check_) {
if (OB_FAIL(execute_adapter_maintenance())) {
@ -965,7 +981,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_memdata_sync_task(ObPlug
} else {
// generate one task
char *task_ctx_buf =
static_cast<char *>(mgr->get_task_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx)));
static_cast<char *>(mgr->get_processing_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx)));
ObPluginVectorIndexTaskCtx* task_ctx = nullptr;
if (OB_ISNULL(task_ctx_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@ -976,7 +992,7 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_memdata_sync_task(ObPlug
}
if (OB_FAIL(ret) && OB_NOT_NULL(task_ctx)) {
task_ctx->~ObPluginVectorIndexTaskCtx();
mgr->get_task_allocator().free(task_ctx);
mgr->get_processing_allocator().free(task_ctx);
task_ctx = nullptr;
}
}
@ -1161,7 +1177,7 @@ int ObPluginVectorIndexLoadScheduler::handle_replay_result(ObVectorIndexSyncLog
ObTabletID tablet_id = ls_log.get_tablet_id_array().at(i);
uint64_t table_id = ls_log.get_table_id_array().at(i);
char *task_ctx_buf =
static_cast<char *>(mgr->get_task_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx)));
static_cast<char *>(mgr->get_waiting_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx)));
ObPluginVectorIndexTaskCtx* task_ctx = nullptr;
if (OB_ISNULL(task_ctx_buf)) {
@ -1180,7 +1196,7 @@ int ObPluginVectorIndexLoadScheduler::handle_replay_result(ObVectorIndexSyncLog
}
if (OB_FAIL(ret) && OB_NOT_NULL(task_ctx)) {
task_ctx->~ObPluginVectorIndexTaskCtx();
mgr->get_task_allocator().free(task_ctx); // not really free
mgr->get_waiting_allocator().free(task_ctx); // not really free
task_ctx = nullptr;
}
}
@ -1381,6 +1397,7 @@ int ObVectorIndexTask::init(ObPluginVectorIndexLoadScheduler *schedular,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KP(schedular), KP(mgr), KP(task_ctx));
} else {
allocator_.set_tenant_id(mgr->get_tenant_id());
ls_id_ = mgr->get_ls_id();
vec_idx_scheduler_ = schedular;
vec_idx_mgr_ = mgr;
@ -1411,7 +1428,8 @@ int ObVectorIndexTask::process()
while(!need_stop && OB_SUCC(ret)) {
// need set context? should set attr in constructor
lib::ContextParam param;
param.set_mem_attr(MTL_ID(), "VecIdxTaskCtx", ObCtxIds::DEFAULT_CTX_ID) // 这里是dag的MTL
// use dag mtl id for param refer to TTLtask
param.set_mem_attr(MTL_ID(), "VecIdxTaskCtx", ObCtxIds::DEFAULT_CTX_ID)
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
CREATE_WITH_TEMP_CONTEXT(param) {
if (OB_FAIL(process_one())) {

View File

@ -250,7 +250,7 @@ public:
bool &is_vector_index_table,
bool &is_shared_index_table);
void clean_deprecated_adapters();
int check_parital_index_adpter_exist(ObPluginVectorIndexMgr *mgr);
int check_index_adpter_exist(ObPluginVectorIndexMgr *mgr);
int log_tablets_need_memdata_sync(ObPluginVectorIndexMgr *mgr);
int execute_all_memdata_sync_task(ObPluginVectorIndexMgr *mgr);

View File

@ -36,12 +36,13 @@ void ObPluginVectorIndexMgr::destroy()
release_all_adapters();
partial_index_adpt_map_.destroy();
complete_index_adpt_map_.destroy();
first_mem_sync_map_.destroy();
second_mem_sync_map_.destroy();
// elements memory in adpt map will be released by allocator in service, refine later;
// elements memory in mem_sync_map should be released here, they are alloc by ob_malloc;
// should use 2 allocator to avoid accumulation
task_allocator_.reset();
// elements memory in mem_sync_map should be released here
first_mem_sync_map_.destroy();
first_task_allocator_.reset();
second_mem_sync_map_.destroy();
second_task_allocator_.reset();
}
}
@ -74,13 +75,13 @@ int ObPluginVectorIndexMgr::init(uint64_t tenant_id,
{
int ret = OB_SUCCESS;
int64_t hash_capacity = common::hash::cal_next_prime(DEFAULT_ADAPTER_HASH_SIZE);
if (OB_FAIL(complete_index_adpt_map_.create(hash_capacity, "VecIdxAdpt"))) {
if (OB_FAIL(complete_index_adpt_map_.create(hash_capacity, "VecIdxAdptMap", "VecIdxAdptMap", tenant_id))) {
LOG_WARN("fail to create full index adapter map", K(ls_id), KR(ret));
} else if (OB_FAIL(partial_index_adpt_map_.create(hash_capacity, "VecIdxAdpt"))) {
} else if (OB_FAIL(partial_index_adpt_map_.create(hash_capacity, "VecIdxAdptMap", "VecIdxAdptMap", tenant_id))) {
LOG_WARN("fail to create partial index adapter map", K(ls_id), KR(ret));
} else if (OB_FAIL(first_mem_sync_map_.create(hash_capacity, "VecIdxAdpt", "VecIdxAdpt"))) {
} else if (OB_FAIL(first_mem_sync_map_.create(hash_capacity, "VecIdxTaskMap", "VecIdxTaskMap", tenant_id))) {
LOG_WARN("fail to create first mem sync set", K(ls_id), KR(ret));
} else if (OB_FAIL(second_mem_sync_map_.create(hash_capacity, "VecIdxAdpt", "VecIdxAdpt"))) {
} else if (OB_FAIL(second_mem_sync_map_.create(hash_capacity, "VecIdxTaskMap", "VecIdxTaskMap", tenant_id))) {
LOG_WARN("fail to create second mem sync set", K(ls_id), KR(ret));
} else {
ls_tablet_task_ctx_.task_id_ = 0;
@ -225,7 +226,7 @@ int ObPluginVectorIndexMgr::create_partial_adapter(ObTabletID idx_tablet_id,
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for vector index adapter", KR(ret));
} else {
tmp_vec_idx_adpt = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_);
tmp_vec_idx_adpt = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_);
ObVectorIndexRecordType record_type = ObPluginVectorIndexUtils::index_type_to_record_type(type);
if (record_type >= VIRT_MAX) {
ret = OB_ERR_UNEXPECTED;
@ -395,6 +396,7 @@ int ObPluginVectorIndexMgr::check_need_mem_data_sync_task(bool &need_sync)
if (get_processing_map().size() > 0) {
if (ls_tablet_task_ctx_.all_finished_) { // is false
get_processing_map().reuse();
get_processing_allocator().reset();
// release task ctx
ls_tablet_task_ctx_.all_finished_ = false;
LOG_INFO("release processing set to waiting set",
@ -652,7 +654,7 @@ int ObPluginVectorIndexService::acquire_vector_index_mgr(ObLSID ls_id, ObPluginV
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memeory for new vector index mgr", KR(ret));
} else {
ObPluginVectorIndexMgr *new_ls_index_mgr = new(mgr_buff)ObPluginVectorIndexMgr(memory_context_);
ObPluginVectorIndexMgr *new_ls_index_mgr = new(mgr_buff)ObPluginVectorIndexMgr(memory_context_, tenant_id_);
if (OB_FAIL(new_ls_index_mgr->init(tenant_id_, ls_id, memory_context_, &all_vsag_use_mem_))) {
LOG_WARN("failed to init ls vector index mgr", K(ls_id), KR(ret));
} else if (OB_FAIL(get_ls_index_mgr_map().set_refactored(ls_id, new_ls_index_mgr))) {
@ -750,7 +752,7 @@ int ObPluginVectorIndexService::init(const uint64_t tenant_id,
ObLSService *ls_service)
{
int ret = OB_SUCCESS;
lib::ObMemAttr mem_attr(MTL_ID(), "VecIdxSrv");
lib::ObMemAttr mem_attr(tenant_id, "VecIdxSrv");
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret), K(tenant_id));
@ -759,14 +761,17 @@ int ObPluginVectorIndexService::init(const uint64_t tenant_id,
|| OB_ISNULL(ls_service)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument to init ObPluginVectorIndexService", KR(ret), K(tenant_id));
} else if (OB_FAIL(index_ls_mgr_map_.create(common::hash::cal_next_prime(DEFAULT_LS_HASH_SIZE), "VecIdxLSMgr"))) {
} else if (OB_FAIL(index_ls_mgr_map_.create(common::hash::cal_next_prime(DEFAULT_LS_HASH_SIZE),
"VecIdxLSMgr",
"VecIdxLSMgr",
tenant_id))) {
LOG_WARN("create ls mgr ", KR(ret), K(tenant_id));
} else if (FALSE_IT(alloc_.set_tenant_id(MTL_ID()))) {
} else if (FALSE_IT(alloc_.set_tenant_id(tenant_id))) {
} else if (OB_FAIL(allocator_.init(&alloc_, OB_MALLOC_MIDDLE_BLOCK_SIZE, mem_attr))) {
LOG_WARN("ObTenantSrs allocator init failed.", K(ret));
} else {
lib::ContextParam param;
param.set_mem_attr(MTL_ID())
param.set_mem_attr(tenant_id)
.set_properties(lib::ADD_CHILD_THREAD_SAFE | lib::ALLOC_THREAD_SAFE | lib::RETURN_MALLOC_DEFAULT)
.set_page_size(OB_MALLOC_MIDDLE_BLOCK_SIZE)
.set_label("VectorIndexVsag")
@ -820,7 +825,7 @@ void ObPluginVectorIndexService::wait()
}
}
// ToDo: debug functions, remove later virtual-table ready
// debug functions
void ObPluginVectorIndexMgr::dump_all_inst()
{
int ret = OB_SUCCESS;
@ -897,7 +902,7 @@ int ObPluginVectorIndexMgr::replace_with_complete_adapter(ObVectorIndexAdapterCa
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for vector index adapter", KR(ret));
} else {
new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_);
new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_);
new_adapter->set_create_type(CreateTypeComplete);
if (OB_FAIL(new_adapter->merge_parital_index_adapter(inc_adapter_guard.get_adatper()))) {
LOG_WARN("failed to merge inc index adapter", KPC(inc_adapter_guard.get_adatper()), KR(ret));
@ -981,7 +986,7 @@ int ObPluginVectorIndexMgr::replace_with_full_partial_adapter(ObVectorIndexAcqui
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for vector index adapter", KR(ret));
} else {
new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_);
new_adapter = new(adpt_buff)ObPluginVectorIndexAdaptor(&allocator, memory_context_, tenant_id_);
new_adapter->set_create_type(CreateTypeFullPartial);
if (OB_FAIL(new_adapter->set_tablet_id(VIRT_INC, ctx.inc_tablet_id_))) {
LOG_WARN("failed to set inc tablet id", K(ctx), KR(ret));

View File

@ -73,7 +73,7 @@ typedef common::hash::ObHashMap<common::ObTabletID, ObPluginVectorIndexTaskCtx*>
class ObPluginVectorIndexMgr
{
public:
ObPluginVectorIndexMgr(lib::MemoryContext &memory_context)
ObPluginVectorIndexMgr(lib::MemoryContext &memory_context, uint64_t tenant_id)
: is_inited_(false),
need_check_(false),
ls_id_(),
@ -81,13 +81,14 @@ public:
partial_index_adpt_map_(),
adapter_map_rwlock_(),
ls_tablet_task_ctx_(),
tenant_id_(0),
tenant_id_(tenant_id),
interval_factor_(0),
vector_index_service_(nullptr),
processing_first_mem_sync_(true),
first_mem_sync_map_(),
second_mem_sync_map_(),
task_allocator_(ObMemAttr(MTL_ID(), "VecIdxTask")),
first_task_allocator_(ObMemAttr(tenant_id, "VecIdxTask")),
second_task_allocator_(ObMemAttr(tenant_id, "VecIdxTask")),
memory_context_(memory_context),
all_vsag_use_mem_(nullptr)
{}
@ -151,10 +152,10 @@ public:
VectorIndexMemSyncMap &get_processing_map() { return processing_first_mem_sync_ ? first_mem_sync_map_ : second_mem_sync_map_; }
VectorIndexMemSyncMap &get_waiting_map() { return processing_first_mem_sync_ ? second_mem_sync_map_ : first_mem_sync_map_; }
ObIAllocator &get_processing_allocator() { return processing_first_mem_sync_ ? first_task_allocator_ : second_task_allocator_; }
ObIAllocator &get_waiting_allocator() { return processing_first_mem_sync_ ? second_task_allocator_ : first_task_allocator_; }
void switch_processing_map() { processing_first_mem_sync_ = !processing_first_mem_sync_; }
ObIAllocator &get_task_allocator() { return task_allocator_; }
// debug interface
void dump_all_inst();
// for virtual table
@ -197,11 +198,12 @@ private:
int64_t local_schema_version_; // detect schema change
// pingpong map for follower receive memdata sync task from log
// pingpong map/allocator for follower receive memdata sync task from log
bool processing_first_mem_sync_;
VectorIndexMemSyncMap first_mem_sync_map_;
VectorIndexMemSyncMap second_mem_sync_map_;
ObArenaAllocator task_allocator_;
ObArenaAllocator first_task_allocator_;
ObArenaAllocator second_task_allocator_;
lib::MemoryContext &memory_context_;
uint64_t *all_vsag_use_mem_;
};

View File

@ -192,14 +192,13 @@ int ObPluginVectorIndexUtils::read_object_from_vid_rowkey_table_iter(ObObj *inpu
int ObPluginVectorIndexUtils::get_vec_column_id(
ObSEArray<uint64_t, 4> &vector_column_ids,
uint64_t incr_index_table_id,
uint64_t data_table_id)
uint64_t data_table_id,
uint64_t tenant_id)
{
INIT_SUCC(ret);
ObSchemaGetterGuard schema_guard;
const ObTableSchema *delta_buffer_schema = nullptr;
const ObTableSchema *table_schema = nullptr;
uint64_t tenant_id = MTL_ID();
ObMultiVersionSchemaService *schema_service = MTL(schema::ObTenantSchemaService*)->get_schema_service();
if (OB_ISNULL(schema_service)) {
ret = OB_ERR_UNEXPECTED;
@ -520,52 +519,54 @@ int ObPluginVectorIndexUtils::refresh_memdata(ObLSID &ls_id,
LOG_WARN("fail to test read local data.", K(ret), K(ls_id), K(INDEX_TYPE_IS_NOT));
}
#endif
common::ObNewRowIterator *delta_buf_iter = nullptr;
ObAccessService *tsc_service = MTL(ObAccessService *);
storage::ObTableScanParam inc_scan_param;
schema::ObTableParam inc_table_param(allocator);
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
if (OB_ISNULL(adapter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid adapter", K(ret), KPC(adapter));
} else if (OB_FAIL(read_local_tablet(ls_id,
adapter,
target_scn,
INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL,
allocator,
inc_scan_param,
inc_table_param,
delta_buf_iter))) {
LOG_WARN("fail to read local tablet", KR(ret), K(ls_id), K(INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL), KPC(adapter));
} else {
ObVectorQueryAdaptorResultContext ada_ctx(&allocator, &tmp_allocator);
if (OB_FAIL(adapter->check_delta_buffer_table_readnext_status(&ada_ctx, delta_buf_iter, target_scn))) {
LOG_WARN("fail to check_delta_buffer_table_readnext_status.", K(ret));
} else if (OB_FAIL(try_sync_vbitmap_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to sync vbitmap", KR(ret));
} else if (ada_ctx.get_status() == PVQ_COM_DATA) {
if (OB_FAIL(read_vector_info(adapter, allocator, ls_id, target_scn, ada_ctx))) {
LOG_WARN("failed to read vector_info", KR(ret));
} else if (OB_FAIL(adapter->complete_delta_buffer_table_data(&ada_ctx))) {
LOG_WARN("failed to complete delta buffer", KR(ret));
} else if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to refresh mem snapshots", KR(ret));
MTL_SWITCH(adapter->get_tenant_id()) {
common::ObNewRowIterator *delta_buf_iter = nullptr;
ObAccessService *tsc_service = MTL(ObAccessService *);
storage::ObTableScanParam inc_scan_param;
schema::ObTableParam inc_table_param(allocator);
if (OB_FAIL(read_local_tablet(ls_id,
adapter,
target_scn,
INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL,
allocator,
inc_scan_param,
inc_table_param,
delta_buf_iter))) {
LOG_WARN("fail to read local tablet", KR(ret), K(ls_id), K(INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL), KPC(adapter));
} else {
ObArenaAllocator tmp_allocator("VectorAdaptor", OB_MALLOC_NORMAL_BLOCK_SIZE, adapter->get_tenant_id());
ObVectorQueryAdaptorResultContext ada_ctx(&allocator, &tmp_allocator);
if (OB_FAIL(adapter->check_delta_buffer_table_readnext_status(&ada_ctx, delta_buf_iter, target_scn))) {
LOG_WARN("fail to check_delta_buffer_table_readnext_status.", K(ret));
} else if (OB_FAIL(try_sync_vbitmap_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to sync vbitmap", KR(ret));
} else if (ada_ctx.get_status() == PVQ_COM_DATA) {
if (OB_FAIL(read_vector_info(adapter, allocator, ls_id, target_scn, ada_ctx))) {
LOG_WARN("failed to read vector_info", KR(ret));
} else if (OB_FAIL(adapter->complete_delta_buffer_table_data(&ada_ctx))) {
LOG_WARN("failed to complete delta buffer", KR(ret));
} else if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to refresh mem snapshots", KR(ret));
}
} else if (ada_ctx.get_status() == PVQ_LACK_SCN) {
if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to refresh mem snapshots without refresh incr", KR(ret));
}
}
}
} else if (ada_ctx.get_status() == PVQ_LACK_SCN) {
if (OB_FAIL(try_sync_snapshot_memdata(ls_id, adapter, target_scn, allocator, ada_ctx))) {
LOG_WARN("failed to refresh mem snapshots without refresh incr", KR(ret));
}
}
}
if (OB_NOT_NULL(delta_buf_iter) && OB_NOT_NULL(tsc_service)) {
int tmp_ret = tsc_service->revert_scan_iter(delta_buf_iter);
if (tmp_ret != OB_SUCCESS) {
LOG_WARN("revert delta_buf_iter failed", K(ret));
if (OB_NOT_NULL(delta_buf_iter) && OB_NOT_NULL(tsc_service)) {
int tmp_ret = tsc_service->revert_scan_iter(delta_buf_iter);
if (tmp_ret != OB_SUCCESS) {
LOG_WARN("revert delta_buf_iter failed", K(ret));
}
delta_buf_iter = nullptr;
}
}
delta_buf_iter = nullptr;
}
return ret;
}
@ -644,7 +645,6 @@ int ObPluginVectorIndexUtils::read_local_tablet(ObLSID &ls_id,
target_scn))) {
LOG_WARN("fail to get tablet handle", KR(ret), K(tablet_id));
} else {
uint64_t tenant_id = MTL_ID();
scan_param.ls_id_ = ls_id;
scan_param.tablet_id_ = tablet_id;
scan_param.schema_version_ = tablet_handle.get_obj()->get_tablet_meta().max_sync_storage_schema_version_;
@ -687,7 +687,7 @@ int ObPluginVectorIndexUtils::read_local_tablet(ObLSID &ls_id,
}
} else {
// vid_rowkey table or data table, get rowkey while complete
if (OB_FAIL(get_shared_table_rowkey_colum_count(type, table_id, col_cnt))) {
if (OB_FAIL(get_shared_table_rowkey_colum_count(type, adapter->get_tenant_id(), table_id, col_cnt))) {
LOG_WARN("fail to get index aux table colum count", KR(ret), K(type));
} else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObObj) * col_cnt * 2))) {
ret = OB_ERR_UNEXPECTED;
@ -763,7 +763,11 @@ int ObPluginVectorIndexUtils::init_common_scan_param(storage::ObTableScanParam&
uint32 col_cnt = 0;
if (is_vec_index(type) || type == INDEX_TYPE_IS_NOT){
if (OB_FAIL(get_special_index_aux_table_column_count(type, table_id, col_cnt, scan_param))) {
if(OB_ISNULL(adapter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null adapter", KR(ret), K(type));
} else if (OB_FAIL(get_special_index_aux_table_column_count(type, adapter->get_tenant_id(),
table_id, col_cnt, scan_param))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index type", KR(ret), K(type));
}
@ -819,9 +823,18 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
ObSEArray<uint64_t, 4> column_ids;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
uint64_t tenant_id = 0;
if (OB_ISNULL(adapter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null adapter", KR(ret), K(inc_table_id), K(data_table_id), K(table_id), K(type));
} else {
tenant_id = adapter->get_tenant_id();
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get schema", KR(ret), KR(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine
@ -829,7 +842,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
} else if (is_vec_delta_buffer_type(type)) {
ObArray<uint64_t> tmp_column_ids;
const ObTableSchema *data_table_schema = NULL;
if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) {
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) {
LOG_WARN("fail to get schema", KR(ret), KR(data_table_id));
} else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine
@ -877,7 +890,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
// different with other index, refer to ObTscCgService::extract_vec_ir_access_columns
ObArray<uint64_t> tmp_column_ids;
const ObTableSchema *data_table_schema = NULL;
if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) {
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) {
LOG_WARN("fail to get schema", KR(ret), KR(data_table_id));
} else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine
@ -961,7 +974,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
} else if (is_vec_index_snapshot_data_type(type)) {
const ObTableSchema *data_table_schema = NULL;
ObSEArray<uint64_t, 4> tmp_column_ids;
if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), data_table_id, data_table_schema))) {
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) {
LOG_WARN("fail to get schema", KR(ret), KR(data_table_id));
} else if (OB_ISNULL(table_schema) || OB_ISNULL(data_table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine
@ -1007,7 +1020,7 @@ int ObPluginVectorIndexUtils::init_table_param(ObTableParam *table_param,
}
}
} else if (type == INDEX_TYPE_IS_NOT) {
if (OB_FAIL(get_vec_column_id(column_ids, inc_table_id, table_id))) {
if (OB_FAIL(get_vec_column_id(column_ids, inc_table_id, table_id, tenant_id))) {
LOG_WARN("failed to get vec column id.", K(ret));
} else {
if (OB_FAIL(table_param->convert(*table_schema, column_ids, sql::ObStoragePushdownFlag()))) {
@ -1044,6 +1057,7 @@ int ObPluginVectorIndexUtils::get_non_shared_index_aux_table_colum_count(schema:
int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count(
schema::ObIndexType type,
uint64_t tenant_id,
uint64_t table_id,
uint32 &col_cnt,
storage::ObTableScanParam& scan_param)
@ -1052,9 +1066,9 @@ int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count(
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
ObSEArray<uint64_t, 4> column_ids;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get schema", KR(ret), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine
@ -1067,7 +1081,7 @@ int ObPluginVectorIndexUtils::get_special_index_aux_table_column_count(
col_cnt = column_ids.count();
}
if (OB_SUCC(ret)) {
LOG_INFO("get_special_index_aux_table_column_count", K(type), K(col_cnt), K(column_ids)); // remove after debug;
LOG_INFO("get_special_index_aux_table_column_count", K(type), K(col_cnt), K(column_ids), K(tenant_id)); // remove after debug;
}
return ret;
}
@ -1099,6 +1113,7 @@ int ObPluginVectorIndexUtils::get_non_shared_index_aux_table_rowkey_colum_count(
}
int ObPluginVectorIndexUtils::get_shared_table_rowkey_colum_count(schema::ObIndexType type,
uint64_t tenant_id,
uint64_t table_id,
uint32 &col_cnt)
{
@ -1106,9 +1121,9 @@ int ObPluginVectorIndexUtils::get_shared_table_rowkey_colum_count(schema::ObInde
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = NULL;
ObSEArray<uint64_t, 4> column_ids;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_table_schema(MTL_ID(), table_id, table_schema))) {
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get schema", KR(ret), KR(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST; // table may be removed, handle in scheduler routine

View File

@ -86,7 +86,8 @@ public:
bool &get_data);
static int get_vec_column_id (ObSEArray<uint64_t, 4> &vector_column_ids,
uint64_t incr_index_table_id,
uint64_t data_table_id);
uint64_t data_table_id,
uint64_t tenant_id);
static int read_vector_info(ObPluginVectorIndexAdaptor *adapter,
ObIAllocator &allocator,
ObLSID &ls_id,
@ -122,12 +123,15 @@ private:
ObPluginVectorIndexAdaptor *adapter);
static int get_non_shared_index_aux_table_colum_count(schema::ObIndexType type, uint32 &col_cnt);
static int get_non_shared_index_aux_table_rowkey_colum_count(schema::ObIndexType type, uint32 &col_cnt);
static int get_special_index_aux_table_column_count(
schema::ObIndexType type,
uint64_t table_id,
uint32 &col_cnt,
storage::ObTableScanParam& scan_param);
static int get_shared_table_rowkey_colum_count(schema::ObIndexType type, uint64_t table_id, uint32 &col_cnt);
static int get_special_index_aux_table_column_count(schema::ObIndexType type,
uint64_t tenant_id,
uint64_t table_id,
uint32 &col_cnt,
storage::ObTableScanParam& scan_param);
static int get_shared_table_rowkey_colum_count(schema::ObIndexType type,
uint64_t tenant_id,
uint64_t table_id,
uint32 &col_cnt);
static int try_sync_snapshot_memdata(ObLSID &ls_id,
ObPluginVectorIndexAdaptor *adapter,
SCN &target_scn,