From 2e4c947fd7d04560b8930f52ba273a891c70a446 Mon Sep 17 00:00:00 2001 From: nroskill Date: Wed, 3 Nov 2021 20:37:46 +0800 Subject: [PATCH] slog writer task use dynamic memory alloc --- src/storage/ob_slog_writer_queue_thread.cpp | 71 +++++++++++---------- src/storage/ob_slog_writer_queue_thread.h | 9 +-- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/storage/ob_slog_writer_queue_thread.cpp b/src/storage/ob_slog_writer_queue_thread.cpp index 598ed4636c..cdefb6a88f 100644 --- a/src/storage/ob_slog_writer_queue_thread.cpp +++ b/src/storage/ob_slog_writer_queue_thread.cpp @@ -104,8 +104,7 @@ int ObMsInfoTask::update_curr_member_list(const common::ObMemberList& curr_membe return ret; } -ObSlogWriterQueueThread::ObSlogWriterQueueThread() - : inited_(false), partition_service_(NULL), free_queue_(), tasks_(NULL), tg_id_(-1) +ObSlogWriterQueueThread::ObSlogWriterQueueThread() : inited_(false), partition_service_(NULL), free_queue_(), tg_id_(-1) {} ObSlogWriterQueueThread::~ObSlogWriterQueueThread() @@ -115,49 +114,34 @@ ObSlogWriterQueueThread::~ObSlogWriterQueueThread() void ObSlogWriterQueueThread::destroy() { + int ret = OB_SUCCESS; inited_ = false; partition_service_ = NULL; - if (NULL != tasks_) { - ob_free(tasks_); + ObMsInfoTask *task = nullptr; + while (OB_SUCC(free_queue_.pop((ObLink *&)task))) { + if (OB_NOT_NULL(task)) { + ob_free(task); + } } - tasks_ = NULL; STORAGE_LOG(INFO, "ObSlogWriterQueueThread destroy"); } int ObSlogWriterQueueThread::init(ObPartitionService* partition_service, int tg_id) { int ret = OB_SUCCESS; - const int64_t max_task_num = OB_MAX_PARTITION_NUM_PER_SERVER * 2; tg_id_ = tg_id; if (inited_) { ret = OB_INIT_TWICE; STORAGE_LOG(WARN, "ObSlogWriterQueueThread has already been inited", K(ret)); - } else if (NULL == partition_service) { + } else if (OB_ISNULL(partition_service)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), KP(partition_service)); } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { STORAGE_LOG(WARN, "ObSimpleThreadPool inited error.", K(ret)); - } else if (OB_SUCCESS != (ret = free_queue_.init(max_task_num))) { - STORAGE_LOG(WARN, "initialize fixed queue of tasks failed", K(ret)); } else { - int64_t size = sizeof(ObMsInfoTask) * max_task_num; - ObMemAttr attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_CALLBACK_TASK); - if (NULL == (tasks_ = (ObMsInfoTask*)ob_malloc(size, attr))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(ERROR, "no memory", K(ret), K(size)); - } else { - for (int64_t i = max_task_num - 1; OB_SUCC(ret) && i >= 0; --i) { - (void)new (tasks_ + i) ObMsInfoTask; - if (OB_SUCCESS != (ret = free_queue_.push(&tasks_[i]))) { - STORAGE_LOG(WARN, "push free task failed", K(ret)); - } - } - if (OB_SUCC(ret)) { - partition_service_ = partition_service; - inited_ = true; - STORAGE_LOG(INFO, "ObSlogWriterQueueThread init success", K_(tg_id)); - } - } + partition_service_ = partition_service; + inited_ = true; + STORAGE_LOG(INFO, "ObSlogWriterQueueThread init success", K_(tg_id)); } if (OB_SUCCESS != ret && !inited_) { destroy(); @@ -171,9 +155,25 @@ int ObSlogWriterQueueThread::get_task(ObMsInfoTask*& task) if (!inited_) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObSlogWriterQueueThread not init", K(ret)); - } else if (OB_SUCCESS != (ret = free_queue_.pop(task))) { - STORAGE_LOG(WARN, "pop free task failed", K(ret)); + } else if (OB_SUCC(free_queue_.pop((ObLink *&)task))) { + // do nothing + } else if (OB_SUCC(alloc_task(task))) { + // do nothing } else { + STORAGE_LOG(ERROR, "get task failed"); + } + return ret; +} + +int ObSlogWriterQueueThread::alloc_task(ObMsInfoTask *&task) +{ + const ObMemAttr attr(common::OB_SERVER_TENANT_ID, ObModIds::OB_CALLBACK_TASK); + int ret = OB_SUCCESS; + if (OB_ISNULL(task = (ObMsInfoTask *)ob_malloc(sizeof(ObMsInfoTask), attr))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "alloc task failed"); + } else { + task = new (task) ObMsInfoTask; } return ret; } @@ -184,11 +184,14 @@ void ObSlogWriterQueueThread::free_task(ObMsInfoTask* task) if (!inited_) { tmp_ret = OB_NOT_INIT; STORAGE_LOG(ERROR, "ObSlogWriterQueueThread not init", K(tmp_ret)); - } else if (NULL == task) { + } else if (OB_ISNULL(task)) { tmp_ret = OB_INVALID_ARGUMENT; STORAGE_LOG(ERROR, "ObSlogWriterQueueThread invalid argument", K(tmp_ret), K(task)); - } else if (OB_SUCCESS != (tmp_ret = free_queue_.push(task))) { - STORAGE_LOG(ERROR, "push free task failed", K(tmp_ret)); + } else if (free_queue_.size() >= MAX_FREE_TASK_NUM) { + ob_free(task); + } else if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_queue_.push(task)))) { + STORAGE_LOG(WARN, "push free task failed", K(tmp_ret)); + ob_free(task); } } @@ -202,9 +205,9 @@ int ObSlogWriterQueueThread::push(const ObMsInfoTask* task) } else if (!task->is_valid()) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), K(*task)); - } else if (OB_SUCCESS != (ret = get_task(saved_task))) { + } else if (OB_FAIL(get_task(saved_task))) { STORAGE_LOG(WARN, "get free task failed", K(ret)); - } else if (NULL == saved_task) { + } else if (OB_ISNULL(saved_task)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "Unexpected error, saved_task shouldn't be null", K(ret)); } else { diff --git a/src/storage/ob_slog_writer_queue_thread.h b/src/storage/ob_slog_writer_queue_thread.h index 982f999d45..2e3df9b382 100644 --- a/src/storage/ob_slog_writer_queue_thread.h +++ b/src/storage/ob_slog_writer_queue_thread.h @@ -15,7 +15,7 @@ #include "common/ob_partition_key.h" #include "common/ob_member_list.h" -#include "lib/queue/ob_fixed_queue.h" +#include "lib/queue/ob_link_queue.h" #include "lib/thread/thread_mgr_interface.h" #include "share/ob_proposal_id.h" #include "clog/ob_log_define.h" @@ -24,7 +24,7 @@ namespace oceanbase { namespace storage { class ObPartitionService; -class ObMsInfoTask { +class ObMsInfoTask : public common::ObLink { public: ObMsInfoTask() : pkey_(), @@ -141,6 +141,7 @@ class ObSlogWriterQueueThread : public lib::TGTaskHandler { public: static const int64_t QUEUE_THREAD_NUM = 4; static const int64_t MINI_MODE_QUEUE_THREAD_NUM = 2; + static const int64_t MAX_FREE_TASK_NUM = 1024; static const int64_t SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD = clog::CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT; ObSlogWriterQueueThread(); virtual ~ObSlogWriterQueueThread(); @@ -157,13 +158,13 @@ public: private: int get_task(ObMsInfoTask*& task); + int alloc_task(ObMsInfoTask*& task); void free_task(ObMsInfoTask* task); private: bool inited_; ObPartitionService* partition_service_; - common::ObFixedQueue free_queue_; - ObMsInfoTask* tasks_; + common::ObLinkQueue free_queue_; int tg_id_; private: