Fix deadlock when auto increment service alloc handle
This commit is contained in:
@ -108,7 +108,7 @@ int TableNode::alloc_handle(ObSmallAllocator &allocator,
|
|||||||
needed_interval = max_value;
|
needed_interval = max_value;
|
||||||
} else if (min_value > node.cache_end_) {
|
} else if (min_value > node.cache_end_) {
|
||||||
ret = OB_SIZE_OVERFLOW;
|
ret = OB_SIZE_OVERFLOW;
|
||||||
LOG_WARN("fail to alloc handle; cache is not enough", K(min_value), K(max_value), K(node), K(*this), K(ret));
|
LOG_TRACE("fail to alloc handle; cache is not enough", K(min_value), K(max_value), K(node), K(*this), K(ret));
|
||||||
} else {
|
} else {
|
||||||
ret = ObAutoincrementService::calc_next_value(min_value,
|
ret = ObAutoincrementService::calc_next_value(min_value,
|
||||||
offset,
|
offset,
|
||||||
@ -346,7 +346,7 @@ int ObAutoincrementService::get_handle(AutoincParam ¶m,
|
|||||||
// alloc handle
|
// alloc handle
|
||||||
bool need_prefetch = false;
|
bool need_prefetch = false;
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(table_node->alloc_mutex_.lock())) {
|
if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
|
||||||
LOG_WARN("failed to get alloc lock", K(param), K(*table_node), K(ret));
|
LOG_WARN("failed to get alloc lock", K(param), K(*table_node), K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (OB_SIZE_OVERFLOW == (ret = table_node->alloc_handle(handle_allocator_,
|
if (OB_SIZE_OVERFLOW == (ret = table_node->alloc_handle(handle_allocator_,
|
||||||
@ -401,7 +401,7 @@ int ObAutoincrementService::get_handle(AutoincParam ¶m,
|
|||||||
TableNode mock_node;
|
TableNode mock_node;
|
||||||
if (OB_FAIL(fetch_table_node(param, &mock_node, true))) {
|
if (OB_FAIL(fetch_table_node(param, &mock_node, true))) {
|
||||||
LOG_WARN("failed to fetch table node", K(param), K(ret));
|
LOG_WARN("failed to fetch table node", K(param), K(ret));
|
||||||
} else if (OB_FAIL(table_node->alloc_mutex_.lock())) {
|
} else if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
|
||||||
LOG_WARN("failed to get alloc mutex lock", K(ret));
|
LOG_WARN("failed to get alloc mutex lock", K(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("fetch table node success", K(param), K(mock_node), K(*table_node));
|
LOG_INFO("fetch table node success", K(param), K(mock_node), K(*table_node));
|
||||||
@ -699,6 +699,24 @@ int ObAutoincrementService::get_schema(share::schema::ObSchemaGetterGuard &schem
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObAutoincrementService::alloc_autoinc_try_lock(lib::ObMutex &alloc_mutex)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
static const int64_t SLEEP_TS_US = 10;
|
||||||
|
while (OB_SUCC(ret) && OB_FAIL(alloc_mutex.trylock())) {
|
||||||
|
if (OB_EAGAIN == ret) {
|
||||||
|
THIS_WORKER.check_wait();
|
||||||
|
ob_usleep(SLEEP_TS_US);
|
||||||
|
if (OB_FAIL(THIS_WORKER.check_status())) { // override ret code
|
||||||
|
LOG_WARN("fail to check worker status", K(ret));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG_WARN("fail to try lock mutex", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObAutoincrementService::fetch_table_node(const AutoincParam ¶m,
|
int ObAutoincrementService::fetch_table_node(const AutoincParam ¶m,
|
||||||
TableNode *table_node,
|
TableNode *table_node,
|
||||||
const bool fetch_prefetch)
|
const bool fetch_prefetch)
|
||||||
@ -954,7 +972,7 @@ int ObAutoincrementService::sync_insert_value(AutoincParam ¶m,
|
|||||||
} else if (OB_FAIL(service->local_push_to_global_value(key, max_value, value_to_sync, global_sync_value))) {
|
} else if (OB_FAIL(service->local_push_to_global_value(key, max_value, value_to_sync, global_sync_value))) {
|
||||||
LOG_WARN("fail sync value to global", K(key), K(insert_value), K(ret));
|
LOG_WARN("fail sync value to global", K(key), K(insert_value), K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(table_node->alloc_mutex_.lock())) {
|
if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
|
||||||
LOG_WARN("fail to get alloc mutex", K(ret));
|
LOG_WARN("fail to get alloc mutex", K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (value_to_sync <= global_sync_value) {
|
if (value_to_sync <= global_sync_value) {
|
||||||
@ -1006,7 +1024,7 @@ int ObAutoincrementService::sync_insert_value(AutoincParam ¶m,
|
|||||||
// - if we don't, and other thread fetchs a small cache value, it is not OK.
|
// - if we don't, and other thread fetchs a small cache value, it is not OK.
|
||||||
// SO, we must fetch table node here.
|
// SO, we must fetch table node here.
|
||||||
// syncing insert_value is not the common case. perf acceptable
|
// syncing insert_value is not the common case. perf acceptable
|
||||||
if (OB_FAIL(table_node->alloc_mutex_.lock())) {
|
if (OB_FAIL(alloc_autoinc_try_lock(table_node->alloc_mutex_))) {
|
||||||
LOG_WARN("fail to get alloc mutex", K(ret));
|
LOG_WARN("fail to get alloc mutex", K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (insert_value >= table_node->curr_node_.cache_end_
|
if (insert_value >= table_node->curr_node_.cache_end_
|
||||||
|
|||||||
@ -442,6 +442,8 @@ private:
|
|||||||
return service;
|
return service;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int alloc_autoinc_try_lock(lib::ObMutex &alloc_mutex);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
common::ObSmallAllocator node_allocator_;
|
common::ObSmallAllocator node_allocator_;
|
||||||
common::ObSmallAllocator handle_allocator_;
|
common::ObSmallAllocator handle_allocator_;
|
||||||
|
|||||||
Reference in New Issue
Block a user