212 lines
8.0 KiB
C++
212 lines
8.0 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#ifndef OCEANBASE_PALF_CLUSTER_OB_LOG_SERVICE_
|
|
#define OCEANBASE_PALF_CLUSTER_OB_LOG_SERVICE_
|
|
|
|
#include "common/ob_role.h"
|
|
#include "lib/ob_define.h"
|
|
#include "share/ob_tenant_info_proxy.h" // ObTenantRole
|
|
#include "logservice/applyservice/ob_log_apply_service.h"
|
|
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_req.h"
|
|
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_proxy.h"
|
|
#include "mittest/palf_cluster/logservice/ob_log_client.h"
|
|
#include "logservice/palf/log_block_pool_interface.h" // ILogBlockPool
|
|
#include "logservice/palf/log_define.h"
|
|
#include "logservice/ob_log_monitor.h"
|
|
#include "logservice/ob_log_handler.h"
|
|
#include "logservice/ob_log_service.h"
|
|
#include "role_coordinator.h"
|
|
#include "ls_adapter.h"
|
|
// #include "replayservice/ob_log_replay_service.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace commom
|
|
{
|
|
class ObAddr;
|
|
class ObILogAllocator;
|
|
}
|
|
|
|
namespace rpc
|
|
{
|
|
namespace frame
|
|
{
|
|
class ObReqTransport;
|
|
}
|
|
}
|
|
|
|
namespace share
|
|
{
|
|
class ObLSID;
|
|
class SCN;
|
|
}
|
|
|
|
namespace palf
|
|
{
|
|
class PalfHandleGuard;
|
|
class PalfRoleChangeCb;
|
|
class PalfDiskOptions;
|
|
class PalfEnv;
|
|
}
|
|
|
|
namespace palfcluster
|
|
{
|
|
|
|
class LogService
|
|
{
|
|
public:
|
|
LogService();
|
|
virtual ~LogService();
|
|
static int mtl_init(LogService* &logservice);
|
|
static void mtl_destroy(LogService* &logservice);
|
|
int start();
|
|
void stop();
|
|
void wait();
|
|
void destroy();
|
|
public:
|
|
static palf::AccessMode get_palf_access_mode(const share::ObTenantRole &tenant_role);
|
|
int init(const palf::PalfOptions &options,
|
|
const char *base_dir,
|
|
const common::ObAddr &self,
|
|
common::ObILogAllocator *alloc_mgr,
|
|
rpc::frame::ObReqTransport *transport,
|
|
palf::ILogBlockPool *log_block_pool);
|
|
//--日志流相关接口--
|
|
//新建日志流接口,该接口会创建日志流对应的目录,新建一个以PalfBaeInfo为日志基点的日志流。
|
|
//其中包括生成并初始化对应的ObReplayStatus结构
|
|
// @param [in] id,日志流标识符
|
|
// @param [in] replica_type,日志流的副本类型
|
|
// @param [in] tenant_role, 租户角色, 以此决定Palf使用模式(APPEND/RAW_WRITE)
|
|
// @param [in] palf_base_info, 日志同步基点信息
|
|
// @param [out] log_handler,新建日志流以logservice::ObLogHandler形式返回,保证上层使用日志流时的生命周期
|
|
int create_ls(const share::ObLSID &id,
|
|
const common::ObReplicaType &replica_type,
|
|
const share::ObTenantRole &tenant_role,
|
|
const palf::PalfBaseInfo &palf_base_info,
|
|
const bool allow_log_sync,
|
|
logservice::ObLogHandler &log_handler);
|
|
|
|
//删除日志流接口:外层调用create_ls()之后,后续流程失败,需要调用remove_ls()
|
|
int remove_ls(const share::ObLSID &id,
|
|
logservice::ObLogHandler &log_handler);
|
|
|
|
int check_palf_exist(const share::ObLSID &id, bool &exist) const;
|
|
//宕机重启恢复日志流接口,包括生成并初始化对应的ObReplayStatus结构
|
|
// @param [in] id,日志流标识符
|
|
// @param [out] log_handler,新建日志流以logservice::ObLogHandler形式返回,保证上层使用日志流时的生命周期
|
|
int add_ls(const share::ObLSID &id,
|
|
logservice::ObLogHandler &log_handler);
|
|
|
|
int open_palf(const share::ObLSID &id,
|
|
palf::PalfHandleGuard &palf_handle);
|
|
|
|
// get role of current palf replica.
|
|
// NB: distinguish the difference from get_role of log_handler
|
|
// In general, get the replica role to do migration/blance/report, use this interface,
|
|
// to write log, use get_role of log_handler
|
|
int get_palf_role(const share::ObLSID &id,
|
|
common::ObRole &role,
|
|
int64_t &proposal_id);
|
|
|
|
// @brief get palf disk usage
|
|
// @param [out] used_size_byte
|
|
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value after shrinking.
|
|
// NB: total_size_byte may be smaller than used_size_byte.
|
|
int get_palf_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
|
|
|
// @brief get palf disk usage
|
|
// @param [out] used_size_byte
|
|
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value before shrinking.
|
|
int get_palf_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
|
// why we need update 'log_disk_size_' and 'log_disk_util_threshold' separately.
|
|
//
|
|
// 'log_disk_size' is a member of unit config.
|
|
// 'log_disk_util_threshold' and 'log_disk_util_limit_threshold' are members of tenant parameters.
|
|
// If we just only provide 'update_disk_options', the correctness of PalfDiskOptions can not be guaranteed.
|
|
// for example, original PalfDiskOptions is that
|
|
// {
|
|
// log_disk_size = 100G,
|
|
// log_disk_util_limit_threshold = 95,
|
|
// log_disk_util_threshold = 80
|
|
// }
|
|
//
|
|
// 1. thread1 update 'log_disk_size' with 50G, and it will used 'update_disk_options' with PalfDiskOptions
|
|
// {
|
|
// log_disk_size = 50G,
|
|
// log_disk_util_limit_threshold = 95,
|
|
// log_disk_util_threshold = 80
|
|
// }
|
|
// 2. thread2 updaet 'log_disk_util_limit_threshold' with 85, and it will used 'update_disk_options' with PalfDiskOptions
|
|
// {
|
|
// log_disk_size = 100G,
|
|
// log_disk_util_limit_threshold = 85,
|
|
// log_disk_util_threshold = 80
|
|
// }.
|
|
int update_palf_options_except_disk_usage_limit_size();
|
|
int update_log_disk_usage_limit_size(const int64_t log_disk_usage_limit_size);
|
|
int get_palf_options(palf::PalfOptions &options);
|
|
int iterate_palf(const ObFunction<int(const palf::PalfHandle&)> &func);
|
|
|
|
int get_io_start_time(int64_t &last_working_time);
|
|
int check_disk_space_enough(bool &is_disk_enough);
|
|
|
|
palf::PalfEnv *get_palf_env() { return palf_env_; }
|
|
// TODO by yunlong: temp solution, will by removed after Reporter be added in MTL
|
|
// ObLogReplayService *get_log_replay_service() { return &replay_service_; }
|
|
obrpc::PalfClusterRpcProxy *get_rpc_proxy() { return &rpc_proxy_; }
|
|
ObAddr &get_self() { return self_; }
|
|
|
|
int create_palf_replica(const int64_t palf_id,
|
|
const common::ObMemberList &member_list,
|
|
const int64_t replica_num,
|
|
const int64_t leader_idx);
|
|
|
|
int create_log_clients(const int64_t thread_num,
|
|
const int64_t log_size,
|
|
const int64_t palf_group_num,
|
|
std::vector<common::ObAddr> leader_list);
|
|
public:
|
|
palfcluster::LogClientMap *get_log_client_map() { return &log_client_map_; }
|
|
static const int64_t THREAD_NUM = 2000;
|
|
palfcluster::LogRemoteClient clients_[THREAD_NUM];
|
|
private:
|
|
int create_ls_(const share::ObLSID &id,
|
|
const common::ObReplicaType &replica_type,
|
|
const share::ObTenantRole &tenant_role,
|
|
const palf::PalfBaseInfo &palf_base_info,
|
|
const bool allow_log_sync,
|
|
logservice::ObLogHandler &log_handler);
|
|
private:
|
|
bool is_inited_;
|
|
bool is_running_;
|
|
|
|
common::ObAddr self_;
|
|
palf::PalfEnv *palf_env_;
|
|
|
|
logservice::ObLogApplyService apply_service_;
|
|
logservice::ObLogReplayService replay_service_;
|
|
palfcluster::RoleCoordinator role_change_service_;
|
|
logservice::ObLogMonitor monitor_;
|
|
obrpc::PalfClusterRpcProxy rpc_proxy_;
|
|
ObSpinLock update_palf_opts_lock_;
|
|
palfcluster::LogClientMap log_client_map_;
|
|
palfcluster::MockLSAdapter ls_adapter_;
|
|
logservice::ObLocationAdapter location_adapter_;
|
|
obrpc::ObLogServiceRpcProxy log_service_rpc_proxy_;
|
|
private:
|
|
DISALLOW_COPY_AND_ASSIGN(LogService);
|
|
};
|
|
} // end namespace palfcluster
|
|
} // end namespace oceanbase
|
|
#endif // OCEANBASE_PALF_CLUSTER_OB_LOG_SERVICE_
|