[CP] Control thread lease time for submit log thread.

This commit is contained in:
jiadebinmary@gmail.com 2023-12-22 14:12:55 +00:00 committed by ob-robot
parent 5e97d62d03
commit 5c5f2f1c3e
18 changed files with 587 additions and 14 deletions

View File

@ -25,6 +25,7 @@
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_io_worker.h"
#include "logservice/palf/log_shared_queue_thread.h"
#include "logservice/palf/lsn.h"
#include "share/scn.h"
#include "logservice/palf/log_io_task.h"
@ -71,6 +72,7 @@ public:
ObILogAllocator *alloc_mgr = log_engine_->alloc_mgr_;
LogRpc *log_rpc = log_engine_->log_net_service_.log_rpc_;
LogIOWorker *log_io_worker = log_engine_->log_io_worker_;
LogSharedQueueTh *log_shared_queue_th = log_engine_->log_shared_queue_th_;
LogPlugins *plugins = log_engine_->plugins_;
LogEngine log_engine;
ILogBlockPool *log_block_pool = log_engine_->log_storage_.block_mgr_.log_block_pool_;
@ -81,6 +83,7 @@ public:
&(leader_.palf_handle_impl_->hot_cache_),
log_rpc,
log_io_worker,
log_shared_queue_th,
plugins,
entry_header,
palf_epoch_,

View File

