[FEAT MERGE] log4100 branch
Co-authored-by: tino247 <tino247@126.com> Co-authored-by: BinChenn <binchenn.bc@gmail.com> Co-authored-by: HaHaJeff <jeffzhouhhh@gmail.com>
This commit is contained in:
@ -12,16 +12,45 @@
|
||||
|
||||
#include "ob_cdc_service.h"
|
||||
#include "ob_cdc_service_monitor.h"
|
||||
#include "logservice/ob_log_service.h"
|
||||
#include "share/backup/ob_archive_persist_helper.h" // share::ObArchivePersistHelper
|
||||
#include "logservice/restoreservice/ob_remote_log_source_allocator.h" // ObResSrcAlloctor::free
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace cdc
|
||||
{
|
||||
///////////////////////////////////////////ObCdcService///////////////////////////////////////////
|
||||
|
||||
// suppose archive log only has one destination.
|
||||
int ObCdcService::get_backup_dest(const share::ObLSID &ls_id, share::ObBackupDest &backup_dest)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCdcService *cdc_service = MTL(logservice::ObLogService *)->get_cdc_service();
|
||||
ObArchiveDestInfo archive_dest;
|
||||
if (OB_ISNULL(cdc_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
EXTLOG_LOG(WARN, "cdc service is null, unexpected", KR(ret));
|
||||
} else if (FALSE_IT(archive_dest = cdc_service->get_archive_dest_info())) {
|
||||
} else if (archive_dest.empty()) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
EXTLOG_LOG(WARN, "archivelog is off yet", KR(ret), K(MTL_ID()));
|
||||
} else if (OB_FAIL(backup_dest.set(archive_dest.at(0).second))) {
|
||||
EXTLOG_LOG(WARN, "failed to set backup dest info", KR(ret), K(archive_dest));
|
||||
} else { }
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObCdcService::ObCdcService()
|
||||
: is_inited_(false),
|
||||
stop_flag_(true),
|
||||
locator_(),
|
||||
fetcher_()
|
||||
fetcher_(),
|
||||
tg_id_(-1),
|
||||
dest_info_(),
|
||||
dest_info_lock_(),
|
||||
ls_ctx_map_(),
|
||||
large_buffer_pool_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -38,10 +67,16 @@ int ObCdcService::init(const uint64_t tenant_id,
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;;
|
||||
EXTLOG_LOG(WARN, "ObCdcService has inited", KR(ret));
|
||||
} else if (OB_FAIL(locator_.init(tenant_id))) {
|
||||
EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret));
|
||||
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service))) {
|
||||
EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret));
|
||||
} else if (OB_FAIL(ls_ctx_map_.init("ExtClientLSCtxM", tenant_id))) {
|
||||
EXTLOG_LOG(WARN, "ls ctx map init failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(large_buffer_pool_.init("CDCService", 1L * 1024 * 1024 * 1024))) {
|
||||
EXTLOG_LOG(WARN, "large buffer pool init failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(locator_.init(tenant_id, &large_buffer_pool_))) {
|
||||
EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_))) {
|
||||
EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
|
||||
EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -49,6 +84,57 @@ int ObCdcService::init(const uint64_t tenant_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObCdcService::run1()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t tenant_id = MTL_ID();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
EXTLOG_LOG(ERROR, "ObCdcService is not initialized", KR(ret));
|
||||
} else {
|
||||
static const int64_t BASE_INTERVAL = 1L * 1000 * 1000;
|
||||
static const int64_t QUERY_INTERVAL = 10L * BASE_INTERVAL;
|
||||
static const int64_t RECYCLE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
||||
static const int64_t BUFFER_POOL_PURGE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
||||
int64_t last_query_ts = 0;
|
||||
int64_t last_recycle_ts = 0;
|
||||
int64_t last_purge_ts = 0;
|
||||
while(! is_stoped()) {
|
||||
// archive is always off for sys tenant, no need to query archive dest
|
||||
int64_t current_ts = ObTimeUtility::current_time();
|
||||
if (OB_SYS_TENANT_ID != tenant_id) {
|
||||
if (current_ts - last_query_ts >= QUERY_INTERVAL) {
|
||||
// the change of archive dest info is not supported
|
||||
// TODO
|
||||
if (OB_FAIL(query_tenant_archive_info_())) {
|
||||
EXTLOG_LOG(WARN, "query_tenant_archive_info_ failed", KR(ret));
|
||||
} else {
|
||||
EXTLOG_LOG(INFO, "query dest_info_ succ", K_(dest_info));
|
||||
}
|
||||
last_query_ts = current_ts;
|
||||
}
|
||||
}
|
||||
// but sys tenant still hold the ctx when fetching log
|
||||
if (current_ts - last_recycle_ts >= RECYCLE_INTERVAL) {
|
||||
if (OB_FAIL(recycle_expired_ctx_(current_ts))) {
|
||||
EXTLOG_LOG(WARN, "failed to recycle expired ctx", KR(ret));
|
||||
} else {
|
||||
int64_t count = ls_ctx_map_.count();
|
||||
EXTLOG_LOG(INFO, "total number of items in ctx map ", K(count));
|
||||
}
|
||||
last_recycle_ts = current_ts;
|
||||
}
|
||||
|
||||
if (current_ts - last_purge_ts >= BUFFER_POOL_PURGE_INTERVAL) {
|
||||
large_buffer_pool_.weed_out();
|
||||
last_purge_ts = current_ts;
|
||||
}
|
||||
|
||||
ob_usleep(BASE_INTERVAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObCdcService::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -56,6 +142,8 @@ int ObCdcService::start()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
|
||||
} else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) {
|
||||
EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret));
|
||||
} else {
|
||||
stop_flag_ = false;
|
||||
}
|
||||
@ -66,10 +154,12 @@ int ObCdcService::start()
|
||||
void ObCdcService::stop()
|
||||
{
|
||||
ATOMIC_STORE(&stop_flag_, true);
|
||||
stop_tenant_tg_(MTL_ID());
|
||||
}
|
||||
|
||||
void ObCdcService::wait()
|
||||
{
|
||||
wait_tenant_tg_(MTL_ID());
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@ -77,8 +167,12 @@ void ObCdcService::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
stop_flag_ = true;
|
||||
locator_.destroy();
|
||||
destroy_tenant_tg_(MTL_ID());
|
||||
fetcher_.destroy();
|
||||
locator_.destroy();
|
||||
dest_info_.reset();
|
||||
large_buffer_pool_.destroy();
|
||||
ls_ctx_map_.destroy();
|
||||
}
|
||||
|
||||
int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req,
|
||||
@ -167,6 +261,39 @@ int ObCdcService::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCdcService::query_tenant_archive_info_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::ObArchivePersistHelper helper;
|
||||
uint64_t tenant_id = MTL_ID();
|
||||
ObMySQLProxy *mysql_proxy = GCTX.sql_proxy_;
|
||||
ObArchiveDestInfo tmp_info;
|
||||
|
||||
if (OB_ISNULL(mysql_proxy)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
EXTLOG_LOG(ERROR, "mysql_proxy is null, unexpected", KR(ret));
|
||||
} else if (OB_FAIL(helper.init(tenant_id))) {
|
||||
EXTLOG_LOG(WARN, "init ObArchivePersistHelper failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(helper.get_valid_dest_pairs(*mysql_proxy, tmp_info))) {
|
||||
EXTLOG_LOG(WARN, "get_valid_dest_pairs failed", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
// to minimize lock conflict
|
||||
ObSpinLockGuard lock_guard(dest_info_lock_);
|
||||
dest_info_ = tmp_info;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCdcService::recycle_expired_ctx_(const int64_t cur_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
RecycleCtxFunctor recycle_func(cur_ts);
|
||||
if (OB_FAIL(ls_ctx_map_.remove_if(recycle_func))) {
|
||||
EXTLOG_LOG(WARN, "recycle expired ctx failed", KR(ret), K(cur_ts));
|
||||
}
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
void ObCdcService::do_monitor_stat_(const int64_t start_ts,
|
||||
const int64_t end_ts,
|
||||
const int64_t send_ts,
|
||||
@ -179,5 +306,49 @@ void ObCdcService::do_monitor_stat_(const int64_t start_ts,
|
||||
ObCdcServiceMonitor::fetch_time(end_ts - start_ts);
|
||||
}
|
||||
|
||||
int ObCdcService::create_tenant_tg_(const int64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (! is_meta_tenant(tenant_id)) {
|
||||
if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::CDCService, tg_id_))) {
|
||||
EXTLOG_LOG(WARN, "cdc thread group create for non-meta-tenant failed", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCdcService::start_tenant_tg_(const int64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (! is_meta_tenant(tenant_id)) {
|
||||
if (OB_FAIL(TG_SET_RUNNABLE_AND_START(tg_id_, *this))) {
|
||||
EXTLOG_LOG(WARN, "cdcservie thread group set runnable and start failed", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObCdcService::wait_tenant_tg_(const int64_t tenant_id)
|
||||
{
|
||||
if (! is_meta_tenant(tenant_id)) {
|
||||
TG_WAIT(tg_id_);
|
||||
}
|
||||
}
|
||||
|
||||
void ObCdcService::stop_tenant_tg_(const int64_t tenant_id)
|
||||
{
|
||||
if (! is_meta_tenant(tenant_id)) {
|
||||
TG_STOP(tg_id_);
|
||||
}
|
||||
}
|
||||
|
||||
void ObCdcService::destroy_tenant_tg_(const int64_t tenant_id)
|
||||
{
|
||||
if (! is_meta_tenant(tenant_id)) {
|
||||
TG_DESTROY(tg_id_);
|
||||
}
|
||||
tg_id_ = -1;
|
||||
}
|
||||
|
||||
} // namespace cdc
|
||||
} // namespace oceanbase
|
||||
|
||||
Reference in New Issue
Block a user