[CP] [to #45358808]add session lock for close_cursor in fetch

This commit is contained in:
LiuYoung00 2022-11-07 09:38:20 +00:00 committed by wangzelin.wzl
parent 8fe665e011
commit 142433eb7b
2 changed files with 22 additions and 23 deletions

View File

@ -517,15 +517,14 @@ int ObMPBase::response_row(ObSQLSessionInfo &session,
}
}
}
if (OB_SUCC(ret)) {
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session);
ObSMRow sm_row(obmysql::BINARY, row, dtc_params, fields);
obmysql::OMPKRow rp(sm_row);
if (OB_FAIL(response_packet(rp, &session))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("response packet fail", K(ret));
if (OB_SUCC(ret)) {
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session);
ObSMRow sm_row(obmysql::BINARY, row, dtc_params, fields);
obmysql::OMPKRow rp(sm_row);
if (OB_FAIL(response_packet(rp, &session))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("response packet fail", K(ret));
}
}
}
return ret;

View File

@ -746,28 +746,28 @@ int ObMPStmtFetch::process()
OB_SYS_TENANT_ID, sys_version))) {
LOG_WARN("fail get tenant broadcast version", K(ret));
} else {
ObPLCursorInfo *cursor = NULL;
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
session.partition_hit().reset();
ret = process_fetch_stmt(session);
// set cursor fetched info. if cursor has be fetched, we need to disconnect
cursor = session.get_cursor(cursor_id_);
if (OB_NOT_NULL(cursor) && cursor->get_fetched()) {
cursor_fetched = true;
}
if (need_close_cursor()) {
// close at here because after do_process, need read some cursor info for log in process_fetch_stmt
int tmp_ret = session.close_cursor(cursor_id_);
ret = ret == OB_SUCCESS ? tmp_ret : ret;
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("no scrollable cursor close cursor failed at last row.", K(tmp_ret));
}
}
}
session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
session.set_last_trace_id(ObCurTraceId::get_trace_id());
}
// set cursor fetched info. if cursor has be fetched, we need to disconnect
if (OB_NOT_NULL(sess)) {
ObPLCursorInfo *cursor = sess->get_cursor(cursor_id_);
if (OB_NOT_NULL(cursor) && cursor->get_fetched()) {
cursor_fetched = true;
}
if (need_close_cursor()) {
int tmp_ret = sess->close_cursor(cursor_id_);
if (OB_SUCCESS != tmp_ret) {
ret = OB_SUCCESS == ret ? tmp_ret : ret;
LOG_WARN("no scrollable cursor close cursor failed at last row.", K(ret), K(tmp_ret));
}
}
}
if (!OB_SUCC(ret) && is_conn_valid()) {
send_error_packet(ret, NULL);
if (cursor_fetched) {