[FEAT MERGE]vos merge into master
Co-authored-by: ZenoWang <wzybuaasoft@163.com> Co-authored-by: zhjc1124 <zhjc1124@gmail.com> Co-authored-by: JiahuaChen <garfieldjia@qq.com>
This commit is contained in:
154
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
154
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
@ -92,10 +92,9 @@ struct ReadyFlag
|
||||
int32_t pending_ CACHE_ALIGNED;
|
||||
};
|
||||
|
||||
#define futex(...) syscall(SYS_futex,__VA_ARGS__)
|
||||
static int futex_wake(volatile int *p, int val)
|
||||
{
|
||||
return static_cast<int>(futex((int *)p, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0));
|
||||
return futex((uint *)p, FUTEX_WAKE_PRIVATE, val, NULL);
|
||||
}
|
||||
|
||||
struct SingleWaitCond
|
||||
@ -330,7 +329,7 @@ private:
|
||||
class ObSqlSock: public ObLink
|
||||
{
|
||||
public:
|
||||
ObSqlSock(ObSqlNioImpl& nio, int fd): nio_impl_(nio), fd_(fd), err_(0), read_buffer_(fd),
|
||||
ObSqlSock(ObSqlNioImpl *nio, int fd): nio_impl_(nio), fd_(fd), err_(0), read_buffer_(fd),
|
||||
need_epoll_trigger_write_(false), may_handling_(true), handler_close_flag_(false),
|
||||
need_shutdown_(false), last_decode_time_(0), last_write_time_(0), sql_session_info_(NULL) {
|
||||
memset(sess_, 0, sizeof(sess_));
|
||||
@ -339,7 +338,8 @@ public:
|
||||
int64_t get_remain_sz() const { return read_buffer_.get_remain_sz(); }
|
||||
TO_STRING_KV(KP(this), K_(fd), K_(err), K(last_decode_time_), K(last_write_time_),
|
||||
K(read_buffer_.get_consume_sz()), K(get_pending_flag()), KPC(get_trace_id()));
|
||||
ObSqlNioImpl& get_nio_impl() { return nio_impl_; }
|
||||
ObSqlNioImpl *get_nio_impl() { return nio_impl_; }
|
||||
void set_nio_impl(ObSqlNioImpl *impl) { nio_impl_ = impl; }
|
||||
bool set_error(int err) { return 0 == ATOMIC_TAS(&err_, err); }
|
||||
bool has_error() const { return ATOMIC_LOAD(&err_) != 0; }
|
||||
|
||||
@ -439,7 +439,7 @@ public:
|
||||
ObDLink all_list_link_;
|
||||
ObLink write_task_link_;
|
||||
private:
|
||||
ObSqlNioImpl& nio_impl_;
|
||||
ObSqlNioImpl *nio_impl_;
|
||||
int fd_;
|
||||
int err_;
|
||||
ReadBuffer read_buffer_;
|
||||
@ -457,6 +457,11 @@ public:
|
||||
char sess_[3000] __attribute__((aligned(16)));
|
||||
};
|
||||
|
||||
static ObSqlSock *sess2sock(void *sess)
|
||||
{
|
||||
return CONTAINER_OF(sess, ObSqlSock, sess_);
|
||||
}
|
||||
|
||||
int ObSqlSock::set_ssl_enabled()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -584,8 +589,41 @@ public:
|
||||
ObSqlNioImpl(ObISqlSockHandler& handler):
|
||||
handler_(handler), epfd_(-1), lfd_(-1), tcp_keepalive_enabled_(0),
|
||||
tcp_keepidle_(0), tcp_keepintvl_(0), tcp_keepcnt_(0) {}
|
||||
~ObSqlNioImpl() {}
|
||||
~ObSqlNioImpl() {
|
||||
destroy();
|
||||
}
|
||||
void destroy() {
|
||||
ObDLink *head = all_list_.head();
|
||||
ObLink *cur = head->next_;
|
||||
while (cur != head) {
|
||||
ObSqlSock *s = CONTAINER_OF(cur, ObSqlSock, all_list_link_);
|
||||
cur = cur->next_;
|
||||
free_sql_sock(s);
|
||||
}
|
||||
}
|
||||
int init(int port) {
|
||||
int ret = OB_SUCCESS;
|
||||
if (port == -1) {
|
||||
ret = init_io();
|
||||
} else {
|
||||
ret = init_listen(port);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int init_io() {
|
||||
int ret = OB_SUCCESS;
|
||||
uint32_t epflag = EPOLLIN;
|
||||
if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) {
|
||||
ret = OB_IO_ERROR;
|
||||
LOG_WARN("epoll_create fail", K(ret), K(errno));
|
||||
} else if (OB_FAIL(evfd_.create(epfd_))) {
|
||||
LOG_WARN("evfd create fail", K(ret));
|
||||
} else {
|
||||
LOG_INFO("sql_nio init io succ");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int init_listen(int port) {
|
||||
int ret = OB_SUCCESS;
|
||||
uint32_t epflag = EPOLLIN;
|
||||
if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) {
|
||||
@ -600,7 +638,7 @@ public:
|
||||
} else if (OB_FAIL(evfd_.create(epfd_))) {
|
||||
LOG_WARN("evfd create fail", K(ret));
|
||||
} else {
|
||||
LOG_INFO("sql_nio listen succ", K(port));
|
||||
LOG_INFO("sql_nio init listen succ", K(port));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -618,6 +656,25 @@ public:
|
||||
update_tcp_keepalive_parameters();
|
||||
print_session_info();
|
||||
}
|
||||
int regist_sess(void *sess) {
|
||||
int err = 0;
|
||||
ObSqlSock *sock = sess2sock(sess);
|
||||
int fd = sock->get_fd();
|
||||
uint32_t epflag = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLET | EPOLLRDHUP;
|
||||
ObSqlNioImpl *nio_impl = sock->get_nio_impl();
|
||||
sock->remove_fd_from_epoll(nio_impl->get_epfd());
|
||||
nio_impl->remove_session_info(sock);
|
||||
record_session_info(sock);
|
||||
sock->set_nio_impl(this);
|
||||
if (0 != (err = epoll_regist(epfd_, fd, epflag, sock))) {
|
||||
LOG_WARN_RET(OB_ERR_SYS, "epoll_regist fail", K(fd), K(err));
|
||||
}
|
||||
if (0 != err && NULL != sock) {
|
||||
ObSqlSockSession *sess = (ObSqlSockSession *)sock->sess_;
|
||||
sess->destroy_sock();
|
||||
}
|
||||
return err;
|
||||
}
|
||||
void push_close_req(ObSqlSock* s) {
|
||||
if (s->set_error(EIO)) {
|
||||
LOG_WARN_RET(OB_ERR_SYS, "close sql sock by user req", K(*s));
|
||||
@ -724,7 +781,6 @@ private:
|
||||
} else {
|
||||
if (true == s->sql_session_info_is_null()) {
|
||||
pending_destroy_list_.del(&s->dlink_);
|
||||
remove_session_info(s);
|
||||
s->do_close();
|
||||
free_sql_sock(s);
|
||||
}
|
||||
@ -832,13 +888,14 @@ private:
|
||||
ObSqlSock* alloc_sql_sock(int fd) {
|
||||
ObSqlSock* s = NULL;
|
||||
if (NULL != (s = (ObSqlSock*)direct_alloc(sizeof(*s)))) {
|
||||
new(s)ObSqlSock(*this, fd);
|
||||
new (s) ObSqlSock(this, fd);
|
||||
record_session_info(s);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
void free_sql_sock(ObSqlSock* s) {
|
||||
if (NULL != s) {
|
||||
remove_session_info(s);
|
||||
s->~ObSqlSock();
|
||||
direct_free(s);
|
||||
}
|
||||
@ -893,6 +950,7 @@ private:
|
||||
static void* direct_alloc(int64_t sz) { return common::ob_malloc(sz, common::ObModIds::OB_COMMON_NETWORK); }
|
||||
static void direct_free(void* p) { common::ob_free(p); }
|
||||
|
||||
int get_epfd(){return epfd_;}
|
||||
private:
|
||||
ObISqlSockHandler& handler_;
|
||||
int epfd_;
|
||||
@ -908,21 +966,28 @@ private:
|
||||
uint32_t tcp_keepcnt_;
|
||||
};
|
||||
|
||||
int ObSqlNio::start(int port, ObISqlSockHandler* handler, int n_thread)
|
||||
int ObSqlNio::start(int port, ObISqlSockHandler *handler, int n_thread,
|
||||
const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (NULL == (impl_ = (typeof(impl_))ob_malloc(sizeof(ObSqlNioImpl) * n_thread, "SqlNio"))) {
|
||||
port_ = port;
|
||||
handler_ = handler;
|
||||
tenant_id_ = tenant_id;
|
||||
if (n_thread > MAX_THREAD_CNT) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (NULL == (impl_ = (typeof(impl_))ob_malloc(
|
||||
sizeof(ObSqlNioImpl) * MAX_THREAD_CNT, "SqlNio"))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc sql nio fail", K(ret));
|
||||
} else {
|
||||
for(int i = 0; OB_SUCCESS == ret && i < n_thread; i++) {
|
||||
new(impl_ + i)ObSqlNioImpl(*handler);
|
||||
for (int i = 0; OB_SUCCESS == ret && i < n_thread; i++) {
|
||||
new (impl_ + i) ObSqlNioImpl(*handler);
|
||||
if (OB_FAIL(impl_[i].init(port))) {
|
||||
LOG_WARN("impl init fail", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
set_thread_count(n_thread);
|
||||
lib::Threads::set_thread_count(n_thread);
|
||||
lib::Threads::start();
|
||||
}
|
||||
}
|
||||
@ -941,14 +1006,23 @@ void ObSqlNio::wait()
|
||||
|
||||
void ObSqlNio::destroy()
|
||||
{
|
||||
for (int i = 0; i < get_thread_count(); i++) {
|
||||
impl_[i].destroy();
|
||||
}
|
||||
}
|
||||
|
||||
int __attribute__((weak)) sql_nio_add_cgroup(const uint64_t tenant_id)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
void ObSqlNio::run(int64_t idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (NULL != impl_) {
|
||||
lib::set_thread_name("sql_nio", idx);
|
||||
while(!has_set_stop()) {
|
||||
// if (tenant_id_ != common::OB_INVALID_ID) {
|
||||
// obmysql::sql_nio_add_cgroup(tenant_id_);
|
||||
// }
|
||||
while(!has_set_stop() && !(OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false)) {
|
||||
impl_[idx].do_work();
|
||||
}
|
||||
if (has_set_stop()) {
|
||||
@ -957,21 +1031,16 @@ void ObSqlNio::run(int64_t idx)
|
||||
}
|
||||
}
|
||||
|
||||
static ObSqlSock* sess2sock(void* sess)
|
||||
{
|
||||
return CONTAINER_OF(sess, ObSqlSock, sess_);
|
||||
}
|
||||
|
||||
void ObSqlNio::destroy_sock(void* sess)
|
||||
{
|
||||
ObSqlSock* sock = sess2sock(sess);
|
||||
sock->get_nio_impl().push_close_req(sock);
|
||||
sock->get_nio_impl()->push_close_req(sock);
|
||||
}
|
||||
|
||||
void ObSqlNio::revert_sock(void* sess)
|
||||
{
|
||||
ObSqlSock* sock = sess2sock(sess);
|
||||
sock->get_nio_impl().revert_sock(sock);
|
||||
sock->get_nio_impl()->revert_sock(sock);
|
||||
}
|
||||
|
||||
void ObSqlNio::set_shutdown(void* sess)
|
||||
@ -1027,7 +1096,7 @@ void ObSqlNio::async_write_data(void* sess, const char* buf, int64_t sz)
|
||||
{
|
||||
ObSqlSock* sock = sess2sock(sess);
|
||||
sock->init_write_task(buf, sz);
|
||||
sock->get_nio_impl().push_write_req(sock);
|
||||
sock->get_nio_impl()->push_write_req(sock);
|
||||
}
|
||||
|
||||
int ObSqlNio::set_ssl_enabled(void* sess)
|
||||
@ -1042,6 +1111,43 @@ SSL* ObSqlNio::get_ssl_st(void* sess)
|
||||
return sock->get_ssl_st();
|
||||
}
|
||||
|
||||
int ObSqlNio::set_thread_count(const int n_thread)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int cur_thread = get_thread_count();
|
||||
if (n_thread > MAX_THREAD_CNT) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (n_thread == cur_thread) {
|
||||
// do nothing
|
||||
} else {
|
||||
if (n_thread > cur_thread) {
|
||||
for (int i = cur_thread; OB_SUCCESS == ret && i < n_thread; i++) {
|
||||
new (impl_ + i) ObSqlNioImpl(*handler_);
|
||||
if (OB_FAIL(impl_[i].init(port_))) {
|
||||
LOG_WARN("impl init fail");
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
lib::Threads::set_thread_count(n_thread);
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("decrease thread count not allowed", K(cur_thread),
|
||||
K(n_thread));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSqlNio::regist_sess(void *sess)
|
||||
{
|
||||
int err = 0;
|
||||
((ObSqlSockSession *)sess)->nio_ = this;
|
||||
if (0 != (err = impl_[get_dispatch_idx()].regist_sess(sess))) {
|
||||
LOG_ERROR_RET(OB_ERR_SYS, "regist sess fd fail", K(err));
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
void ObSqlNio::update_tcp_keepalive_params(int keepalive_enabled, uint32_t tcp_keepidle, uint32_t tcp_keepintvl, uint32_t tcp_keepcnt)
|
||||
{
|
||||
int thread_count = get_thread_count();
|
||||
|
||||
Reference in New Issue
Block a user