@ -108,9 +108,11 @@ ob_set_subtarget(ob_logservice palf
palf/log_group_buffer.cpp
palf/log_group_entry.cpp
palf/log_group_entry_header.cpp
palf/log_shared_queue_thread.cpp
palf/log_io_task.cpp
palf/log_io_task_cb_thread_pool.cpp
palf/log_io_task_cb_utils.cpp
palf/log_shared_task.cpp
palf/log_io_worker.cpp
palf/log_iterator_storage.cpp
palf/log_learner.cpp

View File

@ -24,6 +24,7 @@
#include "log_io_task.h" // LogIOTask
#include "log_io_worker.h" // LogIOWorker
#include "log_reader_utils.h" // ReadBuf
#include "log_shared_task.h" // LogSharedTask
#include "log_writer_utils.h" // LogWriteBuf
#include "lsn.h" // LSN
#include "log_meta_entry.h" // LogMetaEntry
@ -50,6 +51,7 @@ LogEngine::LogEngine() :
log_net_service_(),
alloc_mgr_(NULL),
log_io_worker_(NULL),
log_shared_queue_th_(NULL),
plugins_(NULL),
palf_id_(INVALID_PALF_ID),
palf_epoch_(-1),
@ -89,6 +91,7 @@ int LogEngine::init(const int64_t palf_id,
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
LogPlugins *plugins,
const int64_t palf_epoch,
const int64_t log_storage_block_size,
@ -106,7 +109,8 @@ int LogEngine::init(const int64_t palf_id,
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogEngine has inited!!!", K(ret), K(palf_id));
} else if (false == is_valid_palf_id(palf_id) || OB_ISNULL(base_dir) || OB_ISNULL(alloc_mgr)
|| OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker) || OB_ISNULL(plugins)) {
|| OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker)
|| OB_ISNULL(plugins)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR,
"Invalid argument!!!",
@ -117,6 +121,7 @@ int LogEngine::init(const int64_t palf_id,
K(hot_cache),
K(alloc_mgr),
K(log_io_worker),
K(log_shared_queue_th),
K(plugins));
// NB: Nowday, LSN is strongly dependent on physical block,
} else if (OB_FAIL(log_meta_storage_.init(base_dir,
@ -153,6 +158,7 @@ int LogEngine::init(const int64_t palf_id,
log_meta_ = log_meta;
alloc_mgr_ = alloc_mgr;
log_io_worker_ = log_io_worker;
log_shared_queue_th_ = log_shared_queue_th;
plugins_ = plugins;
palf_epoch_ = palf_epoch;
base_lsn_for_block_gc_ = log_meta.get_log_snapshot_meta().base_lsn_;
@ -172,6 +178,7 @@ void LogEngine::destroy()
is_inited_ = false;
palf_id_ = INVALID_PALF_ID;
log_io_worker_ = NULL;
log_shared_queue_th_ = NULL;
alloc_mgr_ = NULL;
log_net_service_.destroy();
log_meta_storage_.destroy();
@ -191,6 +198,7 @@ int LogEngine::load(const int64_t palf_id,
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
LogPlugins *plugins,
LogGroupEntryHeader &entry_header,
const int64_t palf_epoch,
@ -270,6 +278,7 @@ int LogEngine::load(const int64_t palf_id,
palf_epoch_ = palf_epoch;
alloc_mgr_ = alloc_mgr;
log_io_worker_ = log_io_worker;
log_shared_queue_th_ = log_shared_queue_th;
base_lsn_for_block_gc_ = log_meta_.get_log_snapshot_meta().base_lsn_;
is_inited_ = true;
PALF_LOG(INFO,
@ -337,6 +346,36 @@ int LogEngine::submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx,
return ret;
}
int LogEngine::submit_handle_submit_task()
{
int ret = OB_SUCCESS;
LogHandleSubmitTask *handle_submit_task = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogEngine not inited", K(ret), KPC(this));
} else if (OB_ISNULL(log_shared_queue_th_)) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "log_shared_queue_th_ is NULL", K(ret), KPC(this));
} else if (OB_FAIL(generate_handle_submit_task_(handle_submit_task))) {
PALF_LOG(ERROR, "generate_flush_log_task failed", K(ret), KPC(this));
} else if (OB_FAIL(log_shared_queue_th_->push_task(handle_submit_task))) {
if (OB_IN_STOP_STATE == ret) {
if (REACH_TIME_INTERVAL(100 * 1000)) {
PALF_LOG(WARN, "push task failed", K(ret), KPC(this), KPC(handle_submit_task));
}
} else {
PALF_LOG(ERROR, "push task failed", K(ret), KPC(this), KPC(handle_submit_task));
}
} else {
PALF_LOG(TRACE, "log_shared_queue_th_->push_task success", K(ret), KPC(this), KPC(handle_submit_task));
}
if (OB_FAIL(ret) && OB_NOT_NULL(handle_submit_task)) {
alloc_mgr_->free_log_handle_submit_task(handle_submit_task);
handle_submit_task = NULL;
}
return ret;
}
int LogEngine::submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
const LogPrepareMeta &prepare_meta)
{
@ -1312,6 +1351,22 @@ int LogEngine::generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
return ret;
}
int LogEngine::generate_handle_submit_task_(LogHandleSubmitTask *&handle_submit_task)
{
int ret = OB_SUCCESS;
handle_submit_task = NULL;
if (NULL == (handle_submit_task = alloc_mgr_->alloc_log_handle_submit_task(palf_id_, palf_epoch_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "alloc_log_handle_submit_task failed", K(ret));
}
if (OB_FAIL(ret) && OB_NOT_NULL(handle_submit_task)) {
alloc_mgr_->free_log_handle_submit_task(handle_submit_task);
handle_submit_task = NULL;
}
return ret;
}
int LogEngine::generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
LogIOTruncateLogTask *&truncate_log_task)
{

View File

@ -34,8 +34,10 @@ class LogRpc;
class LogGroupEntry;
class LSN;
class LogIOWorker;
class LogSharedQueueTh;
class PalfHandleImpl;
class LogIOTask;
class LogHandleSubmitTask;
class LogIOFlushLogTask;
class LogIOTruncateLogTask;
class LogIOFlushMetaTask;
@ -103,6 +105,7 @@ public:
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
LogPlugins *plugins,
const int64_t palf_epoch,
const int64_t log_storage_block_size,
@ -116,6 +119,7 @@ public:
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
LogPlugins *plugins,
LogGroupEntryHeader &entry_header,
const int64_t palf_epoch,
@ -130,6 +134,7 @@ public:
const int64_t buf_len);
virtual int submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf);
virtual int submit_handle_submit_task();
int submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
const LogPrepareMeta &prepare_meta);
@ -446,7 +451,7 @@ private:
int generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
const LogWriteBuf &write_buf,
LogIOFlushLogTask *&flush_log_task);
int generate_handle_submit_task_(LogHandleSubmitTask *&handle_submit_task);
int generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
LogIOTruncateLogTask *&truncate_log_task);
int generate_truncate_prefix_blocks_task_(
@ -495,6 +500,7 @@ private:
LogNetService log_net_service_;
common::ObILogAllocator *alloc_mgr_;
LogIOWorker *log_io_worker_;
LogSharedQueueTh *log_shared_queue_th_;
LogPlugins *plugins_;
// Except for LogNetService, this field is just only used for debug
int64_t palf_id_;

View File

@ -0,0 +1,157 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "log_shared_queue_thread.h"
#include "log_shared_task.h"
#include "share/ob_errno.h" // errno...
#include "share/ob_thread_define.h" // TGDefIDs
#include "share/ob_thread_mgr.h" // TG_START
#include "palf_env_impl.h" // PalfEnvImpl
namespace oceanbase
{
namespace palf
{
LogSharedQueueTh::LogSharedQueueTh()
: tg_id_(-1),
palf_env_impl_(NULL),
is_inited_(false)
{}
LogSharedQueueTh::~LogSharedQueueTh()
{
destroy();
}
int LogSharedQueueTh::init(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
const int tg_id = lib::TGDefIDs::LogSharedQueueTh;
if (IS_INIT) {
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogSharedQueueTh has inited", K(ret));
} else if (NULL == palf_env_impl) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument", K(ret), KP(palf_env_impl));
} else if (OB_FAIL(TG_CREATE_TENANT(tg_id, tg_id_, MAX_LOG_HANDLE_TASK_NUM))) {
PALF_LOG(WARN, "LogSharedQueueTh TG_CREATE failed", K(ret));
} else {
palf_env_impl_ = palf_env_impl;
is_inited_ = true;
PALF_LOG(INFO, "LogSharedQueueTh init success", K(ret), K(tg_id_), KP(palf_env_impl));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destroy();
}
return ret;
}
int LogSharedQueueTh::start()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogSharedQueueTh not inited", K(ret));
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
PALF_LOG(ERROR, "start LogSharedQueueTh failed", K(ret));
} else {
PALF_LOG(INFO, "start LogSharedQueueTh success", K(ret),
K(tg_id_));
}
return ret;
}
int LogSharedQueueTh::stop()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "LogSharedQueueTh not inited", K(ret));
} else {
TG_STOP(tg_id_);
PALF_LOG(INFO, "stop LogSharedQueueTh success", K(tg_id_));
}
return ret;
}
int LogSharedQueueTh::wait()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "LogSharedQueueTh not inited", K(ret));
} else {
TG_WAIT(tg_id_);
PALF_LOG(INFO, "wait LogSharedQueueTh success", K(tg_id_));
}
return ret;
}
void LogSharedQueueTh::destroy()
{
stop();
wait();
is_inited_ = false;
if (-1 != tg_id_) {
TG_DESTROY(tg_id_);
PALF_LOG(INFO, "destroy LogSharedQueueTh success", K(tg_id_));
}
tg_id_ = -1;
}
int LogSharedQueueTh::push_task(LogSharedTask *task)
{
int ret = OB_SUCCESS;
if (NULL == task) {
ret = OB_INVALID_ARGUMENT;
} else {
int64_t print_log_interval = OB_INVALID_TIMESTAMP;
while (OB_FAIL(TG_PUSH_TASK(tg_id_, task))) {
if (OB_IN_STOP_STATE == ret) {
PALF_LOG(WARN, "thread_pool has been stopped, skip task", K(ret), K_(tg_id), KPC(task));
break;
} else if (palf_reach_time_interval(5 * 1000 * 1000, print_log_interval)) {
PALF_LOG(ERROR, "push task failed", K(ret), K_(tg_id), KPC(task));
}
ob_usleep(1000);
}
}
return ret;
}
void LogSharedQueueTh::handle(void *task)
{
int ret = OB_SUCCESS;
LogSharedTask *log_shared_task = reinterpret_cast<LogSharedTask*>(task);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogSharedQueueTh not inited", K(ret));
} else if (OB_ISNULL(log_shared_task)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument", K(ret), K(log_shared_task));
} else if (OB_FAIL(log_shared_task->do_task(palf_env_impl_))) {
PALF_LOG(WARN, "LogSharedTask handle_task failed", K(ret), KPC(log_shared_task));
} else {
PALF_LOG(TRACE, "LogSharedQueueTh handle success", KPC(log_shared_task));
}
if (OB_NOT_NULL(log_shared_task)) {
log_shared_task->free_this(palf_env_impl_);
}
}
int LogSharedQueueTh::get_tg_id() const
{
return tg_id_;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -0,0 +1,54 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_LOG_SHARED_QUEUE_THREAD_
#define OCEANBASE_LOGSERVICE_LOG_SHARED_QUEUE_THREAD_
#include "lib/thread/thread_mgr_interface.h"
#include "lib/utility/ob_print_utils.h"
namespace oceanbase
{
namespace palf
{
class IPalfEnvImpl;
class LogSharedTask;
class LogSharedQueueTh : public lib::TGTaskHandler
{
public:
LogSharedQueueTh();
~LogSharedQueueTh();
public:
int init(IPalfEnvImpl *palf_env_impl);
int start();
int stop();
int wait();
void destroy();
int push_task(LogSharedTask *task);
virtual void handle(void *task);
int get_tg_id() const;
public:
static constexpr int64_t THREAD_NUM = 1;
static constexpr int64_t MINI_MODE_THREAD_NUM = 1;
static constexpr int64_t MAX_LOG_HANDLE_TASK_NUM = 10 * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER;
private:
DISALLOW_COPY_AND_ASSIGN(LogSharedQueueTh);
private:
int tg_id_;
IPalfEnvImpl *palf_env_impl_;
bool is_inited_;
};
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -0,0 +1,74 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "log_shared_task.h"
#include "palf_env_impl.h" // PalfEnvImpl
#include "share/ob_errno.h" // errno...
namespace oceanbase
{
namespace palf
{
LogSharedTask::LogSharedTask(const int64_t palf_id,const int64_t palf_epoch)
: palf_id_(palf_id), palf_epoch_(palf_epoch)
{}
LogSharedTask::~LogSharedTask()
{
destroy();
}
void LogSharedTask::destroy()
{
reset();
}
void LogSharedTask::reset()
{
palf_id_ = INVALID_PALF_ID;
palf_epoch_ = -1;
}
LogHandleSubmitTask::LogHandleSubmitTask(const int64_t palf_id,const int64_t palf_epoch)
: LogSharedTask(palf_id, palf_epoch)
{}
LogHandleSubmitTask::~LogHandleSubmitTask()
{}
void LogHandleSubmitTask::free_this(IPalfEnvImpl *palf_env_impl)
{
palf_env_impl->get_log_allocator()->free_log_handle_submit_task(this);
}
int LogHandleSubmitTask::do_task(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
common::ObTimeGuard time_guard("handle submit task", 100 * 1000);
if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(ret), KPC(this));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(ret), KPC(this));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(ret), K(palf_epoch), KPC(this));
} else if (OB_FAIL(guard.get_palf_handle_impl()->try_handle_next_submit_log())) {
PALF_LOG(WARN, "PalfHandleImpl try_handle_next_submit_log failed", K(ret), KPC(this));
} else {
PALF_LOG(TRACE, "LogHandleSubmitTask handle_task success", K(time_guard), KPC(this));
}
return ret;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -0,0 +1,78 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_LOG_SHARED_TASK_
#define OCEANBASE_LOGSERVICE_LOG_SHARED_TASK_
#include "lib/utility/ob_print_utils.h"
namespace oceanbase
{
namespace palf
{
class IPalfEnvImpl;
enum class LogSharedTaskType
{
LogHandleSubmitType = 1,
};
inline const char *shared_type_2_str(const LogSharedTaskType type)
{
#define EXTRACT_SHARED_TYPE(type_var) ({ case(LogSharedTaskType::type_var): return #type_var; })
switch(type)
{
EXTRACT_SHARED_TYPE(LogHandleSubmitType);
default:
return "Invalid Type";
}
#undef EXTRACT_SHARED_TYPE
}
class LogSharedTask
{
public:
LogSharedTask(const int64_t palf_id, const int64_t palf_epoch);
virtual ~LogSharedTask();
void destroy();
void reset();
virtual int do_task(IPalfEnvImpl *palf_env_impl) = 0;
virtual void free_this(IPalfEnvImpl *palf_env_impl) = 0;
virtual LogSharedTaskType get_shared_task_type() const = 0;
VIRTUAL_TO_STRING_KV("BaseClass", "LogSharedTask",
"palf_id", palf_id_,
"palf_epoch", palf_epoch_);
protected:
int64_t palf_id_;
int64_t palf_epoch_;
private:
DISALLOW_COPY_AND_ASSIGN(LogSharedTask);
};
class LogHandleSubmitTask : public LogSharedTask
{
public:
LogHandleSubmitTask(const int64_t palf_id, const int64_t palf_epoch);
~LogHandleSubmitTask() override;
int do_task(IPalfEnvImpl *palf_env_impl) override;
void free_this(IPalfEnvImpl *palf_env_impl) override;
virtual LogSharedTaskType get_shared_task_type() const override { return LogSharedTaskType::LogHandleSubmitType; }
INHERIT_TO_STRING_KV("LogSharedTask", LogSharedTask, "task type", shared_type_2_str(get_shared_task_type()));
private:
DISALLOW_COPY_AND_ASSIGN(LogHandleSubmitTask);
};
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -134,6 +134,7 @@ LogSlidingWindow::LogSlidingWindow()
accum_group_log_size_(0),
last_record_group_log_id_(FIRST_VALID_LOG_ID - 1),
freeze_mode_(FEEDBACK_FREEZE_MODE),
has_pending_handle_submit_task_(false),
is_inited_(false)
{}
@ -900,12 +901,43 @@ int LogSlidingWindow::try_push_log_to_children_(const int64_t curr_proposal_id,
return ret;
}
int LogSlidingWindow::try_handle_next_submit_log()
{
int ret = OB_SUCCESS;
// Set has_pending_handle_submit_task_ to false forcedly.
(void) ATOMIC_STORE(&has_pending_handle_submit_task_, false);
bool unused_bool = false;
ret = handle_next_submit_log_(unused_bool);
return ret;
}
bool LogSlidingWindow::is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const
{
// The thread lease time for handle_next_submit_log_ is 50ms.
static const int64_t THREAD_LEASE_US = 50 * 1000L;
bool bool_ret = false;
if (OB_INVALID_TIMESTAMP != thread_lease_begin_ts
&& ObTimeUtility::current_time() - thread_lease_begin_ts > THREAD_LEASE_US) {
bool_ret = true;
}
return bool_ret;
}
int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
{
int ret = OB_SUCCESS;
common::ObTimeGuard time_guard("handle_next_submit_log", 100 * 1000);
if (submit_log_handling_lease_.acquire()) {
// record handle_thread_lease_begin_ts with current time
const int64_t thread_lease_begin_ts = ObTimeUtility::current_time();
bool is_lease_expired = false;
bool need_submit_async_task = false;
do {
while (OB_SUCC(ret)) {
// If it revoke fails when thread lease expired, this thread need submit an async task.
if (is_lease_expired) {
need_submit_async_task = true;
}
while (OB_SUCC(ret) && !is_lease_expired) {
LSN last_submit_lsn;
LSN last_submit_end_lsn;
int64_t last_submit_log_id = OB_INVALID_LOG_ID;
@ -1095,8 +1127,32 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
PALF_LOG(TRACE, "handle one submit log", K(ret), K_(palf_id), K_(self), K(tmp_log_id), K(is_committed_lsn_updated),
K(is_need_submit), K(is_submitted));
}
is_lease_expired = is_handle_thread_lease_expired(thread_lease_begin_ts);
}
} while (!submit_log_handling_lease_.revoke());
// Try push handle_submit_task into queue when lease revoke failed(lease expired).
if (OB_SUCC(ret) && need_submit_async_task) {
// This CAS is used to control only one task can be submitted into queue at any time.
if (ATOMIC_BCAS(&has_pending_handle_submit_task_, false, true)) {
// push task into queue until success
int tmp_ret = OB_SUCCESS;
while (OB_TMP_FAIL(log_engine_->submit_handle_submit_task())) {
if (REACH_TIME_INTERVAL(100 * 1000)) {
PALF_LOG(WARN, "submit_handle_submit_task failed", K(tmp_ret), K_(palf_id), K_(self));
}
if (OB_IN_STOP_STATE == tmp_ret) {
// The thread pool has been stopped, no need retry.
break;
} else {
// sleep 100us when submit task failed
ob_usleep(100);
}
}
} else {
// no need push task into queue
}
}
}
return ret;
}

View File

@ -322,6 +322,7 @@ public:
char *buf,
int64_t &out_read_size) const;
int64_t get_last_slide_log_id() const;
virtual int try_handle_next_submit_log();
TO_STRING_KV(K_(palf_id), K_(self), K_(lsn_allocator), K_(group_buffer), \
K_(last_submit_lsn), K_(last_submit_end_lsn), K_(last_submit_log_id), K_(last_submit_log_pid), \
K_(max_flushed_lsn), K_(max_flushed_end_lsn), K_(max_flushed_log_pid), K_(committed_end_lsn), \
@ -329,8 +330,10 @@ public:
K_(last_slide_log_pid), K_(last_slide_log_accum_checksum), K_(last_fetch_end_lsn), \
K_(last_fetch_max_log_id), K_(last_fetch_committed_end_lsn), K_(last_truncate_lsn), \
K_(last_fetch_req_time), K_(is_truncating), K_(is_rebuilding), K_(last_rebuild_lsn), \
"freeze_mode", freeze_mode_2_str(freeze_mode_), \
"freeze_mode", freeze_mode_2_str(freeze_mode_), K_(has_pending_handle_submit_task), \
"last_fetch_trigger_type", fetch_trigger_type_2_str(last_fetch_trigger_type_), KP(this));
protected:
virtual bool is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const;
private:
int do_init_mem_(const int64_t palf_id,
const PalfBaseInfo &palf_base_info,
@ -609,6 +612,7 @@ private:
int64_t last_record_group_log_id_;
int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE];
FreezeMode freeze_mode_;
bool has_pending_handle_submit_task_;
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow);

View File

@ -182,6 +182,7 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK),
log_rpc_(),
cb_thread_pool_(),
log_io_worker_wrapper_(),
log_shared_queue_th_(),
block_gc_timer_task_(),
log_updater_(),
monitor_(NULL),
@ -243,6 +244,8 @@ int PalfEnvImpl::init(
cb_thread_pool_.get_tg_id(),
log_alloc_mgr, this))) {
PALF_LOG(ERROR, "LogIOWorker init failed", K(ret));
} else if (OB_FAIL(log_shared_queue_th_.init(this))) {
PALF_LOG(ERROR, "LogSharedQueueTh init failed", K(ret));
} else if (OB_FAIL(block_gc_timer_task_.init(this))) {
PALF_LOG(ERROR, "ObCheckLogBlockCollectTask init failed", K(ret));
} else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", base_dir)) && false) {
@ -293,6 +296,8 @@ int PalfEnvImpl::start()
PALF_LOG(ERROR, "LogIOTaskThreadPool start failed", K(ret));
} else if (OB_FAIL(log_io_worker_wrapper_.start())) {
PALF_LOG(ERROR, "LogIOWorker start failed", K(ret));
} else if (OB_FAIL(log_shared_queue_th_.start())) {
PALF_LOG(ERROR, "LogIOWorker start failed", K(ret));
} else if (OB_FAIL(block_gc_timer_task_.start())) {
PALF_LOG(ERROR, "FileCollectTimerTask start failed", K(ret));
} else if (OB_FAIL(fetch_log_engine_.start())) {
@ -314,6 +319,7 @@ void PalfEnvImpl::stop()
PALF_LOG(INFO, "PalfEnvImpl begin stop", KPC(this));
is_running_ = false;
log_io_worker_wrapper_.stop();
log_shared_queue_th_.stop();
cb_thread_pool_.stop();
block_gc_timer_task_.stop();
fetch_log_engine_.stop();
@ -327,6 +333,7 @@ void PalfEnvImpl::wait()
{
PALF_LOG(INFO, "PalfEnvImpl begin wait", KPC(this));
log_io_worker_wrapper_.wait();
log_shared_queue_th_.wait();
cb_thread_pool_.wait();
block_gc_timer_task_.wait();
fetch_log_engine_.wait();
@ -342,6 +349,7 @@ void PalfEnvImpl::destroy()
is_inited_ = false;
palf_handle_impl_map_.destroy();
log_io_worker_wrapper_.destroy();
log_shared_queue_th_.destroy();
cb_thread_pool_.destroy();
log_loop_thread_.destroy();
block_gc_timer_task_.destroy();
@ -419,7 +427,8 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id,
PALF_LOG(WARN, "prepare_directory_for_creating_ls failed!!!", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type,
&fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_,
log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_, &election_timer_, palf_epoch))) {
log_io_worker_wrapper_.get_log_io_worker(palf_id), &log_shared_queue_th_, this,
self_, &election_timer_, palf_epoch))) {
PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id));
// NB: always insert value into hash map finally.
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) {
@ -1045,8 +1054,8 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret));
} else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_,
log_block_pool_, &log_rpc_, log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_,
&election_timer_, palf_epoch, is_integrity))) {
log_block_pool_, &log_rpc_, log_io_worker_wrapper_.get_log_io_worker(palf_id), &log_shared_queue_th_,
this, self_, &election_timer_, palf_epoch, is_integrity))) {
PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) {
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl));

