[to #49335040]add thread lock for pl_cursor_map_
This commit is contained in:
@ -292,6 +292,9 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
|
|||||||
pl_query_sender_ = NULL;
|
pl_query_sender_ = NULL;
|
||||||
pl_ps_protocol_ = false;
|
pl_ps_protocol_ = false;
|
||||||
if (pl_cursor_cache_.is_inited()) {
|
if (pl_cursor_cache_.is_inited()) {
|
||||||
|
// when select GV$OPEN_CURSOR, we will add get_thread_data_lock to fetch pl_cursor_map_
|
||||||
|
// so we need get_thread_data_lock there
|
||||||
|
ObSQLSessionInfo::LockGuard lock_guard(get_thread_data_lock());
|
||||||
pl_cursor_cache_.reset();
|
pl_cursor_cache_.reset();
|
||||||
}
|
}
|
||||||
inner_conn_ = NULL;
|
inner_conn_ = NULL;
|
||||||
@ -1190,16 +1193,21 @@ int ObSQLSessionInfo::add_cursor(pl::ObPLCursorInfo *cursor)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(pl_cursor_cache_.pl_cursor_map_.set_refactored(id, cursor))) {
|
|
||||||
LOG_WARN("fail insert ps id to hash map", K(id), K(*cursor), K(ret));
|
|
||||||
} else {
|
} else {
|
||||||
cursor->set_id(id);
|
// when select GV$OPEN_CURSOR, we will add get_thread_data_lock to fetch pl_cursor_map_
|
||||||
add_cursor_success = true;
|
// so we need get_thread_data_lock there
|
||||||
if (lib::is_diagnose_info_enabled()) {
|
ObSQLSessionInfo::LockGuard lock_guard(get_thread_data_lock());
|
||||||
EVENT_INC(SQL_OPEN_CURSORS_CURRENT);
|
if (OB_FAIL(pl_cursor_cache_.pl_cursor_map_.set_refactored(id, cursor))) {
|
||||||
EVENT_INC(SQL_OPEN_CURSORS_CUMULATIVE);
|
LOG_WARN("fail insert ps id to hash map", K(id), K(*cursor), K(ret));
|
||||||
|
} else {
|
||||||
|
cursor->set_id(id);
|
||||||
|
add_cursor_success = true;
|
||||||
|
if (lib::is_diagnose_info_enabled()) {
|
||||||
|
EVENT_INC(SQL_OPEN_CURSORS_CURRENT);
|
||||||
|
EVENT_INC(SQL_OPEN_CURSORS_CUMULATIVE);
|
||||||
|
}
|
||||||
|
LOG_DEBUG("ps cursor: add cursor", K(ret), K(id), K(get_sessid()));
|
||||||
}
|
}
|
||||||
LOG_DEBUG("ps cursor: add cursor", K(ret), K(id), K(get_sessid()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!add_cursor_success && OB_NOT_NULL(cursor)) {
|
if (!add_cursor_success && OB_NOT_NULL(cursor)) {
|
||||||
@ -1234,6 +1242,9 @@ int ObSQLSessionInfo::close_cursor(int64_t cursor_id)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObPLCursorInfo *cursor = NULL;
|
ObPLCursorInfo *cursor = NULL;
|
||||||
LOG_INFO("ps cursor : remove cursor", K(ret), K(cursor_id), K(get_sessid()));
|
LOG_INFO("ps cursor : remove cursor", K(ret), K(cursor_id), K(get_sessid()));
|
||||||
|
// when select GV$OPEN_CURSOR, we will add get_thread_data_lock to fetch pl_cursor_map_
|
||||||
|
// so we need get_thread_data_lock there
|
||||||
|
ObSQLSessionInfo::LockGuard lock_guard(get_thread_data_lock());
|
||||||
if (OB_FAIL(pl_cursor_cache_.pl_cursor_map_.erase_refactored(cursor_id, &cursor))) {
|
if (OB_FAIL(pl_cursor_cache_.pl_cursor_map_.erase_refactored(cursor_id, &cursor))) {
|
||||||
LOG_WARN("cursor info not exist", K(cursor_id));
|
LOG_WARN("cursor info not exist", K(cursor_id));
|
||||||
} else if (OB_ISNULL(cursor)) {
|
} else if (OB_ISNULL(cursor)) {
|
||||||
@ -1286,6 +1297,9 @@ int ObSQLSessionInfo::init_cursor_cache()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!pl_cursor_cache_.is_inited()) {
|
if (!pl_cursor_cache_.is_inited()) {
|
||||||
|
// when select GV$OPEN_CURSOR, we will add get_thread_data_lock to fetch pl_cursor_map_
|
||||||
|
// so we need get_thread_data_lock there
|
||||||
|
ObSQLSessionInfo::LockGuard lock_guard(get_thread_data_lock());
|
||||||
OZ (pl_cursor_cache_.init(get_effective_tenant_id()),
|
OZ (pl_cursor_cache_.init(get_effective_tenant_id()),
|
||||||
get_effective_tenant_id(),
|
get_effective_tenant_id(),
|
||||||
get_proxy_sessid(),
|
get_proxy_sessid(),
|
||||||
@ -1299,6 +1313,9 @@ int ObSQLSessionInfo::close_dbms_cursor(int64_t cursor_id)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObPLCursorInfo *cursor = NULL;
|
ObPLCursorInfo *cursor = NULL;
|
||||||
LOG_INFO("remove dbms cursor", K(ret), K(cursor_id), K(get_sessid()));
|
LOG_INFO("remove dbms cursor", K(ret), K(cursor_id), K(get_sessid()));
|
||||||
|
// when select GV$OPEN_CURSOR, we will add get_thread_data_lock to fetch pl_cursor_map_
|
||||||
|
// so we need get_thread_data_lock there
|
||||||
|
ObSQLSessionInfo::LockGuard lock_guard(get_thread_data_lock());
|
||||||
OZ (pl_cursor_cache_.pl_cursor_map_.erase_refactored(cursor_id, &cursor), cursor_id);
|
OZ (pl_cursor_cache_.pl_cursor_map_.erase_refactored(cursor_id, &cursor), cursor_id);
|
||||||
OV (OB_NOT_NULL(cursor), OB_ERR_UNEXPECTED, cursor_id);
|
OV (OB_NOT_NULL(cursor), OB_ERR_UNEXPECTED, cursor_id);
|
||||||
if (OB_SUCC(ret) && lib::is_diagnose_info_enabled()) {
|
if (OB_SUCC(ret) && lib::is_diagnose_info_enabled()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user