fix the core that sql nio send handshake packet failed and free ObSqlSock twice
This commit is contained in:
26
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
26
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
@ -436,6 +436,7 @@ public:
|
|||||||
void shutdown() { ::shutdown(fd_, SHUT_RD); }
|
void shutdown() { ::shutdown(fd_, SHUT_RD); }
|
||||||
int set_ssl_enabled();
|
int set_ssl_enabled();
|
||||||
SSL* get_ssl_st();
|
SSL* get_ssl_st();
|
||||||
|
int write_handshake_packet(const char* buf, int64_t sz);
|
||||||
public:
|
public:
|
||||||
ObDLink dlink_;
|
ObDLink dlink_;
|
||||||
ObDLink all_list_link_;
|
ObDLink all_list_link_;
|
||||||
@ -478,6 +479,27 @@ SSL* ObSqlSock::get_ssl_st()
|
|||||||
return ob_fd_get_ssl_st(fd_);
|
return ob_fd_get_ssl_st(fd_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObSqlSock::write_handshake_packet(const char* buf, int64_t sz) {
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t pos = 0;
|
||||||
|
while(pos < sz && OB_SUCCESS == ret) {
|
||||||
|
int64_t wbytes = 0;
|
||||||
|
if ((wbytes = write(fd_, buf + pos, sz - pos)) >= 0) {
|
||||||
|
pos += wbytes;
|
||||||
|
} else if (EINTR == errno) {
|
||||||
|
//continue
|
||||||
|
} else {
|
||||||
|
//when send handshake packet, only process EINTR as normal, other errno
|
||||||
|
//will be treated as error
|
||||||
|
IGNORE_RETURN(set_error(EIO));
|
||||||
|
ret = OB_IO_ERROR;
|
||||||
|
LOG_WARN("write data error", K_(fd), K(errno));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
last_write_time_ = ObTimeUtility::current_time();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static struct epoll_event *__make_epoll_event(struct epoll_event *event, uint32_t event_flag, void* val) {
|
static struct epoll_event *__make_epoll_event(struct epoll_event *event, uint32_t event_flag, void* val) {
|
||||||
event->events = event_flag;
|
event->events = event_flag;
|
||||||
event->data.ptr = val;
|
event->data.ptr = val;
|
||||||
@ -1173,5 +1195,9 @@ void ObSqlNio::update_tcp_keepalive_params(int keepalive_enabled, uint32_t tcp_k
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObSqlNio::write_handshake_packet(void* sess, const char* buf, int64_t sz)
|
||||||
|
{
|
||||||
|
return sess2sock(sess)->write_handshake_packet(buf, sz);
|
||||||
|
}
|
||||||
}; // end namespace obmysql
|
}; // end namespace obmysql
|
||||||
}; // end namespace oceanbase
|
}; // end namespace oceanbase
|
||||||
|
|||||||
1
deps/oblib/src/rpc/obmysql/ob_sql_nio.h
vendored
1
deps/oblib/src/rpc/obmysql/ob_sql_nio.h
vendored
@ -58,6 +58,7 @@ public:
|
|||||||
return ATOMIC_FAA(&dispatch_idx_, 1) % get_thread_count();
|
return ATOMIC_FAA(&dispatch_idx_, 1) % get_thread_count();
|
||||||
}
|
}
|
||||||
void update_tcp_keepalive_params(int keepalive_enabled, uint32_t tcp_keepidle, uint32_t tcp_keepintvl, uint32_t tcp_keepcnt);
|
void update_tcp_keepalive_params(int keepalive_enabled, uint32_t tcp_keepidle, uint32_t tcp_keepintvl, uint32_t tcp_keepcnt);
|
||||||
|
int write_handshake_packet(void* sess, const char* buf, int64_t sz);
|
||||||
private:
|
private:
|
||||||
void run(int64_t idx);
|
void run(int64_t idx);
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -172,5 +172,10 @@ SSL* ObSqlSockSession::get_ssl_st()
|
|||||||
return nio_->get_ssl_st((void *)this);
|
return nio_->get_ssl_st((void *)this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObSqlSockSession::write_hanshake_packet(const char *buf, int64_t sz)
|
||||||
|
{
|
||||||
|
return nio_->write_handshake_packet((void *)this, buf, sz);
|
||||||
|
}
|
||||||
|
|
||||||
}; // end namespace obmysql
|
}; // end namespace obmysql
|
||||||
}; // end namespace oceanbase
|
}; // end namespace oceanbase
|
||||||
|
|||||||
@ -62,6 +62,7 @@ public:
|
|||||||
int set_ssl_enabled();
|
int set_ssl_enabled();
|
||||||
SSL* get_ssl_st();
|
SSL* get_ssl_st();
|
||||||
bool is_inited() const { return is_inited_; }
|
bool is_inited() const { return is_inited_; }
|
||||||
|
int write_hanshake_packet(const char *buf, int64_t sz);
|
||||||
ObSqlNio* nio_;
|
ObSqlNio* nio_;
|
||||||
ObISMConnectionCallback& sm_conn_cb_;
|
ObISMConnectionCallback& sm_conn_cb_;
|
||||||
rpc::ObRequest sql_req_;
|
rpc::ObRequest sql_req_;
|
||||||
|
|||||||
@ -65,7 +65,7 @@ static int send_handshake(ObSqlSockSession& sess, const OMPKHandshake &hsp)
|
|||||||
} else if (OB_UNLIKELY(pkt_count <= 0)) {
|
} else if (OB_UNLIKELY(pkt_count <= 0)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("invalid pkt count", K(pkt_count), K(ret));
|
LOG_WARN("invalid pkt count", K(pkt_count), K(ret));
|
||||||
} else if (OB_FAIL(sess.write_data(buf, pos))) {
|
} else if (OB_FAIL(sess.write_hanshake_packet(buf, pos))) {
|
||||||
LOG_WARN("write handshake packet data fail", K(ret));
|
LOG_WARN("write handshake packet data fail", K(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user