View File

@ -24,9 +24,10 @@
#include "share/ob_occam_timer.h"
#include "share/scn.h"
#include "fetch_log_engine.h"
#include "log_loop_thread.h"
#include "log_define.h"
#include "log_shared_queue_thread.h"
#include "log_io_task_cb_thread_pool.h"
#include "log_loop_thread.h"
#include "log_rpc.h"
#include "palf_options.h"
#include "palf_handle_impl.h"
@ -368,6 +369,7 @@ private:
LogIOTaskCbThreadPool cb_thread_pool_;
common::ObOccamTimer election_timer_;
LogIOWorkerWrapper log_io_worker_wrapper_;
LogSharedQueueTh log_shared_queue_th_;
BlockGCTimerTask block_gc_timer_task_;
LogUpdater log_updater_;
PalfMonitorCb *monitor_;

View File

@ -71,6 +71,7 @@ PalfHandleImpl::PalfHandleImpl()
palf_env_impl_(NULL),
append_cost_stat_("[PALF STAT WRITE LOG COST TIME]", PALF_STAT_PRINT_INTERVAL_US),
flush_cb_cost_stat_("[PALF STAT FLUSH CB COST TIME]", PALF_STAT_PRINT_INTERVAL_US),
handle_submit_log_cost_stat_("[PALF STAT HANDLE SUBMIT LOG COST TIME]", PALF_STAT_PRINT_INTERVAL_US),
last_accum_write_statistic_time_(OB_INVALID_TIMESTAMP),
accum_write_log_size_(0),
last_accum_fetch_statistic_time_(OB_INVALID_TIMESTAMP),
@ -109,6 +110,7 @@ int PalfHandleImpl::init(const int64_t palf_id,
ILogBlockPool *log_block_pool,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
@ -131,6 +133,7 @@ int PalfHandleImpl::init(const int64_t palf_id,
|| NULL == log_block_pool
|| NULL == log_rpc
|| NULL == log_io_worker
|| NULL == log_shared_queue_th
|| NULL == palf_env_impl
|| false == self.is_valid()
|| NULL == election_timer
@ -138,16 +141,16 @@ int PalfHandleImpl::init(const int64_t palf_id,
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(palf_base_info), K(replica_type),
K(access_mode), K(log_dir), K(alloc_mgr), K(log_block_pool), K(log_rpc),
K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch));
K(log_io_worker), K(log_shared_queue_th), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch));
} else if (OB_FAIL(log_meta.generate_by_palf_base_info(palf_base_info, access_mode, replica_type))) {
PALF_LOG(WARN, "generate_by_palf_base_info failed", K(ret), K(palf_id), K(palf_base_info), K(access_mode), K(replica_type));
} else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id));
} else if (OB_FAIL(log_engine_.init(palf_id, log_dir, log_meta, alloc_mgr, log_block_pool, &hot_cache_, \
log_rpc, log_io_worker, &plugins_, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
log_rpc, log_io_worker, log_shared_queue_th, &plugins_, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
K(log_rpc), K(log_io_worker));
K(log_rpc), K(log_io_worker), K(log_shared_queue_th));
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine,
alloc_mgr, log_rpc, palf_env_impl, election_timer))) {
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
@ -175,6 +178,7 @@ int PalfHandleImpl::load(const int64_t palf_id,
ILogBlockPool *log_block_pool,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
@ -194,12 +198,14 @@ int PalfHandleImpl::load(const int64_t palf_id,
|| NULL == alloc_mgr
|| NULL == log_rpc
|| NULL == log_io_worker
|| NULL == log_shared_queue_th
|| false == self.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
K(log_rpc), K(log_io_worker));
K(log_rpc), K(log_io_worker), K(log_shared_queue_th));
} else if (OB_FAIL(log_engine_.load(palf_id, log_dir, alloc_mgr, log_block_pool, &hot_cache_, log_rpc,
log_io_worker, &plugins_, entry_header, palf_epoch, is_integrity, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
log_io_worker, log_shared_queue_th, &plugins_, entry_header, palf_epoch, is_integrity,
PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
PALF_LOG(WARN, "LogEngine load failed", K(ret), K(palf_id));
// NB: when 'entry_header' is invalid, means that there is no data on disk, and set max_committed_end_lsn
// to 'base_lsn_', we will generate default PalfBaseInfo or get it from LogSnapshotMeta(rebuild).
@ -4088,6 +4094,24 @@ int PalfHandleImpl::advance_reuse_lsn(const LSN &flush_log_end_lsn)
return ret;
}
int PalfHandleImpl::try_handle_next_submit_log()
{
int ret = OB_SUCCESS;
PALF_LOG(TRACE, "try_handle_next_submit_log begin", KPC(this));
const int64_t begin_ts = ObTimeUtility::current_time();
RLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(sw_.try_handle_next_submit_log())) {
PALF_LOG(WARN, "sw_.try_handle_next_submit_log failed", K(ret), KPC(this));
} else {
const int64_t time_cost = ObTimeUtility::current_time() - begin_ts;
handle_submit_log_cost_stat_.stat(time_cost);
PALF_LOG(TRACE, "try_handle_next_submit_log success", K(ret), KPC(this));
}
return ret;
}
int PalfHandleImpl::inner_after_flush_log(const FlushLogCbCtx &flush_log_cb_ctx)
{
int ret = OB_SUCCESS;

View File

@ -60,6 +60,7 @@ class LogIOFlushMetaTask;
class ReadBuf;
class LogWriteBuf;
class LogIOWorker;
class LogSharedQueueTh;
class LogRpc;
class IPalfEnvImpl;
@ -820,6 +821,7 @@ public:
const int64_t in_read_size,
char *buf,
int64_t &out_read_size) const = 0;
virtual int try_handle_next_submit_log() = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;
};
@ -838,6 +840,7 @@ public:
ILogBlockPool *log_block_pool,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
@ -855,6 +858,7 @@ public:
ILogBlockPool *log_block_pool,
LogRpc *log_rpc,
LogIOWorker*log_io_worker,
LogSharedQueueTh *log_shared_queue_th,
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
@ -958,6 +962,7 @@ public:
const int64_t in_read_size,
char *buf,
int64_t &out_read_size) const;
int try_handle_next_submit_log();
public:
int delete_block(const block_id_t &block_id) override final;
int read_log(const LSN &lsn,
@ -1423,6 +1428,7 @@ private:
bool diskspace_enough_;
ObMiniStat::ObStatItem append_cost_stat_;
ObMiniStat::ObStatItem flush_cb_cost_stat_;
ObMiniStat::ObStatItem handle_submit_log_cost_stat_;
int64_t last_accum_write_statistic_time_;
int64_t accum_write_log_size_; // the accum size of written logs
int64_t last_accum_fetch_statistic_time_;

View File

@ -16,6 +16,7 @@
#include "observer/omt/ob_multi_tenant.h"
#include "logservice/palf/log_io_task.h"
#include "logservice/palf/fetch_log_engine.h"
#include "logservice/palf/log_shared_task.h"
#include "logservice/replayservice/ob_replay_status.h"
namespace oceanbase
@ -28,6 +29,7 @@ namespace common
ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
: tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0),
LOG_HANDLE_SUBMIT_TASK_SIZE(sizeof(palf::LogHandleSubmitTask)),
LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)),
LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)),
LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)),
@ -40,6 +42,7 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
unlimited_blk_alloc_(),
replay_log_task_blk_alloc_(REPLAY_MEM_LIMIT_THRESHOLD),
clog_ge_alloc_(ObMemAttr(tenant_id, ObModIds::OB_CLOG_GE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, clog_blk_alloc_),
log_handle_submit_task_alloc_(LOG_HANDLE_SUBMIT_TASK_SIZE, ObMemAttr(tenant_id, "HandleSubmit"), choose_blk_size(LOG_HANDLE_SUBMIT_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this),
@ -71,6 +74,7 @@ void ObTenantMutilAllocator::destroy()
{
OB_LOG(INFO, "ObTenantMutilAllocator destroy", K(tenant_id_));
clog_ge_alloc_.destroy();
log_handle_submit_task_alloc_.destroy();
log_io_flush_log_task_alloc_.destroy();
log_io_truncate_log_task_alloc_.destroy();
log_io_flush_meta_task_alloc_.destroy();
@ -98,6 +102,7 @@ int ObTenantMutilAllocator::choose_blk_size(int obj_size)
void ObTenantMutilAllocator::try_purge()
{
clog_ge_alloc_.purge_extra_cached_block(0);
log_handle_submit_task_alloc_.purge_extra_cached_block(0);
log_io_flush_log_task_alloc_.purge_extra_cached_block(0);
log_io_truncate_log_task_alloc_.purge_extra_cached_block(0);
log_io_flush_meta_task_alloc_.purge_extra_cached_block(0);
@ -161,6 +166,27 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr)
}
}
LogHandleSubmitTask *ObTenantMutilAllocator::alloc_log_handle_submit_task(
const int64_t palf_id, const int64_t palf_epoch)
{
LogHandleSubmitTask *ret_ptr = NULL;
void *ptr = log_handle_submit_task_alloc_.alloc();
if (NULL != ptr) {
ret_ptr = new(ptr)LogHandleSubmitTask(palf_id, palf_epoch);
ATOMIC_INC(&flying_log_handle_submit_task_);
}
return ret_ptr;
}
void ObTenantMutilAllocator::free_log_handle_submit_task(LogHandleSubmitTask *ptr)
{
if (OB_LIKELY(NULL != ptr)) {
ptr->~LogHandleSubmitTask();
log_handle_submit_task_alloc_.free(ptr);
ATOMIC_DEC(&flying_log_handle_submit_task_);
}
}
LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task(
const int64_t palf_id, const int64_t palf_epoch)
{

View File

@ -25,6 +25,7 @@ namespace oceanbase
namespace palf
{
class LogIOFlushLogTask;
class LogHandleSubmitTask;
class LogIOTruncateLogTask;
class LogIOFlushMetaTask;
class LogIOTruncatePrefixBlocksTask;
@ -44,7 +45,7 @@ class ObTraceProfile;
class ObILogAllocator : public ObIAllocator
{
public:
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {}
ObILogAllocator() : flying_log_task_(0), flying_log_handle_submit_task_(0), flying_meta_task_(0) {}
virtual ~ObILogAllocator() {}
public:
@ -54,6 +55,8 @@ public:
virtual void *ge_alloc(const int64_t size) = 0;
virtual void ge_free(void *ptr) = 0;
virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0;
virtual palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr) = 0;
virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0;
virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
@ -76,6 +79,7 @@ public:
protected:
int64_t flying_log_task_;
int64_t flying_log_handle_submit_task_;
int64_t flying_meta_task_;
};
@ -118,6 +122,8 @@ public:
// V4.0
palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr);
palf::LogHandleSubmitTask *alloc_log_handle_submit_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_handle_submit_task(palf::LogHandleSubmitTask *ptr);
palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr);
palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch);
@ -139,6 +145,7 @@ private:
uint64_t tenant_id_ CACHE_ALIGNED;
int64_t total_limit_;
int64_t pending_replay_mutator_size_;
const int LOG_HANDLE_SUBMIT_TASK_SIZE;
const int LOG_IO_FLUSH_LOG_TASK_SIZE;
const int LOG_IO_TRUNCATE_LOG_TASK_SIZE;
const int LOG_IO_FLUSH_META_TASK_SIZE;
@ -151,6 +158,7 @@ private:
ObBlockAllocMgr unlimited_blk_alloc_;
ObBlockAllocMgr replay_log_task_blk_alloc_;
ObVSliceAlloc clog_ge_alloc_;
ObSliceAlloc log_handle_submit_task_alloc_;
ObSliceAlloc log_io_flush_log_task_alloc_;
ObSliceAlloc log_io_truncate_log_task_alloc_;
ObSliceAlloc log_io_flush_meta_task_alloc_;

View File

@ -99,6 +99,10 @@ TG_DEF(LogIOTaskCbThreadPool, LogIOCb, QUEUE_THREAD,
ThreadCountPair(palf::LogIOTaskCbThreadPool::THREAD_NUM,
palf::LogIOTaskCbThreadPool::MINI_MODE_THREAD_NUM),
palf::LogIOTaskCbThreadPool::MAX_LOG_IO_CB_TASK_NUM)
TG_DEF(LogSharedQueueTh, LogSharedQueueThread, QUEUE_THREAD,
ThreadCountPair(palf::LogSharedQueueTh::THREAD_NUM,
palf::LogSharedQueueTh::MINI_MODE_THREAD_NUM),
palf::LogSharedQueueTh::MAX_LOG_HANDLE_TASK_NUM)
TG_DEF(ReplayService, ReplaySrv, QUEUE_THREAD, 1, (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER_CAN_BE_SET)
TG_DEF(LogRouteService, LogRouteSrv, QUEUE_THREAD, 1, (common::MAX_SERVER_COUNT) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER_CAN_BE_SET)
TG_DEF(LogRouterTimer, LogRouterTimer, TIMER)

View File

@ -50,6 +50,11 @@ public:
public:
MockPublicLogSlidingWindow() {}
virtual ~MockPublicLogSlidingWindow() {}
virtual bool is_handle_thread_lease_expired(const int64_t thread_lease_begin_ts) const override final
{
UNUSED(thread_lease_begin_ts);
return false;
}
};
class MockLocCb : public PalfLocationCacheCb
{