fix vector index 4200 error
This commit is contained in:
parent
f9ae33822f
commit
c7900fb10b
@ -956,11 +956,11 @@ int ObPluginVectorIndexLoadScheduler::check_and_execute_memdata_sync_task(ObPlug
|
||||
mgr->get_ls_task_ctx().non_memdata_task_cycle_++;
|
||||
if (mgr->get_ls_task_ctx().non_memdata_task_cycle_
|
||||
> ObPluginVectorIndexLSTaskCtx::NON_MEMDATA_TASK_CYCLE_MAX) {
|
||||
// too long to receive any sync task log, sync once forcely
|
||||
// save all tablet id to current refresh task recorder
|
||||
mgr->get_ls_task_ctx().non_memdata_task_cycle_ = 0;
|
||||
mgr->get_ls_task_ctx().need_memdata_sync_ = true;
|
||||
force_mem_data_sync = true;
|
||||
// disable force sync currently
|
||||
// mgr->get_ls_task_ctx().need_memdata_sync_ = true;
|
||||
// force_mem_data_sync = true;
|
||||
FLOG_INFO("not receive any sync task log", K(tenant_id_), K(ls_->get_ls_id()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1183,30 +1183,33 @@ int ObPluginVectorIndexLoadScheduler::handle_replay_result(ObVectorIndexSyncLog
|
||||
char *task_ctx_buf =
|
||||
static_cast<char *>(mgr->get_waiting_allocator().alloc(sizeof(ObPluginVectorIndexTaskCtx)));
|
||||
ObPluginVectorIndexTaskCtx* task_ctx = nullptr;
|
||||
|
||||
ObPluginVectorIndexTaskCtx* tmp_task_ctx = nullptr;
|
||||
if (OB_ISNULL(task_ctx_buf)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc memdata sync task ctx", KR(ret));
|
||||
} else if (FALSE_IT(task_ctx = new(task_ctx_buf)ObPluginVectorIndexTaskCtx(tablet_id, table_id))) {
|
||||
} else if (OB_FAIL(waiting_task_map.set_refactored(tablet_id, task_ctx))) {
|
||||
if (ret != OB_HASH_EXIST) {
|
||||
LOG_WARN("failed to set vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
} else if (OB_FAIL(waiting_task_map.get_refactored(tablet_id, tmp_task_ctx))) {
|
||||
if (ret == OB_HASH_NOT_EXIST) {
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(waiting_task_map.set_refactored(tablet_id, task_ctx))) {
|
||||
LOG_WARN("failed to set vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
} else {
|
||||
LOG_INFO("success get replay vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("success get replay vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
} else { // // task already set, not scheduled
|
||||
LOG_INFO("duplicate vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(task_ctx)) {
|
||||
if (ret == OB_HASH_EXIST) {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_INFO("duplicate vector index memdata sync task ctx", K(ret), K(tablet_id), KPC(task_ctx));
|
||||
}
|
||||
task_ctx->~ObPluginVectorIndexTaskCtx();
|
||||
mgr->get_waiting_allocator().free(task_ctx); // not really free
|
||||
mgr->get_waiting_allocator().free(task_ctx); // arena allocator not really free
|
||||
task_ctx = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ret == OB_ALLOCATE_MEMORY_FAILED) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1431,10 +1434,9 @@ int ObVectorIndexTask::process()
|
||||
bool need_stop = false;
|
||||
|
||||
while(!need_stop && OB_SUCC(ret)) {
|
||||
// need set context? should set attr in constructor
|
||||
lib::ContextParam param;
|
||||
// use dag mtl id for param refer to TTLtask
|
||||
param.set_mem_attr(MTL_ID(), "VecIdxTaskCtx", ObCtxIds::DEFAULT_CTX_ID)
|
||||
param.set_mem_attr(MTL_ID(), "VecIdxTaskCP", ObCtxIds::DEFAULT_CTX_ID)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
CREATE_WITH_TEMP_CONTEXT(param) {
|
||||
if (OB_FAIL(process_one())) {
|
||||
|
@ -178,8 +178,8 @@ struct ObPluginVectorIndexTaskCtx
|
||||
in_queue_(false),
|
||||
task_status_(ObVectorIndexTaskStatus::OB_TTL_TASK_PREPARE)
|
||||
{}
|
||||
TO_STRING_KV(K_(index_tablet_id), K_(index_table_id), K_(task_start_time), K_(last_modify_time),
|
||||
K_(failure_times), K_(in_queue), K_(task_status));
|
||||
TO_STRING_KV(K_(index_table_id), K_(index_tablet_id), K_(task_start_time), K_(last_modify_time),
|
||||
K_(failure_times), K_(err_code), K_(in_queue), K_(task_status));
|
||||
ObTabletID index_tablet_id_;
|
||||
uint64_t index_table_id_;
|
||||
int64_t task_start_time_;
|
||||
|
@ -280,6 +280,8 @@ int ObPluginVectorIndexMgr::get_or_create_partial_adapter_(ObTabletID tablet_id,
|
||||
} else { // not exist create new
|
||||
if (OB_FAIL(create_partial_adapter(tablet_id, ObTabletID(), type, allocator, OB_INVALID_ID, vec_index_param, dim))) {
|
||||
LOG_WARN("failed to create tmp vector index instance with ls", K(tablet_id), K(type), KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret) && (OB_HASH_EXIST != ret)) {
|
||||
} else if (OB_FAIL(get_adapter_inst_guard(tablet_id, adapter_guard))) {
|
||||
LOG_WARN("failed to get tmp vector index instance with ls", K(tablet_id), K(type), KR(ret));
|
||||
} else {
|
||||
@ -465,6 +467,8 @@ int ObPluginVectorIndexService::acquire_adapter_guard(ObLSID ls_id,
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(create_partial_adapter(ls_id, tablet_id, ObTabletID(), type, OB_INVALID_ID, vec_index_param, dim))) {
|
||||
LOG_WARN("failed to create tmp vector index instance", K(ls_id), K(tablet_id), K(type), KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret) && (OB_HASH_EXIST != ret)) {
|
||||
} else if (OB_FAIL(get_adapter_inst_guard(ls_id, tablet_id, adapter_guard))) {
|
||||
LOG_WARN("failed to get tmp vector index instance", K(ls_id), K(tablet_id), K(type), KR(ret));
|
||||
} else {
|
||||
@ -480,6 +484,8 @@ int ObPluginVectorIndexService::acquire_adapter_guard(ObLSID ls_id,
|
||||
} else { // not exist create new
|
||||
if (OB_FAIL(ls_index_mgr->create_partial_adapter(tablet_id, ObTabletID(), type, allocator_, OB_INVALID_ID, vec_index_param, dim))) {
|
||||
LOG_WARN("failed to create tmp vector index instance with ls", K(ls_id), K(tablet_id), K(type), KR(ret));
|
||||
}
|
||||
if (OB_FAIL(ret) && (OB_HASH_EXIST != ret)) {
|
||||
} else if (OB_FAIL(ls_index_mgr->get_adapter_inst_guard(tablet_id, adapter_guard))) {
|
||||
LOG_WARN("failed to get tmp vector index instance with ls", K(ls_id), K(tablet_id), K(type), KR(ret));
|
||||
} else {
|
||||
@ -658,7 +664,9 @@ int ObPluginVectorIndexService::acquire_vector_index_mgr(ObLSID ls_id, ObPluginV
|
||||
if (OB_FAIL(new_ls_index_mgr->init(tenant_id_, ls_id, memory_context_, &all_vsag_use_mem_))) {
|
||||
LOG_WARN("failed to init ls vector index mgr", K(ls_id), KR(ret));
|
||||
} else if (OB_FAIL(get_ls_index_mgr_map().set_refactored(ls_id, new_ls_index_mgr))) {
|
||||
LOG_WARN("set vector index mgr map faild", K(ls_id), KR(ret));
|
||||
if (ret != OB_HASH_EXIST) {
|
||||
LOG_WARN("set vector index mgr map faild", K(ls_id), KR(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
new_ls_index_mgr->~ObPluginVectorIndexMgr();
|
||||
@ -666,7 +674,7 @@ int ObPluginVectorIndexService::acquire_vector_index_mgr(ObLSID ls_id, ObPluginV
|
||||
new_ls_index_mgr = nullptr;
|
||||
mgr_buff = nullptr;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(ret) && (OB_HASH_EXIST != ret)) {
|
||||
} else if (OB_FAIL(get_ls_index_mgr_map().get_refactored(ls_id, mgr))) {
|
||||
LOG_WARN("failed to get vector index mgr for ls", K(ls_id), KR(ret));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user