add defensive log & fix wrong tid in pthread_join
This commit is contained in:
@ -1902,7 +1902,7 @@ int dump_thread_info(lua_State *L)
|
|||||||
struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)};
|
struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)};
|
||||||
wait_event[0] = '\0';
|
wait_event[0] = '\0';
|
||||||
if (0 != join_addr) {
|
if (0 != join_addr) {
|
||||||
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(thread_base + tid_offset), tid_offset);
|
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(join_addr + tid_offset), tid_offset);
|
||||||
} else if (OB_NOT_NULL(wait_addr)) {
|
} else if (OB_NOT_NULL(wait_addr)) {
|
||||||
uint32_t val = 0;
|
uint32_t val = 0;
|
||||||
struct iovec local_iov = {&val, sizeof(val)};
|
struct iovec local_iov = {&val, sizeof(val)};
|
||||||
|
|||||||
@ -584,7 +584,7 @@ ObTenant::ObTenant(const int64_t id,
|
|||||||
shrink_(0),
|
shrink_(0),
|
||||||
total_worker_cnt_(0),
|
total_worker_cnt_(0),
|
||||||
gc_thread_(0),
|
gc_thread_(0),
|
||||||
stopped_(true),
|
stopped_(0),
|
||||||
wait_mtl_finished_(false),
|
wait_mtl_finished_(false),
|
||||||
req_queue_(),
|
req_queue_(),
|
||||||
multi_level_queue_(nullptr),
|
multi_level_queue_(nullptr),
|
||||||
@ -697,7 +697,7 @@ int ObTenant::init(const ObTenantMeta &meta)
|
|||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
LOG_ERROR("fail to create tenant module", K(ret));
|
LOG_ERROR("fail to create tenant module", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ATOMIC_STORE(&stopped_, false);
|
start();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -858,6 +858,15 @@ int ObTenant::create_tenant_module()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObTenant::sleep_and_warn(ObTenant* tenant)
|
||||||
|
{
|
||||||
|
ob_usleep(10_ms);
|
||||||
|
const int64_t ts = ObTimeUtility::current_time() - tenant->stopped_;
|
||||||
|
if (ts >= 3_min && TC_REACH_TIME_INTERVAL(3_min)) {
|
||||||
|
LOG_ERROR_RET(OB_SUCCESS, "tenant destructed for too long time.", K_(tenant->id), K(ts));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void* ObTenant::wait(void* t)
|
void* ObTenant::wait(void* t)
|
||||||
{
|
{
|
||||||
ObStackHeaderGuard stack_header_guard;
|
ObStackHeaderGuard stack_header_guard;
|
||||||
@ -869,7 +878,7 @@ void* ObTenant::wait(void* t)
|
|||||||
tenant->handle_retry_req(true);
|
tenant->handle_retry_req(true);
|
||||||
while (tenant->req_queue_.size() > 0
|
while (tenant->req_queue_.size() > 0
|
||||||
|| (tenant->multi_level_queue_ != nullptr && tenant->multi_level_queue_->get_total_size() > 0)) {
|
|| (tenant->multi_level_queue_ != nullptr && tenant->multi_level_queue_->get_total_size() > 0)) {
|
||||||
ob_usleep(10L * 1000L);
|
sleep_and_warn(tenant);
|
||||||
}
|
}
|
||||||
while (tenant->workers_.get_size() > 0) {
|
while (tenant->workers_.get_size() > 0) {
|
||||||
if (OB_SUCC(tenant->workers_lock_.trylock())) {
|
if (OB_SUCC(tenant->workers_lock_.trylock())) {
|
||||||
@ -879,16 +888,16 @@ void* ObTenant::wait(void* t)
|
|||||||
destroy_worker(w);
|
destroy_worker(w);
|
||||||
}
|
}
|
||||||
IGNORE_RETURN tenant->workers_lock_.unlock();
|
IGNORE_RETURN tenant->workers_lock_.unlock();
|
||||||
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
|
if (REACH_TIME_INTERVAL(10_s)) {
|
||||||
LOG_INFO(
|
LOG_INFO(
|
||||||
"Tenant has some workers need stop", K_(tenant->id),
|
"Tenant has some workers need stop", K_(tenant->id),
|
||||||
"workers", tenant->workers_.get_size(),
|
"workers", tenant->workers_.get_size(),
|
||||||
K_(tenant->req_queue));
|
K_(tenant->req_queue));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ob_usleep(10L * 1000L);
|
sleep_and_warn(tenant);
|
||||||
}
|
}
|
||||||
LOG_WARN_RET(OB_SUCCESS,"start remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
|
LOG_INFO("start remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
|
||||||
while (tenant->nesting_workers_.get_size() > 0) {
|
while (tenant->nesting_workers_.get_size() > 0) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_SUCC(tenant->workers_lock_.trylock())) {
|
if (OB_SUCC(tenant->workers_lock_.trylock())) {
|
||||||
@ -898,7 +907,7 @@ void* ObTenant::wait(void* t)
|
|||||||
destroy_worker(w);
|
destroy_worker(w);
|
||||||
}
|
}
|
||||||
IGNORE_RETURN tenant->workers_lock_.unlock();
|
IGNORE_RETURN tenant->workers_lock_.unlock();
|
||||||
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
|
if (REACH_TIME_INTERVAL(10_s)) {
|
||||||
LOG_INFO(
|
LOG_INFO(
|
||||||
"Tenant has some nesting workers need stop",
|
"Tenant has some nesting workers need stop",
|
||||||
K_(tenant->id),
|
K_(tenant->id),
|
||||||
@ -906,12 +915,12 @@ void* ObTenant::wait(void* t)
|
|||||||
K_(tenant->req_queue));
|
K_(tenant->req_queue));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ob_usleep(10L * 1000L);
|
sleep_and_warn(tenant);
|
||||||
}
|
}
|
||||||
LOG_WARN_RET(OB_SUCCESS, "finish remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
|
LOG_INFO("finish remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
|
||||||
LOG_WARN_RET(OB_SUCCESS, "start remove group_map", K_(tenant->id));
|
LOG_INFO("start remove group_map", K_(tenant->id));
|
||||||
tenant->group_map_.wait_group();
|
tenant->group_map_.wait_group();
|
||||||
LOG_WARN_RET(OB_SUCCESS, "finish remove group_map", K_(tenant->id));
|
LOG_INFO("finish remove group_map", K_(tenant->id));
|
||||||
if (!is_virtual_tenant_id(tenant->id_) && !tenant->wait_mtl_finished_) {
|
if (!is_virtual_tenant_id(tenant->id_) && !tenant->wait_mtl_finished_) {
|
||||||
ObTenantSwitchGuard guard(tenant);
|
ObTenantSwitchGuard guard(tenant);
|
||||||
tenant->stop_mtl_module();
|
tenant->stop_mtl_module();
|
||||||
@ -920,6 +929,7 @@ void* ObTenant::wait(void* t)
|
|||||||
tenant->wait_mtl_module();
|
tenant->wait_mtl_module();
|
||||||
tenant->wait_mtl_finished_ = true;
|
tenant->wait_mtl_finished_ = true;
|
||||||
}
|
}
|
||||||
|
LOG_INFO("finish waiting", K_(tenant->id));
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1190,7 +1200,7 @@ int ObTenant::recv_request(ObRequest &req)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int req_level = 0;
|
int req_level = 0;
|
||||||
if (ATOMIC_LOAD(&stopped_)) {
|
if (has_stopped()) {
|
||||||
ret = OB_TENANT_NOT_IN_SERVER;
|
ret = OB_TENANT_NOT_IN_SERVER;
|
||||||
LOG_WARN("receive request but tenant has already stopped", K(ret), K(id_));
|
LOG_WARN("receive request but tenant has already stopped", K(ret), K(id_));
|
||||||
} else if (0 != req.get_group_id()) {
|
} else if (0 != req.get_group_id()) {
|
||||||
@ -1325,7 +1335,7 @@ int ObTenant::recv_large_request(rpc::ObRequest &req)
|
|||||||
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
|
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (ATOMIC_LOAD(&stopped_)) {
|
if (has_stopped()) {
|
||||||
ret = OB_IN_STOP_STATE;
|
ret = OB_IN_STOP_STATE;
|
||||||
LOG_WARN("receive retry request but tenant has already stopped", K(ret), K(id_));
|
LOG_WARN("receive retry request but tenant has already stopped", K(ret), K(id_));
|
||||||
} else if (OB_FAIL(retry_queue_.push(req, timestamp))) {
|
} else if (OB_FAIL(retry_queue_.push(req, timestamp))) {
|
||||||
@ -1338,9 +1348,9 @@ int ObTenant::timeup()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLDHandle handle;
|
ObLDHandle handle;
|
||||||
if (!stopped_ && OB_SUCC(try_rdlock(handle))) {
|
if (!has_stopped() && OB_SUCC(try_rdlock(handle))) {
|
||||||
// it may fail during drop tenant, try next time.
|
// it may fail during drop tenant, try next time.
|
||||||
if (!stopped_) {
|
if (!has_stopped()) {
|
||||||
check_group_worker_count();
|
check_group_worker_count();
|
||||||
check_worker_count();
|
check_worker_count();
|
||||||
update_token_usage();
|
update_token_usage();
|
||||||
|
|||||||
@ -383,11 +383,11 @@ public:
|
|||||||
int init_ctx();
|
int init_ctx();
|
||||||
int init_multi_level_queue();
|
int init_multi_level_queue();
|
||||||
int init(const ObTenantMeta &meta);
|
int init(const ObTenantMeta &meta);
|
||||||
void stop() { ATOMIC_STORE(&stopped_, true); }
|
void stop() { ATOMIC_STORE(&stopped_, ObTimeUtility::current_time()); }
|
||||||
void start() { ATOMIC_STORE(&stopped_, false); }
|
void start() { ATOMIC_STORE(&stopped_, 0); }
|
||||||
int try_wait();
|
int try_wait();
|
||||||
void destroy();
|
void destroy();
|
||||||
bool has_stopped() const { return ATOMIC_LOAD(&stopped_); }
|
bool has_stopped() const { return ATOMIC_LOAD(&stopped_) != 0; }
|
||||||
|
|
||||||
ObTenantMeta get_tenant_meta();
|
ObTenantMeta get_tenant_meta();
|
||||||
bool is_hidden();
|
bool is_hidden();
|
||||||
@ -498,6 +498,7 @@ public:
|
|||||||
// OB_INLINE bool has_normal_request() const { return req_queue_.size() != 0; }
|
// OB_INLINE bool has_normal_request() const { return req_queue_.size() != 0; }
|
||||||
// OB_INLINE bool has_level_request() const { return OB_NOT_NULL(multi_level_queue_) && multi_level_queue_->get_total_size() != 0; }
|
// OB_INLINE bool has_level_request() const { return OB_NOT_NULL(multi_level_queue_) && multi_level_queue_->get_total_size() != 0; }
|
||||||
private:
|
private:
|
||||||
|
static void sleep_and_warn(ObTenant* tenant);
|
||||||
static void* wait(void* tenant);
|
static void* wait(void* tenant);
|
||||||
// update CPU usage
|
// update CPU usage
|
||||||
void update_token_usage();
|
void update_token_usage();
|
||||||
@ -528,7 +529,6 @@ private:
|
|||||||
int construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx);
|
int construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx);
|
||||||
|
|
||||||
int recv_group_request(rpc::ObRequest &req, int64_t group_id);
|
int recv_group_request(rpc::ObRequest &req, int64_t group_id);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
mutable common::TCRWLock meta_lock_;
|
mutable common::TCRWLock meta_lock_;
|
||||||
@ -540,7 +540,7 @@ protected:
|
|||||||
volatile bool shrink_ CACHE_ALIGNED;
|
volatile bool shrink_ CACHE_ALIGNED;
|
||||||
int64_t total_worker_cnt_;
|
int64_t total_worker_cnt_;
|
||||||
pthread_t gc_thread_;
|
pthread_t gc_thread_;
|
||||||
bool stopped_;
|
int64_t stopped_;
|
||||||
bool wait_mtl_finished_;
|
bool wait_mtl_finished_;
|
||||||
|
|
||||||
/// tenant task queue,
|
/// tenant task queue,
|
||||||
|
|||||||
@ -130,7 +130,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
|||||||
struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)};
|
struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)};
|
||||||
wait_event_[0] = '\0';
|
wait_event_[0] = '\0';
|
||||||
if (0 != join_addr) {
|
if (0 != join_addr) {
|
||||||
IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(thread_base + tid_offset));
|
IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(join_addr + tid_offset));
|
||||||
} else if (OB_NOT_NULL(wait_addr)) {
|
} else if (OB_NOT_NULL(wait_addr)) {
|
||||||
uint32_t val = 0;
|
uint32_t val = 0;
|
||||||
struct iovec local_iov = {&val, sizeof(val)};
|
struct iovec local_iov = {&val, sizeof(val)};
|
||||||
|
|||||||
Reference in New Issue
Block a user