diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index c3accf9c1..c6d791670 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -116,11 +116,15 @@ int ObTransService::init(const ObAddr &self, { int ret = OB_SUCCESS; ObSimpleThreadPool::set_run_wrapper(MTL_CTX()); - int64_t msg_task_cnt = MAX_MSG_TASK; - if (is_mini_mode()) { - msg_task_cnt /= (lib::ObRunningModeConfig::MINI_MEM_UPPER / lib::ObRunningModeConfig::instance().memory_limit_); - } const int64_t tenant_id = MTL_ID(); + const int64_t tenant_memory_limit = lib::get_tenant_memory_limit(tenant_id); + int64_t msg_task_cnt = MSG_TASK_CNT_PER_GB * (tenant_memory_limit / (1024 * 1024 * 1024)); + if (msg_task_cnt < MSG_TASK_CNT_PER_GB) { + msg_task_cnt = MSG_TASK_CNT_PER_GB; + } + if (msg_task_cnt > MAX_MSG_TASK_CNT) { + msg_task_cnt = MAX_MSG_TASK_CNT; + } if (is_inited_) { TRANS_LOG(WARN, "ObTransService inited twice", KPC(this)); ret = OB_INIT_TWICE; @@ -143,10 +147,10 @@ int ObTransService::init(const ObAddr &self, } else if (OB_FAIL(dup_table_scan_timer_.init())) { TRANS_LOG(ERROR, "dup table scan timer init error", K(ret)); } else if (OB_FAIL(ObSimpleThreadPool::init(2, msg_task_cnt, "TransService", tenant_id))) { - TRANS_LOG(WARN, "thread pool init error", KR(ret)); + TRANS_LOG(WARN, "thread pool init error", KR(ret), K(msg_task_cnt)); } else if (OB_FAIL(tx_desc_mgr_.init(std::bind(&ObTransService::gen_trans_id_, this, std::placeholders::_1), - lib::ObMemAttr(tenant_id, "TransService")))) { + lib::ObMemAttr(tenant_id, "TxDescMgr")))) { TRANS_LOG(WARN, "ObTxDescMgr init error", K(ret)); } else if (OB_FAIL(tx_ctx_mgr_.init(tenant_id, ts_mgr, this))) { TRANS_LOG(WARN, "tx_ctx_mgr_ init error", KR(ret)); @@ -168,7 +172,7 @@ int ObTransService::init(const ObAddr &self, ts_mgr_ = ts_mgr; server_tracer_ = server_tracer; is_inited_ = true; - TRANS_LOG(INFO, "transaction service inited success", KP(this)); + TRANS_LOG(INFO, "transaction service inited success", KP(this), K(tenant_memory_limit)); } if (OB_SUCC(ret)) { #ifdef ENABLE_DEBUG_LOG @@ -191,6 +195,8 @@ int ObTransService::init(const ObAddr &self, } } #endif + } else { + TRANS_LOG(WARN, "transaction service inited failed", K(ret), K(tenant_memory_limit)); } return ret; } diff --git a/src/storage/tx/ob_trans_service.h b/src/storage/tx/ob_trans_service.h index c2f9488fb..7a7f9f2c6 100644 --- a/src/storage/tx/ob_trans_service.h +++ b/src/storage/tx/ob_trans_service.h @@ -234,7 +234,8 @@ public: private: static const int64_t END_STMT_MORE_TIME_US = 100 * 1000; // max task count in message process queue - static const int64_t MAX_MSG_TASK = (1 << 20); // 8M + static const int64_t MAX_MSG_TASK_CNT = 1000 * 1000; + static const int64_t MSG_TASK_CNT_PER_GB = 50 * 1000; static const int64_t MAX_BIG_TRANS_WORKER = 8; static const int64_t MAX_BIG_TRANS_TASK = 100 * 1000; // max time bias between any two machine