Fix retire deadlock when retire mb handle in sync wash
This commit is contained in:
45
src/share/cache/ob_kvcache_store.cpp
vendored
45
src/share/cache/ob_kvcache_store.cpp
vendored
@ -112,9 +112,7 @@ void ObKVCacheStore::destroy()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// free all mb handles cached by threads
|
// free all mb handles cached by threads
|
||||||
HazardList reclaim_list;
|
purge_mb_handle_retire_station();
|
||||||
get_retire_station().purge(reclaim_list);
|
|
||||||
reuse_mb_handles(reclaim_list);
|
|
||||||
|
|
||||||
ob_free(mb_handles_);
|
ob_free(mb_handles_);
|
||||||
mb_handles_ = NULL;
|
mb_handles_ = NULL;
|
||||||
@ -338,8 +336,8 @@ bool ObKVCacheStore::wash()
|
|||||||
}
|
}
|
||||||
current_time = ObTimeUtility::current_time();
|
current_time = ObTimeUtility::current_time();
|
||||||
wash_time = current_time - start_time;
|
wash_time = current_time - start_time;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
purge_mb_handle_retire_station();
|
||||||
COMMON_LOG(INFO, "Wash time detail, ", K(compute_wash_size_time), K(refresh_score_time), K(wash_time));
|
COMMON_LOG(INFO, "Wash time detail, ", K(compute_wash_size_time), K(refresh_score_time), K(wash_time));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -590,7 +588,7 @@ int ObKVCacheStore::try_flush_washable_mb(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
de_handle_ref(handle);
|
de_handle_ref(handle, false /* do_retire */);
|
||||||
}
|
}
|
||||||
if (can_try_wash) {
|
if (can_try_wash) {
|
||||||
void *buf = nullptr;
|
void *buf = nullptr;
|
||||||
@ -676,7 +674,7 @@ int ObKVCacheStore::try_flush_washable_mb(
|
|||||||
|
|
||||||
COMMON_LOG(INFO, "ObKVCache try flush washable memblock details", K(ret), K(force_flush), K(tenant_id),
|
COMMON_LOG(INFO, "ObKVCache try flush washable memblock details", K(ret), K(force_flush), K(tenant_id),
|
||||||
K(cache_id), K(size_washed), K(size_need_washed));
|
K(cache_id), K(size_washed), K(size_need_washed));
|
||||||
retire_mb_handles(retire_list);
|
retire_mb_handles(retire_list, true /* do retire */);
|
||||||
|
|
||||||
COMMON_LOG(DEBUG, "Try flush cache result", K(size_washed), K(size_need_washed), K(tenant_id), K(cache_id), K(ret));
|
COMMON_LOG(DEBUG, "Try flush cache result", K(size_washed), K(size_need_washed), K(tenant_id), K(cache_id), K(ret));
|
||||||
}
|
}
|
||||||
@ -711,6 +709,13 @@ int ObKVCacheStore::inner_push_memblock_info(const ObKVMemBlockHandle &handle, O
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObKVCacheStore::purge_mb_handle_retire_station()
|
||||||
|
{
|
||||||
|
HazardList reclaim_list;
|
||||||
|
get_retire_station().purge(reclaim_list);
|
||||||
|
reuse_mb_handles(reclaim_list);
|
||||||
|
}
|
||||||
|
|
||||||
int ObKVCacheStore::get_memblock_info(const uint64_t tenant_id, ObIArray<ObKVCacheStoreMemblockInfo> &memblock_infos)
|
int ObKVCacheStore::get_memblock_info(const uint64_t tenant_id, ObIArray<ObKVCacheStoreMemblockInfo> &memblock_infos)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -849,7 +854,7 @@ int ObKVCacheStore::mark_washable(ObKVMemBlockHandle *mb_handle)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObKVCacheStore::free_mbhandle(ObKVMemBlockHandle *mb_handle)
|
int ObKVCacheStore::free_mbhandle(ObKVMemBlockHandle *mb_handle, const bool do_retire)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!inited_) {
|
if (!inited_) {
|
||||||
@ -866,7 +871,7 @@ int ObKVCacheStore::free_mbhandle(ObKVMemBlockHandle *mb_handle)
|
|||||||
COMMON_LOG(ERROR, "do_wash_mb failed", K(ret));
|
COMMON_LOG(ERROR, "do_wash_mb failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
free_mb(*mb_handle->inst_->mb_list_handle_.get_resource_handle(), tenant_id, buf);
|
free_mb(*mb_handle->inst_->mb_list_handle_.get_resource_handle(), tenant_id, buf);
|
||||||
if (OB_FAIL(remove_mb_handle(mb_handle))) {
|
if (OB_FAIL(remove_mb_handle(mb_handle, do_retire))) {
|
||||||
COMMON_LOG(WARN, "remove_mb failed", K(ret));
|
COMMON_LOG(WARN, "remove_mb failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -950,7 +955,7 @@ int ObKVCacheStore::alloc_mbhandle(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObKVCacheStore::de_handle_ref(ObKVMemBlockHandle *mb_handle)
|
void ObKVCacheStore::de_handle_ref(ObKVMemBlockHandle *mb_handle, const bool do_retire)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint32_t ref_cnt = 0;
|
uint32_t ref_cnt = 0;
|
||||||
@ -960,7 +965,7 @@ void ObKVCacheStore::de_handle_ref(ObKVMemBlockHandle *mb_handle)
|
|||||||
} else if (OB_FAIL(mb_handle->handle_ref_.dec_ref_cnt_and_inc_seq_num(ref_cnt))) {
|
} else if (OB_FAIL(mb_handle->handle_ref_.dec_ref_cnt_and_inc_seq_num(ref_cnt))) {
|
||||||
COMMON_LOG(ERROR, "Fail to dec ref cnt, ", K(ret));
|
COMMON_LOG(ERROR, "Fail to dec ref cnt, ", K(ret));
|
||||||
} else if (0 == ref_cnt) {
|
} else if (0 == ref_cnt) {
|
||||||
if (OB_FAIL(free_mbhandle(mb_handle))) {
|
if (OB_FAIL(free_mbhandle(mb_handle, do_retire))) {
|
||||||
COMMON_LOG(WARN, "free_mbhandle failed", K(ret));
|
COMMON_LOG(WARN, "free_mbhandle failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1393,7 +1398,7 @@ int ObKVCacheStore::insert_mb_handle(ObDLink *head, ObKVMemBlockHandle *mb_handl
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObKVCacheStore::remove_mb_handle(ObKVMemBlockHandle *mb_handle)
|
int ObKVCacheStore::remove_mb_handle(ObKVMemBlockHandle *mb_handle, const bool do_retire)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!inited_) {
|
if (!inited_) {
|
||||||
@ -1403,30 +1408,34 @@ int ObKVCacheStore::remove_mb_handle(ObKVMemBlockHandle *mb_handle)
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "invalid arguments", K(ret), KP(mb_handle));
|
COMMON_LOG(WARN, "invalid arguments", K(ret), KP(mb_handle));
|
||||||
} else {
|
} else {
|
||||||
{
|
if (do_retire) {
|
||||||
|
// default
|
||||||
QClockGuard guard(get_qclock());
|
QClockGuard guard(get_qclock());
|
||||||
dl_del(mb_handle);
|
dl_del(mb_handle);
|
||||||
|
} else {
|
||||||
|
// sync wash has already get qclock
|
||||||
|
dl_del(mb_handle);
|
||||||
}
|
}
|
||||||
retire_mb_handle(mb_handle);
|
retire_mb_handle(mb_handle, do_retire);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObKVCacheStore::retire_mb_handle(ObKVMemBlockHandle *mb_handle)
|
void ObKVCacheStore::retire_mb_handle(ObKVMemBlockHandle *mb_handle, const bool do_retire)
|
||||||
{
|
{
|
||||||
if (NULL != mb_handle) {
|
if (NULL != mb_handle) {
|
||||||
HazardList retire_list;
|
HazardList retire_list;
|
||||||
retire_list.push(&mb_handle->retire_link_);
|
retire_list.push(&mb_handle->retire_link_);
|
||||||
retire_mb_handles(retire_list);
|
retire_mb_handles(retire_list, do_retire);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObKVCacheStore::retire_mb_handles(HazardList &retire_list)
|
void ObKVCacheStore::retire_mb_handles(HazardList &retire_list, const bool do_retire)
|
||||||
{
|
{
|
||||||
if (retire_list.size() > 0) {
|
if (retire_list.size() > 0) {
|
||||||
HazardList reclaim_list;
|
HazardList reclaim_list;
|
||||||
int64_t retire_limit = RETIRE_LIMIT;
|
int64_t retire_limit = do_retire ? RETIRE_LIMIT : INT64_MAX;
|
||||||
if (wash_itid_ == get_itid()) {
|
if (wash_itid_ == get_itid()) { // wash thread should not sync wash
|
||||||
retire_limit = WASH_THREAD_RETIRE_LIMIT;
|
retire_limit = WASH_THREAD_RETIRE_LIMIT;
|
||||||
}
|
}
|
||||||
get_retire_station().retire(reclaim_list, retire_list, retire_limit);
|
get_retire_station().retire(reclaim_list, retire_list, retire_limit);
|
||||||
|
13
src/share/cache/ob_kvcache_store.h
vendored
13
src/share/cache/ob_kvcache_store.h
vendored
@ -45,7 +45,7 @@ public:
|
|||||||
const enum ObKVCachePolicy policy = LRU);
|
const enum ObKVCachePolicy policy = LRU);
|
||||||
protected:
|
protected:
|
||||||
virtual bool add_handle_ref(MBWrapper *mb_wrapper) = 0;
|
virtual bool add_handle_ref(MBWrapper *mb_wrapper) = 0;
|
||||||
virtual void de_handle_ref(MBWrapper *mb_wrapper) = 0;
|
virtual void de_handle_ref(MBWrapper *mb_wrapper, const bool do_retire = true) = 0;
|
||||||
virtual int alloc(ObKVCacheInst &inst, const enum ObKVCachePolicy policy,
|
virtual int alloc(ObKVCacheInst &inst, const enum ObKVCachePolicy policy,
|
||||||
const int64_t block_size, MBWrapper *&mb_wrapper) = 0;
|
const int64_t block_size, MBWrapper *&mb_wrapper) = 0;
|
||||||
virtual int free(MBWrapper *mb_wrapper) = 0;
|
virtual int free(MBWrapper *mb_wrapper) = 0;
|
||||||
@ -84,12 +84,12 @@ public:
|
|||||||
ObKVMemBlockHandle *&mb_handle);
|
ObKVMemBlockHandle *&mb_handle);
|
||||||
virtual int alloc_mbhandle(ObKVCacheInst &inst, ObKVMemBlockHandle *&mb_handle);
|
virtual int alloc_mbhandle(ObKVCacheInst &inst, ObKVMemBlockHandle *&mb_handle);
|
||||||
virtual int alloc_mbhandle(const ObKVCacheInstKey &inst_key, ObKVMemBlockHandle *&mb_handle);
|
virtual int alloc_mbhandle(const ObKVCacheInstKey &inst_key, ObKVMemBlockHandle *&mb_handle);
|
||||||
virtual int free_mbhandle(ObKVMemBlockHandle *mb_handle);
|
virtual int free_mbhandle(ObKVMemBlockHandle *mb_handle, const bool do_retire);
|
||||||
virtual int mark_washable(ObKVMemBlockHandle *mb_handle);
|
virtual int mark_washable(ObKVMemBlockHandle *mb_handle);
|
||||||
|
|
||||||
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle, const uint32_t seq_num);
|
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle, const uint32_t seq_num);
|
||||||
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle);
|
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle);
|
||||||
virtual void de_handle_ref(ObKVMemBlockHandle *mb_handle);
|
virtual void de_handle_ref(ObKVMemBlockHandle *mb_handle, const bool do_retire = true);
|
||||||
int64_t get_handle_ref_cnt(const ObKVMemBlockHandle *mb_handle);
|
int64_t get_handle_ref_cnt(const ObKVMemBlockHandle *mb_handle);
|
||||||
virtual int64_t get_block_size() const { return block_size_; }
|
virtual int64_t get_block_size() const { return block_size_; }
|
||||||
// implement functions of ObIMBWrapperMgr
|
// implement functions of ObIMBWrapperMgr
|
||||||
@ -111,6 +111,7 @@ private:
|
|||||||
const int64_t size_need_washed = INT64_MAX,
|
const int64_t size_need_washed = INT64_MAX,
|
||||||
const bool force_flush = false);
|
const bool force_flush = false);
|
||||||
int inner_push_memblock_info(const ObKVMemBlockHandle &handle, ObIArray<ObKVCacheStoreMemblockInfo> &memblock_infos);
|
int inner_push_memblock_info(const ObKVMemBlockHandle &handle, ObIArray<ObKVCacheStoreMemblockInfo> &memblock_infos);
|
||||||
|
void purge_mb_handle_retire_station();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t SYNC_WASH_MB_TIMEOUT_US = 100 * 1000; // 100ms
|
static const int64_t SYNC_WASH_MB_TIMEOUT_US = 100 * 1000; // 100ms
|
||||||
@ -213,9 +214,9 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
int insert_mb_handle(common::ObDLink *head, ObKVMemBlockHandle *mb_handle);
|
int insert_mb_handle(common::ObDLink *head, ObKVMemBlockHandle *mb_handle);
|
||||||
int remove_mb_handle(ObKVMemBlockHandle *mb_handle);
|
int remove_mb_handle(ObKVMemBlockHandle *mb_handle, const bool do_retire);
|
||||||
void retire_mb_handle(ObKVMemBlockHandle *mb_handle);
|
void retire_mb_handle(ObKVMemBlockHandle *mb_handle, const bool do_retire);
|
||||||
void retire_mb_handles(HazardList &retire_list);
|
void retire_mb_handles(HazardList &retire_list, const bool do_retire);
|
||||||
void reuse_mb_handles(HazardList &reclaim_list);
|
void reuse_mb_handles(HazardList &reclaim_list);
|
||||||
bool try_supply_mb(const int64_t mb_count);
|
bool try_supply_mb(const int64_t mb_count);
|
||||||
private:
|
private:
|
||||||
|
4
src/share/cache/ob_kvcache_struct.h
vendored
4
src/share/cache/ob_kvcache_struct.h
vendored
@ -261,12 +261,12 @@ public:
|
|||||||
ObKVMemBlockHandle *&mb_handle) = 0;
|
ObKVMemBlockHandle *&mb_handle) = 0;
|
||||||
virtual int alloc_mbhandle(ObKVCacheInst &inst, ObKVMemBlockHandle *&mb_handle) = 0;
|
virtual int alloc_mbhandle(ObKVCacheInst &inst, ObKVMemBlockHandle *&mb_handle) = 0;
|
||||||
virtual int alloc_mbhandle(const ObKVCacheInstKey &inst_key, ObKVMemBlockHandle *&mb_handle) = 0;
|
virtual int alloc_mbhandle(const ObKVCacheInstKey &inst_key, ObKVMemBlockHandle *&mb_handle) = 0;
|
||||||
virtual int free_mbhandle(ObKVMemBlockHandle *mb_handle) = 0;
|
virtual int free_mbhandle(ObKVMemBlockHandle *mb_handle, const bool do_retire) = 0;
|
||||||
virtual int mark_washable(ObKVMemBlockHandle *mb_handle) = 0;
|
virtual int mark_washable(ObKVMemBlockHandle *mb_handle) = 0;
|
||||||
|
|
||||||
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle, const uint32_t seq_num) = 0;
|
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle, const uint32_t seq_num) = 0;
|
||||||
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle) = 0;
|
virtual bool add_handle_ref(ObKVMemBlockHandle *mb_handle) = 0;
|
||||||
virtual void de_handle_ref(ObKVMemBlockHandle *mb_handle) = 0;
|
virtual void de_handle_ref(ObKVMemBlockHandle *mb_handle, const bool do_retire = true) = 0;
|
||||||
|
|
||||||
virtual int64_t get_block_size() const = 0;
|
virtual int64_t get_block_size() const = 0;
|
||||||
};
|
};
|
||||||
|
6
src/share/cache/ob_working_set_mgr.cpp
vendored
6
src/share/cache/ob_working_set_mgr.cpp
vendored
@ -142,10 +142,10 @@ bool ObWorkingSet::add_handle_ref(WorkingSetMB *ws_mb)
|
|||||||
return added;
|
return added;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObWorkingSet::de_handle_ref(WorkingSetMB *ws_mb)
|
void ObWorkingSet::de_handle_ref(WorkingSetMB *ws_mb, const bool do_retire)
|
||||||
{
|
{
|
||||||
if (NULL != ws_mb) {
|
if (NULL != ws_mb) {
|
||||||
mb_handle_allocator_->de_handle_ref(ws_mb->mb_handle_);
|
mb_handle_allocator_->de_handle_ref(ws_mb->mb_handle_, do_retire);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -347,7 +347,7 @@ bool ObWorkingSet::try_reuse_mb(WorkingSetMB *ws_mb, ObKVMemBlockHandle *&mb_han
|
|||||||
if (ATOMIC_BCAS((uint32_t*)(&reused_handle->status_), FULL, FREE)) {
|
if (ATOMIC_BCAS((uint32_t*)(&reused_handle->status_), FULL, FREE)) {
|
||||||
if (reused_handle->handle_ref_.try_check_and_inc_seq_num(seq_num)) {
|
if (reused_handle->handle_ref_.try_check_and_inc_seq_num(seq_num)) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(mb_handle_allocator_->free_mbhandle(reused_handle))) {
|
if (OB_FAIL(mb_handle_allocator_->free_mbhandle(reused_handle, true /* do_retire */))) {
|
||||||
LOG_WARN("free_mbhandle failed", K(ret));
|
LOG_WARN("free_mbhandle failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
reused = true;
|
reused = true;
|
||||||
|
2
src/share/cache/ob_working_set_mgr.h
vendored
2
src/share/cache/ob_working_set_mgr.h
vendored
@ -74,7 +74,7 @@ public:
|
|||||||
|
|
||||||
// implemnt functions of ObIKVCacheStore<WorkingSetMB>
|
// implemnt functions of ObIKVCacheStore<WorkingSetMB>
|
||||||
virtual bool add_handle_ref(WorkingSetMB *ws_mb);
|
virtual bool add_handle_ref(WorkingSetMB *ws_mb);
|
||||||
virtual void de_handle_ref(WorkingSetMB *ws_mb);
|
virtual void de_handle_ref(WorkingSetMB *ws_mb, const bool do_retire);
|
||||||
virtual int alloc(ObKVCacheInst &inst, const enum ObKVCachePolicy policy,
|
virtual int alloc(ObKVCacheInst &inst, const enum ObKVCachePolicy policy,
|
||||||
const int64_t block_size, WorkingSetMB *&ws_mb);
|
const int64_t block_size, WorkingSetMB *&ws_mb);
|
||||||
virtual int free(WorkingSetMB *ws_mb);
|
virtual int free(WorkingSetMB *ws_mb);
|
||||||
|
Reference in New Issue
Block a user