[CP] when sql response has not been sent completely and the socket been closed,there will be reference count leakage

This commit is contained in:
496148326@qq.com
2023-07-03 11:12:33 +00:00
committed by ob-robot
parent 469144616f
commit 92f96f39d8
7 changed files with 39 additions and 23 deletions

View File

@ -22,6 +22,7 @@
#include "lib/thread/ob_thread_name.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/profile/ob_trace_id.h"
#include "common/ob_clock_generator.h"
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -305,6 +306,7 @@ public:
}
return ret;
}
TO_STRING_KV(KP_(buf), K_(sz));
private:
int do_write(int fd, const char* buf, int64_t sz, int64_t& consume_bytes) {
int ret = OB_SUCCESS;
@ -345,8 +347,9 @@ public:
}
~ObSqlSock() {}
int64_t get_remain_sz() const { return read_buffer_.get_remain_sz(); }
TO_STRING_KV(KP(this), K_(fd), K_(err), K(last_decode_time_), K(last_write_time_),
K(read_buffer_.get_consume_sz()), K(get_pending_flag()), KPC(get_trace_id()));
TO_STRING_KV(KP(this), "session_id", get_sql_session_id(), "trace_id", get_trace_id(), "sql_handling_stage", get_sql_request_execute_state(), "sql_initiative_shutdown", need_shutdown_,
K_(fd), K_(err), K_(last_decode_time), K_(last_write_time), K_(pending_write_task), K_(need_epoll_trigger_write),
"consume_size", read_buffer_.get_consume_sz(), "pending_flag", get_pending_flag(), "may_handling_flag", get_may_handling_flag(), K_(handler_close_flag));
ObSqlNioImpl *get_nio_impl() { return nio_impl_; }
void set_nio_impl(ObSqlNioImpl *impl) { nio_impl_ = impl; }
bool set_error(int err) { return 0 == ATOMIC_TAS(&err_, err); }
@ -409,11 +412,6 @@ public:
return ret;
}
const rpc::TraceId* get_trace_id() const {
ObSqlSockSession* sess = (ObSqlSockSession *)sess_;
return &(sess->sql_req_.get_trace_id());
}
bool wait_handling() { return ready_flag_.set_ready(); }
int32_t get_pending_flag() const { return ready_flag_.get_pending_flag(); }
void set_writable() { write_cond_.signal(); }
bool set_readable() { return ready_flag_.set_ready(); }
@ -463,6 +461,19 @@ private:
int64_t last_decode_time_;
int64_t last_write_time_;
void* sql_session_info_;
private:
const rpc::TraceId* get_trace_id() const {
ObSqlSockSession* sess = (ObSqlSockSession *)sess_;
return &(sess->sql_req_.get_trace_id());
}
int32_t get_sql_request_execute_state() const {
ObSqlSockSession* sess = (ObSqlSockSession *)sess_;
return sess->sql_req_.handling_state_;
}
uint32_t get_sql_session_id() const {
ObSqlSockSession* sess = (ObSqlSockSession *)sess_;
return sess->sql_session_id_;
}
public:
char sess_[3000] __attribute__((aligned(16)));
};
@ -801,18 +812,17 @@ private:
while(cur != head) {
ObSqlSock* s = CONTAINER_OF(cur, ObSqlSock, dlink_);
cur = cur->next_;
bool need_destroy = false;
if (false == s->handler_close_been_called()) {
if (false == s->get_may_handling_flag()) {
bool need_destroy = true;
if (0 == s->get_pending_flag()) {
LOG_INFO("sock ref clean, do destroy", K(*s));
} else if (false == s->get_may_handling_flag()) {
LOG_INFO("can close safely, do destroy", K(*s));
need_destroy = true;
} else if (s->is_need_epoll_trigger_write()) {
LOG_INFO("data hasn't write completely and need close, do destroy", K(*s));
} else {
if (s->wait_handling()) {
LOG_INFO("sock ref clean, do destroy", K(*s));
need_destroy = true;
} else {
LOG_TRACE("wait handling done...", K(*s));
}
need_destroy = false;
LOG_TRACE("wait handling done...", K(*s));
}
if (need_destroy) {
handler_.on_close(s->sess_, 0);
@ -829,12 +839,11 @@ private:
}
void handle_sock_event(ObSqlSock* s, uint32_t mask) {
if (OB_UNLIKELY((EPOLLERR & mask) || (EPOLLHUP & mask) || (EPOLLRDHUP & mask))) {
if (s->set_error(EIO)) {
LOG_WARN_RET(OB_ERR_SYS, "sock error detect by epoll", K(mask), K(*s));
LOG_WARN_RET(OB_SUCCESS, "socket closed, it maybe disconnected by the client or by observer actively", K(mask), K(*s));
prepare_destroy(s);
} else {
LOG_WARN_RET(OB_ERR_SYS, "sock error detect by epoll, and worker thread alread set error", K(*s));
LOG_WARN_RET(OB_SUCCESS, "socket closed, and worker thread alread set error", K(mask), K(*s));
}
} else {
int err = 0;