diff --git a/mittest/logservice/test_ob_simple_log_engine.cpp b/mittest/logservice/test_ob_simple_log_engine.cpp index 6022d4c58..86b12543d 100644 --- a/mittest/logservice/test_ob_simple_log_engine.cpp +++ b/mittest/logservice/test_ob_simple_log_engine.cpp @@ -25,6 +25,7 @@ #include "logservice/palf/log_define.h" #include "logservice/palf/log_group_entry_header.h" #include "logservice/palf/log_io_worker.h" +#include "logservice/palf/log_shared_queue_thread.h" #include "logservice/palf/lsn.h" #include "share/scn.h" #include "logservice/palf/log_io_task.h" @@ -71,6 +72,7 @@ public: ObILogAllocator *alloc_mgr = log_engine_->alloc_mgr_; LogRpc *log_rpc = log_engine_->log_net_service_.log_rpc_; LogIOWorker *log_io_worker = log_engine_->log_io_worker_; + LogSharedQueueTh *log_shared_queue_th = log_engine_->log_shared_queue_th_; LogPlugins *plugins = log_engine_->plugins_; LogEngine log_engine; ILogBlockPool *log_block_pool = log_engine_->log_storage_.block_mgr_.log_block_pool_; @@ -81,6 +83,7 @@ public: &(leader_.palf_handle_impl_->hot_cache_), log_rpc, log_io_worker, + log_shared_queue_th, plugins, entry_header, palf_epoch_, diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index 0d0eedf73..d6a4421ed 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -108,9 +108,11 @@ ob_set_subtarget(ob_logservice palf palf/log_group_buffer.cpp palf/log_group_entry.cpp palf/log_group_entry_header.cpp + palf/log_shared_queue_thread.cpp palf/log_io_task.cpp palf/log_io_task_cb_thread_pool.cpp palf/log_io_task_cb_utils.cpp + palf/log_shared_task.cpp palf/log_io_worker.cpp palf/log_iterator_storage.cpp palf/log_learner.cpp diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index bb9ec7079..05bd56fde 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -24,6 +24,7 @@ #include "log_io_task.h" // LogIOTask #include "log_io_worker.h" // LogIOWorker #include "log_reader_utils.h" // ReadBuf +#include "log_shared_task.h" // LogSharedTask #include "log_writer_utils.h" // LogWriteBuf #include "lsn.h" // LSN #include "log_meta_entry.h" // LogMetaEntry @@ -50,6 +51,7 @@ LogEngine::LogEngine() : log_net_service_(), alloc_mgr_(NULL), log_io_worker_(NULL), + log_shared_queue_th_(NULL), plugins_(NULL), palf_id_(INVALID_PALF_ID), palf_epoch_(-1), @@ -89,6 +91,7 @@ int LogEngine::init(const int64_t palf_id, LogHotCache *hot_cache, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, LogPlugins *plugins, const int64_t palf_epoch, const int64_t log_storage_block_size, @@ -106,7 +109,8 @@ int LogEngine::init(const int64_t palf_id, ret = OB_INIT_TWICE; PALF_LOG(ERROR, "LogEngine has inited!!!", K(ret), K(palf_id)); } else if (false == is_valid_palf_id(palf_id) || OB_ISNULL(base_dir) || OB_ISNULL(alloc_mgr) - || OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker) || OB_ISNULL(plugins)) { + || OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker) + || OB_ISNULL(plugins)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "Invalid argument!!!", @@ -117,6 +121,7 @@ int LogEngine::init(const int64_t palf_id, K(hot_cache), K(alloc_mgr), K(log_io_worker), + K(log_shared_queue_th), K(plugins)); // NB: Nowday, LSN is strongly dependent on physical block, } else if (OB_FAIL(log_meta_storage_.init(base_dir, @@ -153,6 +158,7 @@ int LogEngine::init(const int64_t palf_id, log_meta_ = log_meta; alloc_mgr_ = alloc_mgr; log_io_worker_ = log_io_worker; + log_shared_queue_th_ = log_shared_queue_th; plugins_ = plugins; palf_epoch_ = palf_epoch; base_lsn_for_block_gc_ = log_meta.get_log_snapshot_meta().base_lsn_; @@ -172,6 +178,7 @@ void LogEngine::destroy() is_inited_ = false; palf_id_ = INVALID_PALF_ID; log_io_worker_ = NULL; + log_shared_queue_th_ = NULL; alloc_mgr_ = NULL; log_net_service_.destroy(); log_meta_storage_.destroy(); @@ -191,6 +198,7 @@ int LogEngine::load(const int64_t palf_id, LogHotCache *hot_cache, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, LogPlugins *plugins, LogGroupEntryHeader &entry_header, const int64_t palf_epoch, @@ -270,6 +278,7 @@ int LogEngine::load(const int64_t palf_id, palf_epoch_ = palf_epoch; alloc_mgr_ = alloc_mgr; log_io_worker_ = log_io_worker; + log_shared_queue_th_ = log_shared_queue_th; base_lsn_for_block_gc_ = log_meta_.get_log_snapshot_meta().base_lsn_; is_inited_ = true; PALF_LOG(INFO, @@ -337,6 +346,36 @@ int LogEngine::submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, return ret; } +int LogEngine::submit_handle_submit_task() +{ + int ret = OB_SUCCESS; + LogHandleSubmitTask *handle_submit_task = NULL; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogEngine not inited", K(ret), KPC(this)); + } else if (OB_ISNULL(log_shared_queue_th_)) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "log_shared_queue_th_ is NULL", K(ret), KPC(this)); + } else if (OB_FAIL(generate_handle_submit_task_(handle_submit_task))) { + PALF_LOG(ERROR, "generate_flush_log_task failed", K(ret), KPC(this)); + } else if (OB_FAIL(log_shared_queue_th_->push_task(handle_submit_task))) { + if (OB_IN_STOP_STATE == ret) { + if (REACH_TIME_INTERVAL(100 * 1000)) { + PALF_LOG(WARN, "push task failed", K(ret), KPC(this), KPC(handle_submit_task)); + } + } else { + PALF_LOG(ERROR, "push task failed", K(ret), KPC(this), KPC(handle_submit_task)); + } + } else { + PALF_LOG(TRACE, "log_shared_queue_th_->push_task success", K(ret), KPC(this), KPC(handle_submit_task)); + } + if (OB_FAIL(ret) && OB_NOT_NULL(handle_submit_task)) { + alloc_mgr_->free_log_handle_submit_task(handle_submit_task); + handle_submit_task = NULL; + } + return ret; +} + int LogEngine::submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx, const LogPrepareMeta &prepare_meta) { @@ -1312,6 +1351,22 @@ int LogEngine::generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx, return ret; } +int LogEngine::generate_handle_submit_task_(LogHandleSubmitTask *&handle_submit_task) +{ + int ret = OB_SUCCESS; + handle_submit_task = NULL; + if (NULL == (handle_submit_task = alloc_mgr_->alloc_log_handle_submit_task(palf_id_, palf_epoch_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PALF_LOG(ERROR, "alloc_log_handle_submit_task failed", K(ret)); + } + + if (OB_FAIL(ret) && OB_NOT_NULL(handle_submit_task)) { + alloc_mgr_->free_log_handle_submit_task(handle_submit_task); + handle_submit_task = NULL; + } + return ret; +} + int LogEngine::generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx, LogIOTruncateLogTask *&truncate_log_task) { diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index 98b8d1de2..59169f1c2 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -34,8 +34,10 @@ class LogRpc; class LogGroupEntry; class LSN; class LogIOWorker; +class LogSharedQueueTh; class PalfHandleImpl; class LogIOTask; +class LogHandleSubmitTask; class LogIOFlushLogTask; class LogIOTruncateLogTask; class LogIOFlushMetaTask; @@ -103,6 +105,7 @@ public: LogHotCache *hot_cache, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, LogPlugins *plugins, const int64_t palf_epoch, const int64_t log_storage_block_size, @@ -116,6 +119,7 @@ public: LogHotCache *hot_cache, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, LogPlugins *plugins, LogGroupEntryHeader &entry_header, const int64_t palf_epoch, @@ -130,6 +134,7 @@ public: const int64_t buf_len); virtual int submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf); + virtual int submit_handle_submit_task(); int submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx, const LogPrepareMeta &prepare_meta); @@ -446,7 +451,7 @@ private: int generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf, LogIOFlushLogTask *&flush_log_task); - + int generate_handle_submit_task_(LogHandleSubmitTask *&handle_submit_task); int generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx, LogIOTruncateLogTask *&truncate_log_task); int generate_truncate_prefix_blocks_task_( @@ -495,6 +500,7 @@ private: LogNetService log_net_service_; common::ObILogAllocator *alloc_mgr_; LogIOWorker *log_io_worker_; + LogSharedQueueTh *log_shared_queue_th_; LogPlugins *plugins_; // Except for LogNetService, this field is just only used for debug int64_t palf_id_; diff --git a/src/logservice/palf/log_shared_queue_thread.cpp b/src/logservice/palf/log_shared_queue_thread.cpp new file mode 100644 index 000000000..05a3a963e --- /dev/null +++ b/src/logservice/palf/log_shared_queue_thread.cpp @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "log_shared_queue_thread.h" +#include "log_shared_task.h" +#include "share/ob_errno.h" // errno... +#include "share/ob_thread_define.h" // TGDefIDs +#include "share/ob_thread_mgr.h" // TG_START +#include "palf_env_impl.h" // PalfEnvImpl + +namespace oceanbase +{ +namespace palf +{ +LogSharedQueueTh::LogSharedQueueTh() + : tg_id_(-1), + palf_env_impl_(NULL), + is_inited_(false) +{} + +LogSharedQueueTh::~LogSharedQueueTh() +{ + destroy(); +} + +int LogSharedQueueTh::init(IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + const int tg_id = lib::TGDefIDs::LogSharedQueueTh; + if (IS_INIT) { + ret = OB_INIT_TWICE; + PALF_LOG(ERROR, "LogSharedQueueTh has inited", K(ret)); + } else if (NULL == palf_env_impl) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "Invalid argument", K(ret), KP(palf_env_impl)); + } else if (OB_FAIL(TG_CREATE_TENANT(tg_id, tg_id_, MAX_LOG_HANDLE_TASK_NUM))) { + PALF_LOG(WARN, "LogSharedQueueTh TG_CREATE failed", K(ret)); + } else { + palf_env_impl_ = palf_env_impl; + is_inited_ = true; + PALF_LOG(INFO, "LogSharedQueueTh init success", K(ret), K(tg_id_), KP(palf_env_impl)); + } + if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { + destroy(); + } + return ret; +} + +int LogSharedQueueTh::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogSharedQueueTh not inited", K(ret)); + } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { + PALF_LOG(ERROR, "start LogSharedQueueTh failed", K(ret)); + } else { + PALF_LOG(INFO, "start LogSharedQueueTh success", K(ret), + K(tg_id_)); + } + return ret; +} + +int LogSharedQueueTh::stop() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(WARN, "LogSharedQueueTh not inited", K(ret)); + } else { + TG_STOP(tg_id_); + PALF_LOG(INFO, "stop LogSharedQueueTh success", K(tg_id_)); + } + return ret; +} + +int LogSharedQueueTh::wait() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(WARN, "LogSharedQueueTh not inited", K(ret)); + } else { + TG_WAIT(tg_id_); + PALF_LOG(INFO, "wait LogSharedQueueTh success", K(tg_id_)); + } + return ret; +} + +void LogSharedQueueTh::destroy() +{ + stop(); + wait(); + is_inited_ = false; + if (-1 != tg_id_) { + TG_DESTROY(tg_id_); + PALF_LOG(INFO, "destroy LogSharedQueueTh success", K(tg_id_)); + } + tg_id_ = -1; +} + +int LogSharedQueueTh::push_task(LogSharedTask *task) +{ + int ret = OB_SUCCESS; + if (NULL == task) { + ret = OB_INVALID_ARGUMENT; + } else { + int64_t print_log_interval = OB_INVALID_TIMESTAMP; + while (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) { + if (OB_IN_STOP_STATE == ret) { + PALF_LOG(WARN, "thread_pool has been stopped, skip task", K(ret), K_(tg_id), KPC(task)); + break; + } else if (palf_reach_time_interval(5 * 1000 * 1000, print_log_interval)) { + PALF_LOG(ERROR, "push task failed", K(ret), K_(tg_id), KPC(task)); + } + ob_usleep(1000); + } + } + return ret; +} + +void LogSharedQueueTh::handle(void *task) +{ + int ret = OB_SUCCESS; + LogSharedTask *log_shared_task = reinterpret_cast(task); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogSharedQueueTh not inited", K(ret)); + } else if (OB_ISNULL(log_shared_task)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "Invalid argument", K(ret), K(log_shared_task)); + } else if (OB_FAIL(log_shared_task->do_task(palf_env_impl_))) { + PALF_LOG(WARN, "LogSharedTask handle_task failed", K(ret), KPC(log_shared_task)); + } else { + PALF_LOG(TRACE, "LogSharedQueueTh handle success", KPC(log_shared_task)); + } + if (OB_NOT_NULL(log_shared_task)) { + log_shared_task->free_this(palf_env_impl_); + } +} + +int LogSharedQueueTh::get_tg_id() const +{ + return tg_id_; +} + +} // end namespace palf +} // end namespace oceanbase diff --git a/src/logservice/palf/log_shared_queue_thread.h b/src/logservice/palf/log_shared_queue_thread.h new file mode 100644 index 000000000..beee80d20 --- /dev/null +++ b/src/logservice/palf/log_shared_queue_thread.h @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_LOGSERVICE_LOG_SHARED_QUEUE_THREAD_ +#define OCEANBASE_LOGSERVICE_LOG_SHARED_QUEUE_THREAD_ + +#include "lib/thread/thread_mgr_interface.h" +#include "lib/utility/ob_print_utils.h" + +namespace oceanbase +{ +namespace palf +{ +class IPalfEnvImpl; +class LogSharedTask; + +class LogSharedQueueTh : public lib::TGTaskHandler +{ +public: + LogSharedQueueTh(); + ~LogSharedQueueTh(); +public: + int init(IPalfEnvImpl *palf_env_impl); + int start(); + int stop(); + int wait(); + void destroy(); + int push_task(LogSharedTask *task); + virtual void handle(void *task); + int get_tg_id() const; +public: + static constexpr int64_t THREAD_NUM = 1; + static constexpr int64_t MINI_MODE_THREAD_NUM = 1; + static constexpr int64_t MAX_LOG_HANDLE_TASK_NUM = 10 * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER; +private: + DISALLOW_COPY_AND_ASSIGN(LogSharedQueueTh); +private: + int tg_id_; + IPalfEnvImpl *palf_env_impl_; + bool is_inited_; +}; +} // end namespace palf +} // end namespace oceanbase + +#endif diff --git a/src/logservice/palf/log_shared_task.cpp b/src/logservice/palf/log_shared_task.cpp new file mode 100644 index 000000000..cf2c13420 --- /dev/null +++ b/src/logservice/palf/log_shared_task.cpp @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "log_shared_task.h" +#include "palf_env_impl.h" // PalfEnvImpl +#include "share/ob_errno.h" // errno... + +namespace oceanbase +{ +namespace palf +{ +LogSharedTask::LogSharedTask(const int64_t palf_id,const int64_t palf_epoch) + : palf_id_(palf_id), palf_epoch_(palf_epoch) +{} + +LogSharedTask::~LogSharedTask() +{ + destroy(); +} + +void LogSharedTask::destroy() +{ + reset(); +} + +void LogSharedTask::reset() +{ + palf_id_ = INVALID_PALF_ID; + palf_epoch_ = -1; +} + +LogHandleSubmitTask::LogHandleSubmitTask(const int64_t palf_id,const int64_t palf_epoch) + : LogSharedTask(palf_id, palf_epoch) +{} + +LogHandleSubmitTask::~LogHandleSubmitTask() +{} + +void LogHandleSubmitTask::free_this(IPalfEnvImpl *palf_env_impl) +{ + palf_env_impl->get_log_allocator()->free_log_handle_submit_task(this); +} + +int LogHandleSubmitTask::do_task(IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + int64_t palf_epoch = -1; + IPalfHandleImplGuard guard; + common::ObTimeGuard time_guard("handle submit task", 100 * 1000); + if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), KPC(this)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) { + PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), KPC(this)); + } else if (palf_epoch != palf_epoch_) { + PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_epoch), KPC(this)); + } else if (OB_FAIL(guard.get_palf_handle_impl()->try_handle_next_submit_log())) { + PALF_LOG(WARN, "PalfHandleImpl try_handle_next_submit_log failed", K(ret), KPC(this)); + } else { + PALF_LOG(TRACE, "LogHandleSubmitTask handle_task success", K(time_guard), KPC(this)); + } + return ret; +} + +} // end namespace palf +} // end namespace oceanbase diff --git a/src/logservice/palf/log_shared_task.h b/src/logservice/palf/log_shared_task.h new file mode 100644 index 000000000..0e89bb8cb --- /dev/null +++ b/src/logservice/palf/log_shared_task.h @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_LOGSERVICE_LOG_SHARED_TASK_ +#define OCEANBASE_LOGSERVICE_LOG_SHARED_TASK_ + +#include "lib/utility/ob_print_utils.h" + +namespace oceanbase +{ +namespace palf +{ +class IPalfEnvImpl; + +enum class LogSharedTaskType +{ + LogHandleSubmitType = 1, +}; + +inline const char *shared_type_2_str(const LogSharedTaskType type) +{ +#define EXTRACT_SHARED_TYPE(type_var) ({ case(LogSharedTaskType::type_var): return #type_var; }) + switch(type) + { + EXTRACT_SHARED_TYPE(LogHandleSubmitType); + + default: + return "Invalid Type"; + } +#undef EXTRACT_SHARED_TYPE +} + +class LogSharedTask +{ +public: + LogSharedTask(const int64_t palf_id, const int64_t palf_epoch); + virtual ~LogSharedTask(); + void destroy(); + void reset(); + virtual int do_task(IPalfEnvImpl *palf_env_impl) = 0; + virtual void free_this(IPalfEnvImpl *palf_env_impl) = 0; + virtual LogSharedTaskType get_shared_task_type() const = 0; + VIRTUAL_TO_STRING_KV("BaseClass", "LogSharedTask", + "palf_id", palf_id_, + "palf_epoch", palf_epoch_); +protected: + int64_t palf_id_; + int64_t palf_epoch_; +private: + DISALLOW_COPY_AND_ASSIGN(LogSharedTask); +}; + +class LogHandleSubmitTask : public LogSharedTask +{ +public: + LogHandleSubmitTask(const int64_t palf_id, const int64_t palf_epoch); + ~LogHandleSubmitTask() override; + int do_task(IPalfEnvImpl *palf_env_impl) override; + void free_this(IPalfEnvImpl *palf_env_impl) override; + virtual LogSharedTaskType get_shared_task_type() const override { return LogSharedTaskType::LogHandleSubmitType; } + INHERIT_TO_STRING_KV("LogSharedTask", LogSharedTask, "task type", shared_type_2_str(get_shared_task_type())); +private: + DISALLOW_COPY_AND_ASSIGN(LogHandleSubmitTask); +}; + +} // end namespace palf +} // end namespace oceanbase + +#endif diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 26de6842e..a94485db3 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -134,6 +134,7 @@ LogSlidingWindow::LogSlidingWindow() accum_group_log_size_(0), last_record_group_log_id_(FIRST_VALID_LOG_ID - 1), freeze_mode_(FEEDBACK_FREEZE_MODE), + has_pending_handle_submit_task_(false), is_inited_(false) {} @@ -900,12 +901,43 @@ int LogSlidingWindow::try_push_log_to_children_(const int64_t curr_proposal_id, return ret; } +int LogSlidingWindow::try_handle_next_submit_log() +{ + int ret = OB_SUCCESS; + // Set has_pending_handle_submit_task_ to false forcedly. + (void) ATOMIC_STORE(&has_pending_handle_submit_task_, false); + bool unused_bool = false; + ret = handle_next_submit_log_(unused_bool); + return ret; +} + +bool LogSlidingWindow::is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const +{ + // The thread lease time for handle_next_submit_log_ is 50ms. + static const int64_t THREAD_LEASE_US = 50 * 1000L; + bool bool_ret = false; + if (OB_INVALID_TIMESTAMP != thread_lease_begin_ts + && ObTimeUtility::current_time() - thread_lease_begin_ts > THREAD_LEASE_US) { + bool_ret = true; + } + return bool_ret; +} + int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) { int ret = OB_SUCCESS; + common::ObTimeGuard time_guard("handle_next_submit_log", 100 * 1000); if (submit_log_handling_lease_.acquire()) { + // record handle_thread_lease_begin_ts with current time + const int64_t thread_lease_begin_ts = ObTimeUtility::current_time(); + bool is_lease_expired = false; + bool need_submit_async_task = false; do { - while (OB_SUCC(ret)) { + // If it revoke fails when thread lease expired, this thread need submit an async task. + if (is_lease_expired) { + need_submit_async_task = true; + } + while (OB_SUCC(ret) && !is_lease_expired) { LSN last_submit_lsn; LSN last_submit_end_lsn; int64_t last_submit_log_id = OB_INVALID_LOG_ID; @@ -1095,8 +1127,32 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) PALF_LOG(TRACE, "handle one submit log", K(ret), K_(palf_id), K_(self), K(tmp_log_id), K(is_committed_lsn_updated), K(is_need_submit), K(is_submitted)); } + is_lease_expired = is_handle_thread_lease_expired(thread_lease_begin_ts); } } while (!submit_log_handling_lease_.revoke()); + + // Try push handle_submit_task into queue when lease revoke failed(lease expired). + if (OB_SUCC(ret) && need_submit_async_task) { + // This CAS is used to control only one task can be submitted into queue at any time. + if (ATOMIC_BCAS(&has_pending_handle_submit_task_, false, true)) { + // push task into queue until success + int tmp_ret = OB_SUCCESS; + while (OB_TMP_FAIL(log_engine_->submit_handle_submit_task())) { + if (REACH_TIME_INTERVAL(100 * 1000)) { + PALF_LOG(WARN, "submit_handle_submit_task failed", K(tmp_ret), K_(palf_id), K_(self)); + } + if (OB_IN_STOP_STATE == tmp_ret) { + // The thread pool has been stopped, no need retry. + break; + } else { + // sleep 100us when submit task failed + ob_usleep(100); + } + } + } else { + // no need push task into queue + } + } } return ret; } diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index 3eb76b25c..af4b9689e 100755 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -322,6 +322,7 @@ public: char *buf, int64_t &out_read_size) const; int64_t get_last_slide_log_id() const; + virtual int try_handle_next_submit_log(); TO_STRING_KV(K_(palf_id), K_(self), K_(lsn_allocator), K_(group_buffer), \ K_(last_submit_lsn), K_(last_submit_end_lsn), K_(last_submit_log_id), K_(last_submit_log_pid), \ K_(max_flushed_lsn), K_(max_flushed_end_lsn), K_(max_flushed_log_pid), K_(committed_end_lsn), \ @@ -329,8 +330,10 @@ public: K_(last_slide_log_pid), K_(last_slide_log_accum_checksum), K_(last_fetch_end_lsn), \ K_(last_fetch_max_log_id), K_(last_fetch_committed_end_lsn), K_(last_truncate_lsn), \ K_(last_fetch_req_time), K_(is_truncating), K_(is_rebuilding), K_(last_rebuild_lsn), \ - "freeze_mode", freeze_mode_2_str(freeze_mode_), \ + "freeze_mode", freeze_mode_2_str(freeze_mode_), K_(has_pending_handle_submit_task), \ "last_fetch_trigger_type", fetch_trigger_type_2_str(last_fetch_trigger_type_), KP(this)); +protected: + virtual bool is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const; private: int do_init_mem_(const int64_t palf_id, const PalfBaseInfo &palf_base_info, @@ -609,6 +612,7 @@ private: int64_t last_record_group_log_id_; int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE]; FreezeMode freeze_mode_; + bool has_pending_handle_submit_task_; bool is_inited_; private: DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow); diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index ead1a7bb1..0157f66e9 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -182,6 +182,7 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK), log_rpc_(), cb_thread_pool_(), log_io_worker_wrapper_(), + log_shared_queue_th_(), block_gc_timer_task_(), log_updater_(), monitor_(NULL), @@ -243,6 +244,8 @@ int PalfEnvImpl::init( cb_thread_pool_.get_tg_id(), log_alloc_mgr, this))) { PALF_LOG(ERROR, "LogIOWorker init failed", K(ret)); + } else if (OB_FAIL(log_shared_queue_th_.init(this))) { + PALF_LOG(ERROR, "LogSharedQueueTh init failed", K(ret)); } else if (OB_FAIL(block_gc_timer_task_.init(this))) { PALF_LOG(ERROR, "ObCheckLogBlockCollectTask init failed", K(ret)); } else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", base_dir)) && false) { @@ -293,6 +296,8 @@ int PalfEnvImpl::start() PALF_LOG(ERROR, "LogIOTaskThreadPool start failed", K(ret)); } else if (OB_FAIL(log_io_worker_wrapper_.start())) { PALF_LOG(ERROR, "LogIOWorker start failed", K(ret)); + } else if (OB_FAIL(log_shared_queue_th_.start())) { + PALF_LOG(ERROR, "LogIOWorker start failed", K(ret)); } else if (OB_FAIL(block_gc_timer_task_.start())) { PALF_LOG(ERROR, "FileCollectTimerTask start failed", K(ret)); } else if (OB_FAIL(fetch_log_engine_.start())) { @@ -314,6 +319,7 @@ void PalfEnvImpl::stop() PALF_LOG(INFO, "PalfEnvImpl begin stop", KPC(this)); is_running_ = false; log_io_worker_wrapper_.stop(); + log_shared_queue_th_.stop(); cb_thread_pool_.stop(); block_gc_timer_task_.stop(); fetch_log_engine_.stop(); @@ -327,6 +333,7 @@ void PalfEnvImpl::wait() { PALF_LOG(INFO, "PalfEnvImpl begin wait", KPC(this)); log_io_worker_wrapper_.wait(); + log_shared_queue_th_.wait(); cb_thread_pool_.wait(); block_gc_timer_task_.wait(); fetch_log_engine_.wait(); @@ -342,6 +349,7 @@ void PalfEnvImpl::destroy() is_inited_ = false; palf_handle_impl_map_.destroy(); log_io_worker_wrapper_.destroy(); + log_shared_queue_th_.destroy(); cb_thread_pool_.destroy(); log_loop_thread_.destroy(); block_gc_timer_task_.destroy(); @@ -419,7 +427,8 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id, PALF_LOG(WARN, "prepare_directory_for_creating_ls failed!!!", K(ret), K(palf_id)); } else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type, &fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_, - log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_, &election_timer_, palf_epoch))) { + log_io_worker_wrapper_.get_log_io_worker(palf_id), &log_shared_queue_th_, this, + self_, &election_timer_, palf_epoch))) { PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id)); // NB: always insert value into hash map finally. } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) { @@ -1045,8 +1054,8 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id) ret = OB_ALLOCATE_MEMORY_FAILED; PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret)); } else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_, - log_block_pool_, &log_rpc_, log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_, - &election_timer_, palf_epoch, is_integrity))) { + log_block_pool_, &log_rpc_, log_io_worker_wrapper_.get_log_io_worker(palf_id), &log_shared_queue_th_, + this, self_, &election_timer_, palf_epoch, is_integrity))) { PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id)); } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) { PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl)); diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index 270488616..e5ac098d8 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -24,9 +24,10 @@ #include "share/ob_occam_timer.h" #include "share/scn.h" #include "fetch_log_engine.h" -#include "log_loop_thread.h" #include "log_define.h" +#include "log_shared_queue_thread.h" #include "log_io_task_cb_thread_pool.h" +#include "log_loop_thread.h" #include "log_rpc.h" #include "palf_options.h" #include "palf_handle_impl.h" @@ -368,6 +369,7 @@ private: LogIOTaskCbThreadPool cb_thread_pool_; common::ObOccamTimer election_timer_; LogIOWorkerWrapper log_io_worker_wrapper_; + LogSharedQueueTh log_shared_queue_th_; BlockGCTimerTask block_gc_timer_task_; LogUpdater log_updater_; PalfMonitorCb *monitor_; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 61191ecdd..5513f9b59 100755 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -71,6 +71,7 @@ PalfHandleImpl::PalfHandleImpl() palf_env_impl_(NULL), append_cost_stat_("[PALF STAT WRITE LOG COST TIME]", PALF_STAT_PRINT_INTERVAL_US), flush_cb_cost_stat_("[PALF STAT FLUSH CB COST TIME]", PALF_STAT_PRINT_INTERVAL_US), + handle_submit_log_cost_stat_("[PALF STAT HANDLE SUBMIT LOG COST TIME]", PALF_STAT_PRINT_INTERVAL_US), last_accum_write_statistic_time_(OB_INVALID_TIMESTAMP), accum_write_log_size_(0), last_accum_fetch_statistic_time_(OB_INVALID_TIMESTAMP), @@ -109,6 +110,7 @@ int PalfHandleImpl::init(const int64_t palf_id, ILogBlockPool *log_block_pool, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, @@ -131,6 +133,7 @@ int PalfHandleImpl::init(const int64_t palf_id, || NULL == log_block_pool || NULL == log_rpc || NULL == log_io_worker + || NULL == log_shared_queue_th || NULL == palf_env_impl || false == self.is_valid() || NULL == election_timer @@ -138,16 +141,16 @@ int PalfHandleImpl::init(const int64_t palf_id, ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(palf_base_info), K(replica_type), K(access_mode), K(log_dir), K(alloc_mgr), K(log_block_pool), K(log_rpc), - K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch)); + K(log_io_worker), K(log_shared_queue_th), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch)); } else if (OB_FAIL(log_meta.generate_by_palf_base_info(palf_base_info, access_mode, replica_type))) { PALF_LOG(WARN, "generate_by_palf_base_info failed", K(ret), K(palf_id), K(palf_base_info), K(access_mode), K(replica_type)); } else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id)); } else if (OB_FAIL(log_engine_.init(palf_id, log_dir, log_meta, alloc_mgr, log_block_pool, &hot_cache_, \ - log_rpc, log_io_worker, &plugins_, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) { + log_rpc, log_io_worker, log_shared_queue_th, &plugins_, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) { PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr), - K(log_rpc), K(log_io_worker)); + K(log_rpc), K(log_io_worker), K(log_shared_queue_th)); } else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine, alloc_mgr, log_rpc, palf_env_impl, election_timer))) { PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id)); @@ -175,6 +178,7 @@ int PalfHandleImpl::load(const int64_t palf_id, ILogBlockPool *log_block_pool, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, @@ -194,12 +198,14 @@ int PalfHandleImpl::load(const int64_t palf_id, || NULL == alloc_mgr || NULL == log_rpc || NULL == log_io_worker + || NULL == log_shared_queue_th || false == self.is_valid()) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(log_dir), K(alloc_mgr), - K(log_rpc), K(log_io_worker)); + K(log_rpc), K(log_io_worker), K(log_shared_queue_th)); } else if (OB_FAIL(log_engine_.load(palf_id, log_dir, alloc_mgr, log_block_pool, &hot_cache_, log_rpc, - log_io_worker, &plugins_, entry_header, palf_epoch, is_integrity, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) { + log_io_worker, log_shared_queue_th, &plugins_, entry_header, palf_epoch, is_integrity, + PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) { PALF_LOG(WARN, "LogEngine load failed", K(ret), K(palf_id)); // NB: when 'entry_header' is invalid, means that there is no data on disk, and set max_committed_end_lsn // to 'base_lsn_', we will generate default PalfBaseInfo or get it from LogSnapshotMeta(rebuild). @@ -4088,6 +4094,24 @@ int PalfHandleImpl::advance_reuse_lsn(const LSN &flush_log_end_lsn) return ret; } +int PalfHandleImpl::try_handle_next_submit_log() +{ + int ret = OB_SUCCESS; + PALF_LOG(TRACE, "try_handle_next_submit_log begin", KPC(this)); + const int64_t begin_ts = ObTimeUtility::current_time(); + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(sw_.try_handle_next_submit_log())) { + PALF_LOG(WARN, "sw_.try_handle_next_submit_log failed", K(ret), KPC(this)); + } else { + const int64_t time_cost = ObTimeUtility::current_time() - begin_ts; + handle_submit_log_cost_stat_.stat(time_cost); + PALF_LOG(TRACE, "try_handle_next_submit_log success", K(ret), KPC(this)); + } + return ret; +} + int PalfHandleImpl::inner_after_flush_log(const FlushLogCbCtx &flush_log_cb_ctx) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 7370dabda..7ee776814 100755 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -60,6 +60,7 @@ class LogIOFlushMetaTask; class ReadBuf; class LogWriteBuf; class LogIOWorker; +class LogSharedQueueTh; class LogRpc; class IPalfEnvImpl; @@ -820,6 +821,7 @@ public: const int64_t in_read_size, char *buf, int64_t &out_read_size) const = 0; + virtual int try_handle_next_submit_log() = 0; DECLARE_PURE_VIRTUAL_TO_STRING; }; @@ -838,6 +840,7 @@ public: ILogBlockPool *log_block_pool, LogRpc *log_rpc, LogIOWorker *log_io_worker, + LogSharedQueueTh *log_shared_queue_th, IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, @@ -855,6 +858,7 @@ public: ILogBlockPool *log_block_pool, LogRpc *log_rpc, LogIOWorker*log_io_worker, + LogSharedQueueTh *log_shared_queue_th, IPalfEnvImpl *palf_env_impl, const common::ObAddr &self, common::ObOccamTimer *election_timer, @@ -958,6 +962,7 @@ public: const int64_t in_read_size, char *buf, int64_t &out_read_size) const; + int try_handle_next_submit_log(); public: int delete_block(const block_id_t &block_id) override final; int read_log(const LSN &lsn, @@ -1423,6 +1428,7 @@ private: bool diskspace_enough_; ObMiniStat::ObStatItem append_cost_stat_; ObMiniStat::ObStatItem flush_cb_cost_stat_; + ObMiniStat::ObStatItem handle_submit_log_cost_stat_; int64_t last_accum_write_statistic_time_; int64_t accum_write_log_size_; // the accum size of written logs int64_t last_accum_fetch_statistic_time_; diff --git a/src/share/allocator/ob_tenant_mutil_allocator.cpp b/src/share/allocator/ob_tenant_mutil_allocator.cpp index 07691ea6c..772b9b9f3 100644 --- a/src/share/allocator/ob_tenant_mutil_allocator.cpp +++ b/src/share/allocator/ob_tenant_mutil_allocator.cpp @@ -16,6 +16,7 @@ #include "observer/omt/ob_multi_tenant.h" #include "logservice/palf/log_io_task.h" #include "logservice/palf/fetch_log_engine.h" +#include "logservice/palf/log_shared_task.h" #include "logservice/replayservice/ob_replay_status.h" namespace oceanbase @@ -28,6 +29,7 @@ namespace common ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id) : tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0), + LOG_HANDLE_SUBMIT_TASK_SIZE(sizeof(palf::LogHandleSubmitTask)), LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)), LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)), LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)), @@ -40,6 +42,7 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id) unlimited_blk_alloc_(), replay_log_task_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD), clog_ge_alloc_(ObMemAttr(tenant_id, ObModIds::OB_CLOG_GE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, clog_blk_alloc_), + log_handle_submit_task_alloc_(LOG_HANDLE_SUBMIT_TASK_SIZE, ObMemAttr(tenant_id, "HandleSubmit"), choose_blk_size(LOG_HANDLE_SUBMIT_TASK_SIZE), clog_blk_alloc_, this), log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this), log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this), log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this), @@ -71,6 +74,7 @@ void ObTenantMutilAllocator::destroy() { OB_LOG(INFO, "ObTenantMutilAllocator destroy", K(tenant_id_)); clog_ge_alloc_.destroy(); + log_handle_submit_task_alloc_.destroy(); log_io_flush_log_task_alloc_.destroy(); log_io_truncate_log_task_alloc_.destroy(); log_io_flush_meta_task_alloc_.destroy(); @@ -98,6 +102,7 @@ int ObTenantMutilAllocator::choose_blk_size(int obj_size) void ObTenantMutilAllocator::try_purge() { clog_ge_alloc_.purge_extra_cached_block(0); + log_handle_submit_task_alloc_.purge_extra_cached_block(0); log_io_flush_log_task_alloc_.purge_extra_cached_block(0); log_io_truncate_log_task_alloc_.purge_extra_cached_block(0); log_io_flush_meta_task_alloc_.purge_extra_cached_block(0); @@ -161,6 +166,27 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr) } } +LogHandleSubmitTask *ObTenantMutilAllocator::alloc_log_handle_submit_task( + const int64_t palf_id, const int64_t palf_epoch) +{ + LogHandleSubmitTask *ret_ptr = NULL; + void *ptr = log_handle_submit_task_alloc_.alloc(); + if (NULL != ptr) { + ret_ptr = new(ptr)LogHandleSubmitTask(palf_id, palf_epoch); + ATOMIC_INC(&flying_log_handle_submit_task_); + } + return ret_ptr; +} + +void ObTenantMutilAllocator::free_log_handle_submit_task(LogHandleSubmitTask *ptr) +{ + if (OB_LIKELY(NULL != ptr)) { + ptr->~LogHandleSubmitTask(); + log_handle_submit_task_alloc_.free(ptr); + ATOMIC_DEC(&flying_log_handle_submit_task_); + } +} + LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task( const int64_t palf_id, const int64_t palf_epoch) { diff --git a/src/share/allocator/ob_tenant_mutil_allocator.h b/src/share/allocator/ob_tenant_mutil_allocator.h index 422092e06..1289e312f 100644 --- a/src/share/allocator/ob_tenant_mutil_allocator.h +++ b/src/share/allocator/ob_tenant_mutil_allocator.h @@ -25,6 +25,7 @@ namespace oceanbase namespace palf { class LogIOFlushLogTask; +class LogHandleSubmitTask; class LogIOTruncateLogTask; class LogIOFlushMetaTask; class LogIOTruncatePrefixBlocksTask; @@ -44,7 +45,7 @@ class ObTraceProfile; class ObILogAllocator : public ObIAllocator { public: - ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {} + ObILogAllocator() : flying_log_task_(0), flying_log_handle_submit_task_(0), flying_meta_task_(0) {} virtual ~ObILogAllocator() {} public: @@ -54,6 +55,8 @@ public: virtual void *ge_alloc(const int64_t size) = 0; virtual void ge_free(void *ptr) = 0; virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0; + virtual palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch) = 0; + virtual void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr) = 0; virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0; virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0; virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0; @@ -76,6 +79,7 @@ public: protected: int64_t flying_log_task_; + int64_t flying_log_handle_submit_task_; int64_t flying_meta_task_; }; @@ -118,6 +122,8 @@ public: // V4.0 palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch); void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr); + palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch); + void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr); palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch); void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr); palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch); @@ -139,6 +145,7 @@ private: uint64_t tenant_id_ CACHE_ALIGNED; int64_t total_limit_; int64_t pending_replay_mutator_size_; + const int LOG_HANDLE_SUBMIT_TASK_SIZE; const int LOG_IO_FLUSH_LOG_TASK_SIZE; const int LOG_IO_TRUNCATE_LOG_TASK_SIZE; const int LOG_IO_FLUSH_META_TASK_SIZE; @@ -151,6 +158,7 @@ private: ObBlockAllocMgr unlimited_blk_alloc_; ObBlockAllocMgr replay_log_task_blk_alloc_; ObVSliceAlloc clog_ge_alloc_; + ObSliceAlloc log_handle_submit_task_alloc_; ObSliceAlloc log_io_flush_log_task_alloc_; ObSliceAlloc log_io_truncate_log_task_alloc_; ObSliceAlloc log_io_flush_meta_task_alloc_; diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index eac4a91a4..efea987ba 100755 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -99,6 +99,10 @@ TG_DEF(LogIOTaskCbThreadPool, LogIOCb, QUEUE_THREAD, ThreadCountPair(palf::LogIOTaskCbThreadPool::THREAD_NUM, palf::LogIOTaskCbThreadPool::MINI_MODE_THREAD_NUM), palf::LogIOTaskCbThreadPool::MAX_LOG_IO_CB_TASK_NUM) +TG_DEF(LogSharedQueueTh, LogSharedQueueThread, QUEUE_THREAD, + ThreadCountPair(palf::LogSharedQueueTh::THREAD_NUM, + palf::LogSharedQueueTh::MINI_MODE_THREAD_NUM), + palf::LogSharedQueueTh::MAX_LOG_HANDLE_TASK_NUM) TG_DEF(ReplayService, ReplaySrv, QUEUE_THREAD, 1, (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER_CAN_BE_SET) TG_DEF(LogRouteService, LogRouteSrv, QUEUE_THREAD, 1, (common::MAX_SERVER_COUNT) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER_CAN_BE_SET) TG_DEF(LogRouterTimer, LogRouterTimer, TIMER) diff --git a/unittest/logservice/test_log_sliding_window.cpp b/unittest/logservice/test_log_sliding_window.cpp index 86bf9f621..f80cc7de2 100644 --- a/unittest/logservice/test_log_sliding_window.cpp +++ b/unittest/logservice/test_log_sliding_window.cpp @@ -50,6 +50,11 @@ public: public: MockPublicLogSlidingWindow() {} virtual ~MockPublicLogSlidingWindow() {} + virtual bool is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const override final + { + UNUSED(thread_lease_begin_ts); + return false; + } }; class MockLocCb : public PalfLocationCacheCb {