[CP] [OBCDC] Decoupling tenant timezone and fix cdc progress exit issues

This commit is contained in:
SanmuWangZJU 2023-09-01 15:18:07 +00:00 committed by ob-robot
parent 38cdda42f8
commit 464ba63de9
45 changed files with 990 additions and 1382 deletions

View File

@ -225,8 +225,10 @@ int ObCDCPartTransResolver::read(
serve_info,
missing_info,
has_redo_in_cur_entry))) {
LOG_ERROR("read_trans_log_ fail", KR(ret), K_(tls_id), K(tx_log_block_header),
K(tx_header), K(has_redo_in_cur_entry));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("read_trans_log_ fail", KR(ret), K_(tls_id), K(tx_log_block_header),
K(tx_header), K(has_redo_in_cur_entry));
}
}
}
@ -411,8 +413,10 @@ int ObCDCPartTransResolver::read_trans_log_(
case transaction::ObTxLogType::TX_REDO_LOG:
{
if (OB_FAIL(handle_redo_(tx_id, lsn, submit_ts, handling_miss_log, tx_log_block))) {
LOG_ERROR("handle_redo_ fail", KR(ret), K_(tls_id), K(tx_id), K(tx_id), K(lsn), K(tx_log_header),
K(missing_info));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("handle_redo_ fail", KR(ret), K_(tls_id), K(tx_id), K(tx_id), K(lsn), K(tx_log_header),
K(missing_info));
}
}
break;
}
@ -541,7 +545,7 @@ int ObCDCPartTransResolver::handle_redo_(
LOG_DEBUG("redo_log duplication", KR(ret), K_(tls_id), K(tx_id), K(lsn), K(submit_ts),
K(redo_log), K(handling_miss_log), K(task));
ret = OB_SUCCESS;
} else {
} else if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("push_redo_log into PartTransTask fail", KR(ret), K_(tls_id), K(tx_id), K(lsn),
K(handling_miss_log), K(task), K(redo_log));
}

View File

@ -194,7 +194,7 @@ int ObLogBatchBuffer::submit(IObLogBufTask *task)
if (OB_SUCC(ret)) {
if (auto_freeze_
&& OB_SUCCESS != (tmp_ret = try_freeze(next_flush_block_id_))) {
if (OB_IN_STOP_STATE != ret) {
if (OB_IN_STOP_STATE != tmp_ret) {
LOG_ERROR( "try_freeze failed", K(tmp_ret), K_(next_flush_block_id));
}
}

View File

@ -713,6 +713,7 @@ void ObLogCommitter::heartbeat_routine()
} else {
// Heartbeat thread that periodically generates heartbeat messages
while (! stop_flag_ && OB_SUCCESS == ret) {
ObLogTraceIdGuard trace_guard;
CheckpointTask *task = NULL;
bool need_continue = false;
@ -788,6 +789,7 @@ void ObLogCommitter::commit_routine()
int64_t commit_trans_count = 0;
while (OB_SUCC(ret) && ! stop_flag_) {
ObLogTraceIdGuard trace_guard;
PartTransTask *part_trans_task = NULL;
int64_t next_seq = trans_committer_queue_.begin_sn();
ret = trans_committer_queue_.get(next_seq, part_trans_task);

View File

@ -92,7 +92,7 @@ public:
#define OB_CLUSTER_PARAMETER(args...) args
// Liboblog config.
// max memory occupied by libobcdc: 20G
DEF_CAP(memory_limit, OB_CLUSTER_PARAMETER, "20G", "[2G,]", "memory limit");
DEF_CAP(memory_limit, OB_CLUSTER_PARAMETER, "8G", "[2G,]", "memory limit");
// Preserve the lower bound of system memory in %, in the range of 10% ~ 80%
// i.e.: ensure that the system memory remaining cannot be lower than this percentage based on the memory occupied by libobcdc
DEF_INT(system_memory_avail_percentage_lower_bound, OB_CLUSTER_PARAMETER, "10", "[10, 80]", "system memory avail upper bound");
@ -401,10 +401,10 @@ public:
T_DEF_BOOL(print_ls_server_list_update_info, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");
// Whether to sequentially output within a transaction
// Not on by default (participatn-by-participant output)
T_DEF_BOOL(enable_output_trans_order_by_sql_operation, OB_CLUSTER_PARAMETER, 0, "0:disabled, 1:enabled");
T_DEF_BOOL(enable_output_trans_order_by_sql_operation, OB_CLUSTER_PARAMETER, 1, "0:disabled, 1:enabled");
// redo dispatcher memory limit
DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "512M", "[128M,]", "redo dispatcher memory limit");
DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "4M", "[0, 512M]", "extra redo dispatcher memory for data skew participant");
DEF_CAP(redo_dispatcher_memory_limit, OB_CLUSTER_PARAMETER, "64M", "[0M,]", "redo dispatcher memory limit");
DEF_CAP(extra_redo_dispatch_memory_size, OB_CLUSTER_PARAMETER, "1M", "[0, 512M]", "extra redo dispatcher memory for data skew participant");
// redo diepatcher memory limit ratio for output br by sql operation(compare with redo_dispatcher_memory_limit)
T_DEF_INT_INFT(redo_dispatched_memory_limit_exceed_ratio, OB_CLUSTER_PARAMETER, 2, 1,
"redo_dispatcher_memory_limit ratio for output by sql operation order");

View File

@ -41,6 +41,7 @@ int64_t ObLogFetcher::g_inner_heartbeat_interval =
ObLogFetcher::ObLogFetcher() :
is_inited_(false),
is_running_(false),
is_loading_data_dict_baseline_data_(false),
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
archive_dest_(),
@ -113,10 +114,7 @@ int ObLogFetcher::init(
configure(cfg);
const common::ObRegion prefer_region(cfg.region.str());
// currently, libobcdc would output error log is is_tenant_mode is true and tenant_id is invalid.
// TODO: enable tenant_mode querying gv$ob_log_stat;
// const bool is_tenant_mode = TCTX.is_tenant_sync_mode();
const bool is_tenant_mode = false;
const bool is_tenant_mode = TCTX.is_tenant_sync_mode();
if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init(
proxy,
@ -134,7 +132,7 @@ int ObLogFetcher::init(
cfg.blacklist_history_overdue_time_min,
cfg.blacklist_history_clear_interval_min,
is_tenant_mode,
OB_INVALID_TENANT_ID))) {
TCTX.tenant_id_))) {
LOG_ERROR("ObLogRouterService init failer", KR(ret), K(prefer_region), K(cluster_id));
} else if (OB_FAIL(progress_controller_.init(cfg.ls_count_upper_limit))) {
LOG_ERROR("init progress controller fail", KR(ret));
@ -224,48 +222,50 @@ void ObLogFetcher::destroy()
{
stop();
// TODO: Global destroy all memory
is_inited_ = false;
is_loading_data_dict_baseline_data_ = false;
archive_dest_.reset();
large_buffer_pool_.destroy();
task_pool_ = NULL;
sys_ls_handler_ = NULL;
err_handler_ = NULL;
if (is_inited_) {
// TODO: Global destroy all memory
is_inited_ = false;
is_loading_data_dict_baseline_data_ = false;
archive_dest_.reset();
large_buffer_pool_.destroy();
task_pool_ = NULL;
sys_ls_handler_ = NULL;
err_handler_ = NULL;
misc_tid_ = 0;
heartbeat_dispatch_tid_ = 0;
last_timestamp_ = OB_INVALID_TIMESTAMP;
stop_flag_ = true;
paused_ = false;
pause_time_ = OB_INVALID_TIMESTAMP;
resume_time_ = OB_INVALID_TIMESTAMP;
misc_tid_ = 0;
heartbeat_dispatch_tid_ = 0;
last_timestamp_ = OB_INVALID_TIMESTAMP;
stop_flag_ = true;
paused_ = false;
pause_time_ = OB_INVALID_TIMESTAMP;
resume_time_ = OB_INVALID_TIMESTAMP;
stream_worker_.destroy();
fs_container_mgr_.destroy();
idle_pool_.destroy();
dead_pool_.destroy();
start_lsn_locator_.destroy();
rpc_.destroy();
progress_controller_.destroy();
ls_fetch_mgr_.destroy();
part_trans_resolver_factory_.destroy();
dispatcher_ = nullptr;
cluster_id_filter_.destroy();
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.wait();
log_route_service_.destroy();
stream_worker_.destroy();
fs_container_mgr_.destroy();
idle_pool_.destroy();
dead_pool_.destroy();
start_lsn_locator_.destroy();
rpc_.destroy();
progress_controller_.destroy();
ls_fetch_mgr_.destroy();
part_trans_resolver_factory_.destroy();
dispatcher_ = nullptr;
cluster_id_filter_.destroy();
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.wait();
log_route_service_.destroy();
}
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
if (is_direct_fetching_mode(fetching_mode_)) {
log_ext_handler_.wait();
log_ext_handler_.destroy();
}
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
log_ext_handler_concurrency_ = 0;
LOG_INFO("destroy fetcher succ");
}
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
if (is_direct_fetching_mode(fetching_mode_)) {
log_ext_handler_.wait();
log_ext_handler_.destroy();
}
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
log_ext_handler_concurrency_ = 0;
LOG_INFO("destroy fetcher succ");
}
int ObLogFetcher::start()
@ -305,6 +305,7 @@ int ObLogFetcher::start()
LOG_ERROR("start fetcher heartbeat dispatch thread fail", K(pthread_ret), KERRNOMSG(pthread_ret));
ret = OB_ERR_UNEXPECTED;
} else {
is_running_ = true;
LOG_INFO("start fetcher succ", K(misc_tid_), K(heartbeat_dispatch_tid_));
}
}
@ -314,33 +315,38 @@ int ObLogFetcher::start()
void ObLogFetcher::stop()
{
if (OB_LIKELY(is_inited_)) {
stop_flag_ = true;
mark_stop_flag();
LOG_INFO("stop fetcher begin");
stream_worker_.stop();
dead_pool_.stop();
idle_pool_.stop();
start_lsn_locator_.stop();
if (is_running_) {
is_running_ = false;
LOG_INFO("stop fetcher begin");
stream_worker_.stop();
dead_pool_.stop();
idle_pool_.stop();
start_lsn_locator_.stop();
if (0 != misc_tid_) {
int pthread_ret = pthread_join(misc_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join fetcher misc thread fail", K(misc_tid_), K(pthread_ret),
KERRNOMSG(pthread_ret));
if (0 != misc_tid_) {
int pthread_ret = pthread_join(misc_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join fetcher misc thread fail", K(misc_tid_), K(pthread_ret),
KERRNOMSG(pthread_ret));
}
misc_tid_ = 0;
}
misc_tid_ = 0;
}
if (0 != heartbeat_dispatch_tid_) {
int pthread_ret = pthread_join(heartbeat_dispatch_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join fetcher heartbeat dispatch thread fail", K(heartbeat_dispatch_tid_),
K(pthread_ret), KERRNOMSG(pthread_ret));
if (0 != heartbeat_dispatch_tid_) {
int pthread_ret = pthread_join(heartbeat_dispatch_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join fetcher heartbeat dispatch thread fail", K(heartbeat_dispatch_tid_),
K(pthread_ret), KERRNOMSG(pthread_ret));
}
heartbeat_dispatch_tid_ = 0;
}
heartbeat_dispatch_tid_ = 0;
}
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
}
log_ext_handler_.stop();
LOG_INFO("stop fetcher succ");
}
if (is_direct_fetching_mode(fetching_mode_)) {
log_ext_handler_.stop();
@ -745,8 +751,10 @@ void ObLogFetcher::heartbeat_dispatch_routine()
int64_t heartbeat_tstamp = OB_INVALID_TIMESTAMP;
PartTransTask *task = NULL;
if (!TCTX.is_running()) {
LOG_INFO("OBCDC not launch finish, skip generate heartbeat");
// Get the next heartbeat timestamp
if (OB_FAIL(next_heartbeat_timestamp_(heartbeat_tstamp, last_timestamp_))) {
} else if (OB_FAIL(next_heartbeat_timestamp_(heartbeat_tstamp, last_timestamp_))) {
if (OB_NEED_RETRY != ret) {
if (OB_EMPTY_RESULT != ret) {
LOG_ERROR("next_heartbeat_timestamp_ fail", KR(ret), K(heartbeat_tstamp), K_(last_timestamp));
@ -810,7 +818,9 @@ void ObLogFetcher::print_fetcher_stat_()
// Get global minimum progress
if (OB_FAIL(progress_controller_.get_min_progress(min_progress))) {
LOG_ERROR("get_min_progress fail", KR(ret), K(progress_controller_));
if (OB_EMPTY_RESULT != ret) {
LOG_ERROR("get_min_progress fail", KR(ret), K(progress_controller_));
}
} else if (OB_UNLIKELY(OB_INVALID_TIMESTAMP == min_progress)) {
//LOG_ERROR("current min progress is invalid", K(min_progress), K(progress_controller_));
ret = OB_INVALID_ERROR;

View File

@ -242,6 +242,7 @@ private:
private:
bool is_inited_;
bool is_running_;
bool is_loading_data_dict_baseline_data_;
ClientFetchingMode fetching_mode_;
ObBackupPathString archive_dest_;

View File

@ -37,6 +37,7 @@
#include "ob_cdc_lob_aux_table_parse.h" // ObCDCLobAuxMetaStorager
#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#include "ob_log_timezone_info_getter.h"
using namespace oceanbase::common;
using namespace oceanbase::storage;
@ -312,8 +313,8 @@ int ObLogFormatter::get_task_count(
int ObLogFormatter::handle(void *data, const int64_t thread_index, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
set_cdc_thread_name("Formatter", thread_index);
ObLogTraceIdGuard trace_guard;
set_cdc_thread_name("Formatter", thread_index);
bool cur_stmt_need_callback = false;
IStmtTask *stmt_task = static_cast<IStmtTask *>(data);
DmlStmtTask *dml_stmt_task = dynamic_cast<DmlStmtTask *>(stmt_task);
@ -1057,7 +1058,8 @@ int ObLogFormatter::build_row_value_(
ColValueList *old_cols = nullptr;
ObLobDataOutRowCtxList *new_lob_ctx_cols = nullptr;
TableSchemaInfo *tb_schema_info = NULL;
IObLogTenantMgr *tenant_mgr_ = TCTX.tenant_mgr_;
IObCDCTimeZoneInfoGetter *tz_info_getter = TCTX.timezone_info_getter_;
ObCDCTenantTimeZoneInfo *obcdc_tenant_tz_info = nullptr;
ObTimeZoneInfoWrap *tz_info_wrap = nullptr;
if (OB_UNLIKELY(! inited_)) {
@ -1082,12 +1084,13 @@ int ObLogFormatter::build_row_value_(
} else if (OB_ISNULL(tb_schema_info)) {
LOG_ERROR("tb_schema_info is null", K(tb_schema_info));
ret = OB_ERR_UNEXPECTED;
} else if (OB_ISNULL(tenant_mgr_)) {
} else if (OB_ISNULL(tz_info_getter)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_mgr_ is nullptr", KR(ret), K(tenant_mgr_));
} else if (OB_FAIL(tenant_mgr_->get_tenant_tz_wrap(tenant_id, tz_info_wrap))) {
LOG_ERROR("tz_info_getter is nullptr", KR(ret), K(tz_info_getter));
} else if (OB_FAIL(tz_info_getter->get_tenant_tz_info(tenant_id, obcdc_tenant_tz_info))) {
LOG_ERROR("get_tenant_tz_wrap failed", KR(ret), K(tenant_id));
} else {
const ObTimeZoneInfoWrap *tz_info_wrap = &(obcdc_tenant_tz_info->get_tz_wrap());
const int64_t column_num = tb_schema_info->get_usr_column_count();
const uint64_t aux_lob_meta_tid = tb_schema_info->get_aux_lob_meta_tid();
const bool is_cur_stmt_task_cb_progress = stmt_task->is_callback();

View File

@ -34,7 +34,7 @@
#include "ob_cdc_tenant_sql_server_provider.h" // ObCDCTenantSQLServerProvider(for cluster sync mode)
#include "ob_cdc_tenant_endpoint_provider.h" // ObCDCEndpointProvider (for tenant sync mode)
#include "ob_log_schema_getter.h" // ObLogSchemaGetter
#include "ob_log_timezone_info_getter.h" // ObLogTimeZoneInfoGetter
#include "ob_log_timezone_info_getter.h" // ObCDCTimeZoneInfoGetter
#include "ob_log_committer.h" // ObLogCommitter
#include "ob_log_formatter.h" // ObLogFormatter
#include "ob_cdc_lob_data_merger.h" // ObCDCLobDataMerger
@ -151,6 +151,7 @@ ObLogInstance::ObLogInstance() :
part_trans_task_count_(0),
trans_task_pool_alloc_(),
start_tstamp_ns_(0),
sys_start_schema_version_(OB_INVALID_VERSION),
is_schema_split_mode_(true),
enable_filter_sys_tenant_(false),
drc_message_factory_binlog_record_type_(),
@ -158,6 +159,7 @@ ObLogInstance::ObLogInstance() :
refresh_mode_(RefreshMode::UNKNOWN_REFRSH_MODE),
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
is_tenant_sync_mode_(false),
tenant_id_(OB_INVALID_TENANT_ID),
global_info_(),
mysql_proxy_(),
tenant_sql_proxy_(),
@ -586,14 +588,17 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb)
flow_control_tid_ = 0;
output_dml_br_count_ = 0;
output_ddl_br_count_ = 0;
last_heartbeat_timestamp_micro_sec_ = start_tstamp_ns / NS_CONVERSION;
last_heartbeat_timestamp_micro_sec_ = start_tstamp_ns / NS_CONVERSION - 1;
log_clean_cycle_time_us_ = TCONF.log_clean_cycle_time_in_hours * _HOUR_;
part_trans_task_count_ = 0;
}
}
if (OB_SUCC(ret)) {
LOG_INFO("init obcdc succ", K_(is_schema_split_mode), K_(start_tstamp_ns),
LOG_INFO("init obcdc succ",
K_(is_schema_split_mode),
K_(is_tenant_sync_mode),
K_(start_tstamp_ns),
"start_tstamp", NTS_TO_STR(start_tstamp_ns_),
"working_mode", print_working_mode(working_mode_),
"refresh_mode", print_refresh_mode(refresh_mode_),
@ -710,7 +715,6 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
const char *ob_trace_id_ptr = TCONF.ob_trace_id.str();
const char *drc_message_factory_binlog_record_type_str = TCONF.drc_message_factory_binlog_record_type.str();
// The starting schema version of the SYS tenant
int64_t sys_start_schema_version = OB_INVALID_VERSION;
const char *data_start_schema_version = TCONF.data_start_schema_version.str();
const char *store_service_path = TCONF.store_service_path.str();
const char *working_mode_str = TCONF.working_mode.str();
@ -745,6 +749,10 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
} else {
refresh_mode_ = refresh_mode;
if (is_data_dict_refresh_mode(refresh_mode_)) {
enable_filter_sys_tenant_ = true;
}
LOG_INFO("set refresh mode", K(refresh_mode_str), K(refresh_mode_), "refresh_mode", print_refresh_mode(refresh_mode_));
}
}
@ -796,7 +804,7 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
// init ObCompatModeGetter
if (OB_SUCC(ret)) {
if (is_online_sql_not_available()) {
if (is_data_dict_refresh_mode(refresh_mode_)) {
if (OB_FAIL(share::ObCompatModeGetter::instance().init_for_obcdc())) {
LOG_ERROR("compat_mode_getter init fail", KR(ret));
}
@ -853,12 +861,16 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
INIT(tenant_mgr_, ObLogTenantMgr, enable_oracle_mode_match_case_sensitive, refresh_mode_);
INIT(timezone_info_getter_, ObLogTimeZoneInfoGetter, TCONF.timezone.str(),
tenant_sql_proxy_.get_ob_mysql_proxy(), *systable_helper_, *tenant_mgr_, *err_handler);
if (OB_SUCC(ret)) {
// init interface for getting tenant timezone map
OTTZ_MGR.init(ObLogTimeZoneInfoGetter::get_tenant_timezone_map);
if (OB_FAIL(ObCDCTimeZoneInfoGetter::get_instance().init(TCONF.timezone.str(),
mysql_proxy_.get_ob_mysql_proxy(), *systable_helper_, *err_handler))) {
LOG_ERROR("init timezone_info_getter failed", KR(ret));
} else {
timezone_info_getter_ = &ObCDCTimeZoneInfoGetter::get_instance();
// init interface for getting tenant timezone map
// get_tenant_tz_map_function is defined in ob_log_timezone_info_getter file
OTTZ_MGR.init(get_tenant_tz_map_function);
}
}
const int64_t start_tstamp_usec = start_tstamp_ns / NS_CONVERSION;
@ -867,11 +879,11 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
if (OB_SUCC(ret)) {
// Initialize schema-related modules, split patterns, and SYS tenant starting schema versions based on start-up timestamps
if (is_online_refresh_mode(refresh_mode_)) {
if (OB_FAIL(init_schema_(start_tstamp_usec, sys_start_schema_version))) {
if (OB_FAIL(init_schema_(start_tstamp_usec, sys_start_schema_version_))) {
LOG_ERROR("init schema fail", KR(ret), K(start_tstamp_usec));
}
} else if (is_data_dict_refresh_mode(refresh_mode_)) {
sys_start_schema_version = start_tstamp_usec;
sys_start_schema_version_ = start_tstamp_usec;
// set g_liboblog_mode_ is true
ObSchemaService::g_liboblog_mode_ = true;
}
@ -969,35 +981,8 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
}
}
// config tenant mgr
if (OB_SUCC(ret)) {
if (OB_FAIL(config_tenant_mgr_(start_tstamp_ns, sys_start_schema_version))) {
LOG_ERROR("config_tenant_mgr_ fail", KR(ret), K(start_tstamp_ns), K(sys_start_schema_version));
}
}
if (OB_SUCC(ret)) {
if (is_data_dict_refresh_mode(refresh_mode_)) {
if (OB_FAIL(set_all_tenant_compat_mode_())) {
LOG_ERROR("set_all_tenant_compat_mode_ failed", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(config_data_start_schema_version_(TCONF.global_data_start_schema_version))) {
LOG_ERROR("config_data_start_schema_version_ fail", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(update_data_start_schema_version_on_split_mode_())) {
LOG_ERROR("update_data_start_schema_on_split_mode_ fail", KR(ret));
}
}
LOG_INFO("init all components done", KR(ret), K(start_tstamp_ns), K(sys_start_schema_version),
K(max_cached_trans_ctx_count), K_(is_schema_split_mode));
LOG_INFO("init all components done", KR(ret), K(start_tstamp_ns), K_(sys_start_schema_version),
K(max_cached_trans_ctx_count), K_(is_schema_split_mode), K_(enable_filter_sys_tenant));
return ret;
}
@ -1114,6 +1099,25 @@ int ObLogInstance::init_sql_provider_()
K(enable_ssl_client_authentication));
}
}
} else {
// query tenant_id
ObArray<uint64_t> tenant_id_list;
if (OB_ISNULL(systable_helper_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expect valid systable_helper_", KR(ret));
} else if (OB_FAIL(systable_helper_->query_tenant_id_list(tenant_id_list))) {
LOG_ERROR("query_tenant_id_list failed", KR(ret), K(tenant_id_list));
} else if (OB_UNLIKELY(tenant_id_list.count() <= 0)) {
LOG_ERROR("empty tenant_id_list in tenant sync mode", KR(ret), K(tenant_id_list));
} else if (OB_UNLIKELY(tenant_id_list.count() > 1)) {
LOG_ERROR("too much tenant_id found in tenant sync mode", KR(ret), K(tenant_id_list));
} else {
tenant_id_ = tenant_id_list.at(0);
if (OB_UNLIKELY(!is_user_tenant(tenant_id_))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("expect user tenant in tenant_sync_mode", KR(ret), K_(tenant_id));
}
}
}
}
@ -1182,6 +1186,38 @@ int ObLogInstance::config_data_start_schema_version_(const int64_t global_data_s
return ret;
}
int ObLogInstance::start_tenant_service_()
{
int ret = OB_SUCCESS;
LOG_INFO("start_tenant_service_ begin", K_(start_tstamp_ns), K_(sys_start_schema_version));
// config tenant mgr
if (OB_SUCC(ret)) {
if (OB_FAIL(config_tenant_mgr_(start_tstamp_ns_, sys_start_schema_version_))) {
LOG_ERROR("config_tenant_mgr_ fail", KR(ret), K_(start_tstamp_ns), K_(sys_start_schema_version));
}
}
if (OB_SUCC(ret)) {
if (is_data_dict_refresh_mode(refresh_mode_)) {
if (OB_FAIL(set_all_tenant_compat_mode_())) {
LOG_ERROR("set_all_tenant_compat_mode_ failed", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(config_data_start_schema_version_(TCONF.global_data_start_schema_version))) {
LOG_ERROR("config_data_start_schema_version_ fail", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(update_data_start_schema_version_on_split_mode_())) {
LOG_ERROR("update_data_start_schema_on_split_mode_ fail", KR(ret));
}
}
LOG_INFO("start_tenant_service_ success", K_(start_tstamp_ns), K_(sys_start_schema_version));
return ret;
}
int ObLogInstance::config_tenant_mgr_(const int64_t start_tstamp_ns,
const int64_t sys_schema_version)
{
@ -1282,7 +1318,6 @@ void ObLogInstance::destroy_components_()
DESTROY(meta_manager_, ObLogMetaManager);
DESTROY(trans_ctx_mgr_, ObLogTransCtxMgr);
DESTROY(trans_stat_mgr_, ObLogTransStatMgr);
DESTROY(timezone_info_getter_, ObLogTimeZoneInfoGetter);
DESTROY(tenant_mgr_, ObLogTenantMgr);
DESTROY(log_entry_task_pool_, ObLogEntryTaskPool);
DESTROY(br_pool_, ObLogBRPool);
@ -1300,54 +1335,63 @@ void ObLogInstance::destroy()
{
do_stop_("DESTROY_OBCDC");
inited_ = false;
if (inited_) {
LOG_INFO("destroy obcdc begin");
inited_ = false;
oblog_major_ = 0;
oblog_minor_ = 0;
oblog_major_patch_ = 0;
oblog_minor_patch_ = 0;
oblog_major_ = 0;
oblog_minor_ = 0;
oblog_major_patch_ = 0;
oblog_minor_patch_ = 0;
destroy_components_();
err_cb_ = NULL;
destroy_components_();
err_cb_ = NULL;
TCONF.destroy();
stop_flag_ = true;
last_heartbeat_timestamp_micro_sec_ = 0;
trans_stat_mgr_ = NULL;
tenant_mgr_ = NULL;
global_errno_ = 0;
handle_error_flag_ = 0;
disable_redirect_log_ = false;
log_clean_cycle_time_us_ = 0;
global_info_.reset();
hbase_util_.destroy();
obj2str_helper_.destroy();
br_queue_.destroy();
timer_tid_ = 0;
sql_tid_ = 0;
flow_control_tid_ = 0;
TCONF.destroy();
stop_flag_ = true;
last_heartbeat_timestamp_micro_sec_ = 0;
trans_stat_mgr_ = NULL;
tenant_mgr_ = NULL;
global_errno_ = 0;
handle_error_flag_ = 0;
disable_redirect_log_ = false;
log_clean_cycle_time_us_ = 0;
global_info_.reset();
hbase_util_.destroy();
obj2str_helper_.destroy();
br_queue_.destroy();
timer_tid_ = 0;
sql_tid_ = 0;
flow_control_tid_ = 0;
(void)trans_task_pool_.destroy();
(void)trans_task_pool_alloc_.destroy();
(void)trans_task_pool_.destroy();
(void)trans_task_pool_alloc_.destroy();
output_dml_br_count_ = 0;
output_ddl_br_count_ = 0;
output_dml_br_count_ = 0;
output_ddl_br_count_ = 0;
ObKVGlobalCache::get_instance().destroy();
ObMemoryDump::get_instance().destroy();
ObClockGenerator::destroy();
ObCDCTimeZoneInfoGetter::get_instance().destroy();
timezone_info_getter_ = nullptr;
ObKVGlobalCache::get_instance().destroy();
ObMemoryDump::get_instance().destroy();
ObClockGenerator::destroy();
is_assign_log_dir_valid_ = false;
MEMSET(assign_log_dir_, 0, sizeof(assign_log_dir_));
MEMSET(ob_trace_id_str_, 0, sizeof(ob_trace_id_str_));
br_index_in_trans_ = 0;
part_trans_task_count_ = 0;
start_tstamp_ns_ = 0;
is_schema_split_mode_ = true;
working_mode_ = WorkingMode::UNKNOWN_MODE;
refresh_mode_ = RefreshMode::UNKNOWN_REFRSH_MODE;
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
is_tenant_sync_mode_ = false;
is_assign_log_dir_valid_ = false;
MEMSET(assign_log_dir_, 0, sizeof(assign_log_dir_));
MEMSET(ob_trace_id_str_, 0, sizeof(ob_trace_id_str_));
br_index_in_trans_ = 0;
part_trans_task_count_ = 0;
start_tstamp_ns_ = 0;
sys_start_schema_version_ = OB_INVALID_VERSION;
is_schema_split_mode_ = true;
enable_filter_sys_tenant_ = false;
working_mode_ = WorkingMode::UNKNOWN_MODE;
refresh_mode_ = RefreshMode::UNKNOWN_REFRSH_MODE;
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
is_tenant_sync_mode_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
LOG_INFO("destroy obcdc end");
}
}
int ObLogInstance::launch()
@ -1392,6 +1436,8 @@ int ObLogInstance::launch()
LOG_ERROR("start_threads_ fail", KR(ret));
} else if (OB_FAIL(timezone_info_getter_->start())) {
LOG_ERROR("start_timezone_info_thread_ fail", KR(ret));
} else if (OB_FAIL(start_tenant_service_())) {
LOG_ERROR("start_tenant_service_ failed", KR(ret));
} else {
is_running_ = true;
LOG_INFO("launch all components end success");

View File

@ -52,7 +52,7 @@ namespace libobcdc
{
class IObLogMetaManager;
class IObLogSchemaGetter;
class IObLogTimeZoneInfoGetter;
class IObCDCTimeZoneInfoGetter;
class IObLogFetcher;
class IObLogSysLsTaskHandler;
class IObLogDmlParser;
@ -155,6 +155,7 @@ public:
int32_t get_log_level() const;
const char *get_log_file() const;
void set_disable_redirect_log(const bool flag) { disable_redirect_log_ = flag; }
bool is_running() { return is_running_; }
static void print_version();
int set_assign_log_dir(const char *log_dir, const int64_t log_dir_len);
@ -204,8 +205,9 @@ private:
int init_self_addr_();
int init_schema_(const int64_t start_tstamp_us, int64_t &sys_start_schema_version);
int init_components_(const uint64_t start_tstamp_ns);
int config_tenant_mgr_(const int64_t start_tstamp_ns, const int64_t sys_schema_version);
void destroy_components_();
int start_tenant_service_();
int config_tenant_mgr_(const int64_t start_tstamp_us, const int64_t sys_schema_version);
void write_pid_file_();
static void *timer_thread_func_(void *args);
static void *sql_thread_func_(void *args);
@ -322,6 +324,7 @@ private:
// External global exposure of variables via TCTX
public:
int64_t start_tstamp_ns_;
int64_t sys_start_schema_version_;
bool is_schema_split_mode_;
bool enable_filter_sys_tenant_;
std::string drc_message_factory_binlog_record_type_;
@ -329,12 +332,13 @@ public:
RefreshMode refresh_mode_;
ClientFetchingMode fetching_mode_;
bool is_tenant_sync_mode_;
uint64_t tenant_id_; // tenant_id in tenant_sync_mode, OB_INVALID_TENANT_ID in cluster_sync_mode
ObCDCGlobalInfo global_info_;
// compoments
ObLogMysqlProxy mysql_proxy_;
ObLogMysqlProxy tenant_sql_proxy_;
IObLogTimeZoneInfoGetter *timezone_info_getter_;
IObCDCTimeZoneInfoGetter *timezone_info_getter_;
ObLogHbaseUtil hbase_util_;
ObObj2strHelper obj2str_helper_;
BRQueue br_queue_;
@ -368,9 +372,6 @@ public:
IObLogFetcher *fetcher_;
IObLogTransStatMgr *trans_stat_mgr_; // Transaction Statistics Management
IObLogTenantMgr *tenant_mgr_;
// The tz information of the sys tenant is placed in instance because of the refresh schema dependency
ObTZInfoMap tz_info_map_;
ObTimeZoneInfoWrap tz_info_wrap_;
IObLogTransRedoDispatcher *trans_redo_dispatcher_;
IObLogTransMsgSorter *trans_msg_sorter_;

View File

@ -531,7 +531,7 @@ int LSFetchCtx::read_miss_tx_log(
} else {
if (OB_FAIL(part_trans_resolver_->read(buf, buf_len, pos, lsn, submit_ts, serve_info_, missing, tsi))) {
if (OB_ITEM_NOT_SETTED != ret) {
if (OB_ITEM_NOT_SETTED != ret && OB_IN_STOP_STATE != ret) {
LOG_ERROR("resolve miss_log fail", KR(ret), K(log_entry), K(log_base_header), K(lsn), K(missing));
}
}
@ -1440,7 +1440,7 @@ void LSFetchInfoForPrint::print_fetch_progress(const char *description,
"discarded=%d delay=%s tps=%.2lf progress=%s",
description, idx, array_cnt, to_cstring(tls_id_),
to_cstring(fetch_mod_),
is_discarded_, TVAL_TO_STR(cur_time * NS_CONVERSION - progress_.get_progress()),
is_discarded_, TVAL_TO_STR(cur_time - progress_.get_progress() / NS_CONVERSION),
tps_, to_cstring(progress_));
}

View File

@ -862,11 +862,13 @@ int FetchStream::read_group_entry_(palf::LogGroupEntry &group_entry,
reconsume_miss_info,
local_tsi,
stop_flag))) {
LOG_ERROR("reconsume log_entry after missing_log_info resolved failed", KR(ret),
K(log_entry), K(entry_lsn), K(missing_info), K(reconsume_miss_info));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("reconsume log_entry after missing_log_info resolved failed", KR(ret),
K(log_entry), K(entry_lsn), K(missing_info), K(reconsume_miss_info));
}
}
}
} else {
} else if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("handle log_entry failed", KR(ret), K(log_entry), K_(ls_fetch_ctx), K(entry_lsn));
}
} else {
@ -1720,7 +1722,7 @@ int FetchStream::handle_miss_record_or_state_log_(
ret = OB_SUCCESS;
LOG_INFO("found new miss_record_or_state_log while resolving current miss_record_or_state_log",
"tls_id", ls_fetch_ctx_->get_tls_id(), K(misslog_lsn), K(missing_info));
} else {
} else if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("read miss_log failed", KR(ret), K(miss_log_entry), K(misslog_lsn), K(missing_info));
}
}
@ -1915,8 +1917,10 @@ int FetchStream::read_batch_misslog_(
} else if (OB_FAIL(miss_log_entry.deserialize(buf, len, pos))) {
LOG_ERROR("deserialize miss_log_entry fail", KR(ret), K(len), K(pos));
} else if (OB_FAIL(ls_fetch_ctx_->read_miss_tx_log(miss_log_entry, misslog_lsn, tsi, tmp_miss_info))) {
LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry),
K(misslog_lsn), K(fetched_missing_log_cnt), K(idx), K(tmp_miss_info));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("read_miss_log fail", KR(ret), K(miss_log_entry),
K(misslog_lsn), K(fetched_missing_log_cnt), K(idx), K(tmp_miss_info));
}
} else {
fetched_missing_log_cnt++;
}

View File

@ -110,7 +110,9 @@ int ObLogMetaDataFetcher::start()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(log_fetcher_)) {
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_ISNULL(log_fetcher_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log_fetcher_ is nullptr", KR(ret), K(log_fetcher_));
} else if (OB_FAIL(log_fetcher_->start())) {
@ -126,10 +128,13 @@ void ObLogMetaDataFetcher::stop()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(log_fetcher_)) {
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_ISNULL(log_fetcher_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log_fetcher_ is nullptr", KR(ret), K(log_fetcher_));
} else {
LOG_INFO("ObLogMetaDataFetcher stop begin");
bool stop_flag = false;
RETRY_FUNC(stop_flag, *log_fetcher_, wait_for_all_ls_to_be_removed, FETCHER_WAIT_LS_TIMEOUT);
log_fetcher_->mark_stop_flag();
@ -142,6 +147,7 @@ void ObLogMetaDataFetcher::stop()
void ObLogMetaDataFetcher::destroy()
{
if (is_inited_) {
stop();
LOG_INFO("ObLogMetaDataFetcher destroy begin");
DESTROY(log_fetcher_, ObLogFetcher);

View File

@ -91,7 +91,7 @@ int ObLogMetaDataService::init(
void ObLogMetaDataService::destroy()
{
if (is_inited_) {
if (IS_INIT) {
fetcher_.destroy();
sql_queryer_.destroy();
baseline_loader_.destroy();

View File

@ -30,6 +30,7 @@
#include "ob_log_config.h" // TCONF
#include "ob_log_instance.h" // TCTX
#include "ob_log_schema_cache_info.h" // TableSchemaInfo
#include "ob_log_timezone_info_getter.h" // IObCDCTimeZoneInfoGetter
#define DEFAULT_ENCODING ""
@ -939,20 +940,22 @@ int ObLogMetaManager::build_column_metas_(
common::ObArray<share::schema::ObColDesc> column_ids;
const bool ignore_virtual_column = true;
const uint64_t tenant_id = table_schema->get_tenant_id();
IObLogTenantMgr *tenant_mgr_ = TCTX.tenant_mgr_;
IObCDCTimeZoneInfoGetter *tz_info_getter = TCTX.timezone_info_getter_;
ObTimeZoneInfoWrap *tz_info_wrap = nullptr;
ObCDCTenantTimeZoneInfo *obcdc_tenant_tz_info = nullptr;
if (OB_ISNULL(table_meta) || OB_ISNULL(table_schema)) {
LOG_ERROR("invalid argument", K(table_meta), K(table_schema));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(table_schema->get_column_ids(column_ids, ignore_virtual_column))) {
LOG_ERROR("get column_ids from table_schema failed", KR(ret), KPC(table_schema));
} else if (OB_ISNULL(tenant_mgr_)) {
} else if (OB_ISNULL(tz_info_getter)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_mgr_ is nullptr", KR(ret), K(tenant_mgr_));
} else if (OB_FAIL(tenant_mgr_->get_tenant_tz_wrap(tenant_id, tz_info_wrap))) {
LOG_ERROR("get_tenant_tz_wrap failed", KR(ret), K(tenant_id));
LOG_ERROR("tz_info_getter is nullptr", KR(ret), K(tz_info_getter));
} else if (OB_FAIL(tz_info_getter->get_tenant_tz_info(tenant_id, obcdc_tenant_tz_info))) {
LOG_ERROR("get_tenant_tz_info failed", KR(ret), K(tenant_id));
} else {
const ObTimeZoneInfoWrap *tz_info_wrap = &(obcdc_tenant_tz_info->get_tz_wrap());
int64_t version = table_schema->get_schema_version();
uint64_t table_id = table_schema->get_table_id();
const bool is_heap_table = table_schema->is_heap_table();

View File

@ -224,6 +224,7 @@ int PartProgressController::get_min_progress(int64_t &progress)
ret = OB_ERR_UNEXPECTED;
} else if (ATOMIC_LOAD(&valid_progress_cnt_) <= 0) {
progress = INVALID_PROGRESS;
ret = OB_EMPTY_RESULT;
} else {
progress = INVALID_PROGRESS;
const int64_t cnt = ATOMIC_LOAD(&progress_cnt_);

View File

@ -28,6 +28,7 @@
#include "ob_log_store_key.h" // ObLogStoreKey
#include "ob_log_binlog_record.h" // ObLogBR
#include "ob_log_meta_manager.h" // IObLogMetaManager
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#include "ob_log_instance.h"
#include "ob_log_tenant.h"
#include "ob_log_config.h"

View File

@ -264,6 +264,7 @@ void ObLogSequencer::run1()
ObByteLockGuard guard(trans_queue_lock_);
while (OB_SUCC(ret) && ! trans_queue_.empty() && ! lib::ThreadPool::has_set_stop()) {
ObLogTraceIdGuard trace_guard;
TrxSortElem top_trx_sort_elem = trans_queue_.top();
const int64_t trans_commit_version = top_trx_sort_elem.get_trans_commit_version();
monitor.mark_and_get_cost("begin", true);
@ -290,6 +291,10 @@ void ObLogSequencer::run1()
}
}
if (OB_SUCC(ret) && lib::ThreadPool::has_set_stop()) {
ret = OB_IN_STOP_STATE;
}
// exit on fail
if (OB_SUCCESS != ret && OB_IN_STOP_STATE != ret && NULL != err_handler_) {
err_handler_->handle_error(ret, "sequencer thread exits, err=%d", ret);
@ -744,8 +749,10 @@ int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans,
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant is NULL, unexpected error", KR(ret), K(guard));
} else {
if (OB_FAIL(recycle_resources_after_trans_ready_(*trans_ctx, *tenant))) {
LOG_ERROR("recycle_resources_after_trans_ready_ fail", KR(ret), KPC(trans_ctx), KPC(tenant));
if (OB_FAIL(recycle_resources_after_trans_ready_(*trans_ctx, *tenant, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("recycle_resources_after_trans_ready_ fail", KR(ret), KPC(trans_ctx), KPC(tenant));
}
}
}
@ -843,7 +850,7 @@ int ObLogSequencer::handle_multi_data_source_info_(
PartTransTask *part_trans_task = trans_ctx.get_participant_objs();
IObLogPartMgr &part_mgr = tenant.get_part_mgr();
while (OB_SUCC(ret) && OB_NOT_NULL(part_trans_task)) {
while (OB_SUCC(ret) && OB_NOT_NULL(part_trans_task) && ! stop_flag) {
if (! part_trans_task->is_sys_ls_part_trans()) {
// USER_LS part_trans_task in DIST_DDL_TRANS won't into dispatcher, set_ref_cnt to 1 to
// recycle the part_trans_task.
@ -853,7 +860,7 @@ int ObLogSequencer::handle_multi_data_source_info_(
const CDCTabletChangeInfoArray &tablet_change_info_arr =
part_trans_task->get_multi_data_source_info().get_tablet_change_info_arr();
for (int64_t tablet_change_info_idx = 0;
OB_SUCC(ret) && tablet_change_info_idx < tablet_change_info_arr.count();
OB_SUCC(ret) && ! stop_flag && tablet_change_info_idx < tablet_change_info_arr.count();
tablet_change_info_idx++) {
const ObCDCTabletChangeInfo &tablet_change_info = tablet_change_info_arr.at(tablet_change_info_idx);
if (OB_UNLIKELY(! tablet_change_info.is_valid())) {
@ -1084,7 +1091,7 @@ int ObLogSequencer::wait_until_formatter_done_(volatile bool &stop_flag)
// 6. The inability to sequence does not decrement the reference count, leading to interdependencies and deadlocks
//
// Therefore, unlike previous implementations, resources are not reclaimed after sequencing, but after the distributed transaction has been assembled
int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant)
int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
@ -1099,7 +1106,7 @@ int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, Ob
PartTransTask *participant = trans_ctx.get_participant_objs();
// Iterate over each statement of each partitioned transaction of a distributed transaction
while (NULL != participant) {
while (NULL != participant && ! stop_flag) {
// TODO is_ddl_trans: LS_TABLE的事务如何处理?
if (participant->is_dml_trans() || participant->is_ddl_trans()) {
const logservice::TenantLSID &tls_id = participant->get_tls_id();
@ -1111,6 +1118,10 @@ int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, Ob
participant = participant->next_task();
}
if (OB_SUCC(ret) && stop_flag) {
ret = OB_IN_STOP_STATE;
}
}
return ret;

View File

@ -175,7 +175,7 @@ private:
volatile bool &stop_flag);
// wait reader/parser/formatter module empty
int wait_until_formatter_done_(volatile bool &stop_flag);
int recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant);
int recycle_resources_after_trans_ready_(TransCtx &trans_ctx, ObLogTenant &tenant, volatile bool &stop_flag);
int push_task_into_br_sorter_(TransCtx &trans_ctx, volatile bool &stop_flag);
int push_task_into_redo_dispatcher_(TransCtx &trans_ctx, volatile bool &stop_flag);
int push_task_into_committer_(PartTransTask *task,

View File

@ -272,11 +272,17 @@ void ObLogStartLSNLocator::run(const int64_t thread_index)
while (! stop_flag_ && OB_SUCCESS == ret) {
if (OB_FAIL(do_retrieve_(thread_index, data))) {
LOG_ERROR("retrieve request fail", KR(ret), K(thread_index));
} else if (! data.has_valid_req()) {
if (REACH_TIME_INTERVAL(30 * _SEC_)) {
LOG_INFO("no request should be launch, ignore");
}
} else if (OB_FAIL(do_request_(data))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("do request fail", KR(ret));
}
} else {
}
if (OB_SUCC(ret)) {
cond_timedwait(thread_index, DATA_OP_TIMEOUT);
}
}
@ -314,7 +320,7 @@ int ObLogStartLSNLocator::do_retrieve_(const int64_t thread_index, WorkerData &w
int ret = OB_SUCCESS;
int64_t batch_count = ATOMIC_LOAD(&g_batch_count);
for (int64_t cnt = 0; OB_SUCCESS == ret && (cnt < batch_count); ++cnt) {
for (int64_t cnt = 0; OB_SUCCESS == ret && !stop_flag_ && (cnt < batch_count); ++cnt) {
StartLSNLocateReq *request = NULL;
StartLSNLocateReq::SvrItem *item = NULL;
SvrReq *svr_req = NULL;
@ -477,7 +483,7 @@ int ObLogStartLSNLocator::do_integrated_request_(WorkerData &data)
// 2. Each partition request is removed from the request list as soon as it completes, so each request is split into multiple requests, each starting with the first element
// 3. Partition request completion condition: regardless of success, as long as no breakpoint message is returned, the request is considered completed
while (! stop_flag_ && OB_SUCCESS == ret && svr_req.locate_req_list_.count() > 0) {
// 一次请求的最大个数
// maximum request count
int64_t item_cnt_limit = RpcReq::ITEM_CNT_LMT;
int64_t req_cnt = std::min(svr_req.locate_req_list_.count(), item_cnt_limit);
@ -495,6 +501,10 @@ int ObLogStartLSNLocator::do_integrated_request_(WorkerData &data)
// Build request parameters
if (OB_FAIL(build_request_params_(rpc_req, svr_req, req_cnt))) {
LOG_ERROR("build request params fail", KR(ret), K(rpc_req), K(req_cnt), K(svr_req));
} else if (svr_req.locate_req_list_.count() <= 0) {
if (REACH_TIME_INTERVAL(30 * _SEC_)) {
LOG_INFO("no svr_req to request, ignore", K(svr_req));
}
}
// Executing RPC requests
else if (OB_FAIL(do_rpc_and_dispatch_(*(rpc_), rpc_req, svr_req, succ_req_cnt))) {
@ -522,7 +532,7 @@ int ObLogStartLSNLocator::do_direct_request_(WorkerData &data)
DirectReqList &archive_req_lst = data.archive_req_list_;
const int64_t lst_cnt = archive_req_lst.count();
for (int64_t idx = lst_cnt - 1;
OB_SUCC(ret) && !stop_flag_ && idx >= 0; --idx) {
OB_SUCC(ret) && ! stop_flag_ && idx >= 0; --idx) {
LSN start_lsn;
StartLSNLocateReq *req = archive_req_lst.at(idx);
if (OB_ISNULL(req)) {
@ -568,7 +578,7 @@ int ObLogStartLSNLocator::build_request_params_(RpcReq &req,
int64_t total_cnt = svr_req.locate_req_list_.count();
req.reset();
for (int64_t index = 0; OB_SUCCESS == ret && index < req_cnt && index < total_cnt; ++index) {
for (int64_t index = 0; OB_SUCCESS == ret && ! stop_flag_ && index < req_cnt && index < total_cnt; ++index) {
StartLSNLocateReq *request = svr_req.locate_req_list_.at(index);
StartLSNLocateReq::SvrItem *svr_item = NULL;
@ -639,7 +649,7 @@ int ObLogStartLSNLocator::do_rpc_and_dispatch_(
if (OB_SUCCESS == ret) {
// Scanning of arrays in reverse order to support deletion of completed ls requests
for (int64_t idx = request_cnt - 1; OB_SUCCESS == ret && idx >= 0; idx--) {
for (int64_t idx = request_cnt - 1; OB_SUCCESS == ret && ! stop_flag_ && idx >= 0; idx--) {
int ls_err = OB_SUCCESS;
palf::LSN start_lsn;
int64_t start_log_tstamp = OB_INVALID_TIMESTAMP;

View File

@ -224,6 +224,10 @@ private:
svr_req_map_.reset();
archive_req_list_.reset();
}
bool has_valid_req() const
{
return svr_req_list_.count() > 0 || archive_req_list_.count() > 0;
}
};
// member variables

View File

@ -21,7 +21,7 @@
#include "ob_log_schema_getter.h" // IObLogSchemaGetter
#include "ob_log_tenant_mgr.h" // IObLogTenantMgr
#include "ob_log_config.h" // TCONF
#include "ob_log_trace_id.h"
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args)
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[STAT] [SYS_LS_HANDLER] " fmt, ##args)
@ -344,6 +344,7 @@ int ObLogSysLsTaskHandler::handle_task_(PartTransTask &task,
const bool is_tenant_served)
{
int ret = OB_SUCCESS;
ObLogTraceIdGuard trace_guard;
if (OB_UNLIKELY(! task.is_ddl_trans()
&& ! task.is_ls_op_trans()

View File

@ -105,22 +105,19 @@ int QueryTimeZoneInfoVersionStrategy::build_sql_statement(char *sql_buf,
int ret = OB_SUCCESS;
pos = 0;
const char *query_sql = NULL;
const bool need_query_tenant_timezone_version = true;
const bool tenant_sync_mode = TCTX.is_tenant_sync_mode();
query_sql = "SELECT VALUE FROM __ALL_VIRTUAL_SYS_STAT WHERE NAME = 'CURRENT_TIMEZONE_VERSION' AND TENANT_ID = ";
if (OB_ISNULL(sql_buf) || OB_UNLIKELY(mul_statement_buf_len <=0)) {
LOG_ERROR("invalid argument", K(sql_buf), K(mul_statement_buf_len));
ret = OB_INVALID_ARGUMENT;
} else if (TCTX.is_tenant_sync_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("not support fetch timezone_version in tenant_sync_mode", KR(ret));
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"%s", query_sql))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf));
} else {
if (need_query_tenant_timezone_version) {
if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos, "%lu;", tenant_id_))) {
LOG_ERROR("build tenant_id sql fail", KR(ret), K(pos), K(query_sql),
"buf_size", mul_statement_buf_len, K(sql_buf), K(tenant_id_));
}
}
"%s %lu", query_sql, tenant_id_))) {
LOG_ERROR("build sql fail", KR(ret), K(pos), K(query_sql), "buf_size", mul_statement_buf_len, K(sql_buf), K_(tenant_id));
}
return ret;

View File

@ -469,7 +469,7 @@ public:
virtual int query_cluster_min_observer_version(uint64_t &min_observer_version);
/// Query timezone info version, for oracle new timezone type synchronization
/// ObLogTimeZoneInfoGetter
/// ObCDCTimeZoneInfoGetter
virtual int query_timezone_info_version(const uint64_t tenant_id,
int64_t &timezone_info_version);

View File

@ -22,7 +22,7 @@
#include "ob_log_tenant_mgr.h" // ObLogTenantMgr
#include "ob_log_instance.h" // TCTX
#include "ob_log_config.h" // TCONF
#include "ob_log_timezone_info_getter.h" // ObLogTimeZoneInfoGetter
#include "ob_log_timezone_info_getter.h" // ObCDCTimeZoneInfoGetter
#include "ob_log_start_schema_matcher.h" // ObLogStartSchemaMatcher
@ -54,9 +54,6 @@ ObLogTenant::ObLogTenant() :
committer_global_heartbeat_(OB_INVALID_VERSION),
committer_cur_schema_version_(OB_INVALID_VERSION),
committer_next_trans_schema_version_(OB_INVALID_VERSION),
tz_info_map_version_(OB_INVALID_TIMESTAMP),
tz_info_map_(NULL),
tz_info_wrap_(NULL),
cf_handle_(NULL)
{
tenant_name_[0] = '\0';
@ -106,8 +103,8 @@ int ObLogTenant::init(
LOG_ERROR("part_mgr_ init fail", KR(ret), K(tenant_id), K(start_schema_version));
} else if (OB_FAIL(databuff_printf(tenant_name_, sizeof(tenant_name_), pos, "%s", tenant_name))) {
LOG_ERROR("print tenant name fail", KR(ret), K(pos), K(tenant_id), K(tenant_name));
} else if (OB_FAIL(init_tz_info_(tenant_id))) {
LOG_ERROR("init tz info failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(TCTX.timezone_info_getter_->init_tenant_tz_info(tenant_id))) {
LOG_ERROR("fail to init tenant timezone info", KR(ret), K(tenant_id));
} else if (OB_FAIL(init_all_ddl_operation_table_schema_info_())) {
LOG_ERROR("init_all_ddl_operation_table_schema_info_ failed", KR(ret), K(tenant_id));
}
@ -182,17 +179,6 @@ void ObLogTenant::reset()
tenant_state_.reset();
tz_info_map_version_ = OB_INVALID_TIMESTAMP;
if (OB_SYS_TENANT_ID != tenant_id) {
if (! OB_ISNULL(tz_info_map_)) {
OB_DELETE(ObTZInfoMap, "ObLogTenantTz", tz_info_map_);
tz_info_map_ = NULL;
}
if (! OB_ISNULL(tz_info_wrap_)) {
OB_DELETE(ObTimeZoneInfoWrap, "ObLogTenantTz", tz_info_wrap_);
tz_info_wrap_ = NULL;
}
}
sys_ls_progress_ = OB_INVALID_TIMESTAMP;
ddl_log_lsn_.reset();
all_ddl_operation_table_schema_info_.reset();
@ -877,51 +863,6 @@ int ObLogTenant::update_data_start_schema_version_on_split_mode()
return ret;
}
int ObLogTenant::init_tz_info_(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_SYS_TENANT_ID == tenant_id) {
tz_info_map_ = &TCTX.tz_info_map_;
tz_info_wrap_ = &TCTX.tz_info_wrap_;
} else {
if (OB_ISNULL(tz_info_map_ = OB_NEW(ObTZInfoMap, "ObLogTenantTz"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("create tenant timezone info map failed", KR(ret));
} else if (OB_ISNULL(tz_info_wrap_ = OB_NEW(ObTimeZoneInfoWrap, "ObLogTenantTz"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("create tenant timezone info wrap failed", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tz_info_map_->init(ObMemAttr(OB_SERVER_TENANT_ID,
ObModIds::OB_HASH_BUCKET_TIME_ZONE_INFO_MAP)))) {
LOG_ERROR("fail to init tz_info_map_", K(tenant_id), KR(ret));
} else if (TCTX.timezone_info_getter_->init_tz_info_wrap(
tenant_id,
tz_info_map_version_,
*tz_info_map_,
*tz_info_wrap_)) {
LOG_ERROR("fail to init tz info wrap", KR(ret), K(tenant_id));
} else {
// succ
}
if (OB_FAIL(ret)) {
if (NULL != tz_info_map_) {
OB_DELETE(ObTZInfoMap, "ObLogTenantTz", tz_info_map_);
tz_info_map_ = NULL;
}
if (NULL != tz_info_wrap_) {
OB_DELETE(ObTimeZoneInfoWrap, "ObLogTenantTz", tz_info_wrap_);
tz_info_wrap_ = NULL;
}
}
return ret;
}
} // namespace libobcdc
} // namespace oceanbase

View File

@ -23,15 +23,10 @@
#include "ob_log_part_mgr.h" // ObLogPartMgr
#include "ob_log_ls_mgr.h" // ObLogLSMgr
#include "ob_log_ref_state.h" // RefState
#include "lib/timezone/ob_timezone_info.h" // ObTimeZoneInfo
#include <cstdint>
namespace oceanbase
{
namespace common
{
}
namespace libobcdc
{
@ -135,14 +130,6 @@ public:
IObLogPartMgr &get_part_mgr() { return part_mgr_; }
int64_t get_global_schema_version() const { return global_seq_and_schema_version_.hi; }
int64_t get_global_seq() const { return global_seq_and_schema_version_.lo; }
// get timezone info version
int64_t get_timezone_info_version() const { return ATOMIC_LOAD(&tz_info_map_version_); }
// update timezone info version
void update_timezone_info_version(const int64_t timezone_info_version)
{ ATOMIC_STORE(&tz_info_map_version_, timezone_info_version); }
common::ObTimeZoneInfoWrap *get_tz_info_wrap() { return tz_info_wrap_; }
common::ObTZInfoMap *get_tz_info_map() { return tz_info_map_; }
void *get_cf() { return cf_handle_; }
public:
@ -270,9 +257,6 @@ private:
int start_drop_tenant_if_needed_(bool &need_drop_tenant);
bool need_drop_tenant_() const;
int drop_sys_ls_();
// 1. If the low version of OB upgrades to 226, if the low version imports a time zone table, then the post script will split the time zone related table under the tenant
// 2. If the low version does not import the time zone table, do nothing
int init_tz_info_(const uint64_t tenant_id);
public:
TO_STRING_KV(
@ -291,8 +275,7 @@ public:
//"cur_schema_version", part_mgr_.get_schema_version(),
K_(committer_cur_schema_version),
K_(committer_next_trans_schema_version),
KPC_(task_queue),
K_(tz_info_map_version));
KPC_(task_queue));
private:
bool inited_;
@ -340,11 +323,6 @@ private:
// Transaction data and DDL data need to be matched for consumption, where the global_schema_version of the current transaction is recorded, which is used by the DDL to determine if it needs to be consumed.
int64_t committer_next_trans_schema_version_ CACHE_ALIGNED;
// 2_2_6 branch start: Oracle time zone related data types: internal table dependency split to tenant
int64_t tz_info_map_version_;
common::ObTZInfoMap *tz_info_map_;
common::ObTimeZoneInfoWrap *tz_info_wrap_;
void *cf_handle_;
private:

View File

@ -24,6 +24,7 @@
#include "ob_log_common.h" // DEFAULT_START_SEQUENCE_NUM
#include "ob_log_config.h" // TCONF
#include "ob_log_store_service.h"
#include "ob_log_timezone_info_getter.h"
#include "ob_cdc_tenant_sql_server_provider.h"
#include "lib/utility/ob_macro_utils.h"
@ -1004,6 +1005,7 @@ int ObLogTenantMgr::remove_tenant_(const uint64_t tenant_id, ObLogTenant *tenant
// should not affect following process for remove tenant.
ret = OB_SUCCESS;
} else {
ObCDCTimeZoneInfoGetter::get_instance().remove_tenant_tz_info(tenant_id);
//do nothing
}
@ -1414,68 +1416,6 @@ int ObLogTenantMgr::get_all_tenant_ids(std::vector<uint64_t> &tenant_ids)
return ret;
}
// the following function needs to be compatible, pre-226 versions only had time zone tables for system tenants, so only one map needs to be maintained.
// After 226 the time zone table is split into tenant level and a tz_info_map needs to be maintained for each tenant
// Obj2Str uses this interface to get the tz_info_wrap of a particular tenant for obj to string conversion
int ObLogTenantMgr::get_tenant_tz_wrap(const uint64_t tenant_id, ObTimeZoneInfoWrap *&tz_info_wrap)
{
int ret = OB_SUCCESS;
ObLogTenantGuard guard;
const uint64_t tz_tenant_id = tenant_id;
if (OB_SYS_TENANT_ID == tz_tenant_id) {
tz_info_wrap = &TCTX.tz_info_wrap_;
} else {
ObLogTenantGuard tenant_guard;
ObLogTenant *tenant = NULL;
if (OB_FAIL(get_tenant_guard(tz_tenant_id, guard))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_ERROR("tenant not exist when get tz_wrap", KR(ret), K(tenant_id), K(tz_tenant_id));
} else {
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id), K(tz_tenant_id));
}
} else if (OB_ISNULL(tenant = guard.get_tenant())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid tenant", KR(ret), K(tenant), K(tenant_id), K(tz_tenant_id));
} else {
tz_info_wrap = tenant->get_tz_info_wrap();
}
}
return ret;
}
// get_tenant_timezone
int ObLogTenantMgr::get_tenant_tz_map(const uint64_t tenant_id,
ObTZInfoMap *&tz_info_map)
{
int ret = OB_SUCCESS;
ObLogTenantGuard guard;
//TODO:(madoll.tw) should use tenant_id as tz_tenant_id
const uint64_t tz_tenant_id = OB_SYS_TENANT_ID;
if (OB_SYS_TENANT_ID == tz_tenant_id) {
tz_info_map = &TCTX.tz_info_map_;
} else {
ObLogTenantGuard tenant_guard;
ObLogTenant *tenant = NULL;
if (OB_FAIL(get_tenant_guard(tz_tenant_id, guard))) {
// TODO ERROR auto_switch_mode_and_refresh_schema not exit
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_ERROR("tenant not exist when get tz_wrap", KR(ret), K(tenant_id), K(tz_tenant_id));
} else {
LOG_ERROR("get_tenant_guard fail", KR(ret), K(tenant_id), K(tz_tenant_id));
}
} else if (OB_ISNULL(tenant = guard.get_tenant())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid tenant", KR(ret), K(tenant), K(tenant_id), K(tz_tenant_id));
} else {
tz_info_map = tenant->get_tz_info_map();
}
}
return ret;
}
bool ObLogTenantMgr::TenantPrinter::operator()(const TenantID &tid, ObLogTenant *tenant)
{
if (NULL != tenant) {

View File

@ -31,7 +31,6 @@ namespace oceanbase
namespace libobcdc
{
class IObLogSchemaGetter;
class IObLogTimeZoneInfoGetter;
class IObLogTenantMgr
{
@ -121,25 +120,6 @@ public:
virtual int get_all_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
virtual bool is_inited() = 0;
/// Get the specified Tenant ID tz_info_wrap, called by ObObj2strHelper, where the tenant does not exist and an error is required
///
/// @param [in] tenant_id TenantID
/// @param [out] tz_info_wrap Timezone info
///
/// @retval OB_SUCCESS Success
/// @retval other error code Fail
virtual int get_tenant_tz_wrap(const uint64_t tenant_id, common::ObTimeZoneInfoWrap *&tz_info_wrap) = 0;
/// get tz_info_map, ObLogTimeZoneInfoGetter with specified Tenant ID
///
/// @param [in] tenant_id TenantID
/// @param [out] tz_info_wrap Timezone info
///
/// @retval OB_SUCCESS Success
/// @retval other error code Fail
virtual int get_tenant_tz_map(const uint64_t tenant_id,
common::ObTZInfoMap *&tz_info_map) = 0;
// @retval OB_SUCCESS Success
// @retval OB_ENTRY_NOT_EXIST Tenant not exist
// @retval other error code Fail
@ -239,11 +219,6 @@ public:
int filter_tenant(const char *tenant_name, bool &chosen);
int get_all_tenant_ids(std::vector<uint64_t> &tenant_ids);
virtual int get_tenant_tz_wrap(const uint64_t tenant_id,
common::ObTimeZoneInfoWrap *&tz_info_wrap);
virtual int get_tenant_tz_map(const uint64_t tenant_id,
common::ObTZInfoMap *&tz_info_map);
// Get the corresponding ObLogTenant based on tenant id
int get_tenant_guard(const uint64_t tenant_id, ObLogTenantGuard &guard);
int get_tenant(const uint64_t tenant_id, ObLogTenant *&tenant);

View File

@ -12,7 +12,7 @@
* TimeZone Info Getter
*/
#define USING_LOG_PREFIX OBLOG
#define USING_LOG_PREFIX OBLOG_SCHEMA
#include "ob_log_timezone_info_getter.h"
@ -27,7 +27,63 @@ namespace libobcdc
using namespace oceanbase::common;
using namespace oceanbase::common::sqlclient;
ObLogTimeZoneInfoGetter::ObLogTimeZoneInfoGetter()
int ObCDCTenantTimeZoneInfo::init(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_ERROR("oblog_tz_info already inited", KR(ret), K(tenant_id));
} else if (OB_FAIL(OB_FAIL(
tz_info_map_.init(ObMemAttr(OB_SERVER_TENANT_ID, ObModIds::OB_HASH_BUCKET_TIME_ZONE_INFO_MAP))))) {
// must use OB_SERVER_TENANT_ID cause alloc memory require user tenant should has its own ObMallocAllocator
LOG_ERROR("init tz_info_map_ failed", KR(ret), K(tenant_id));
} else {
tenant_id_ = tenant_id;
is_inited_ = true;
}
return ret;
}
void ObCDCTenantTimeZoneInfo::destroy()
{
if (IS_INIT) {
is_inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
timezone_info_version_ = OB_INVALID_VERSION;
tz_info_map_.destroy();
}
}
int ObCDCTenantTimeZoneInfo::set_time_zone(const ObString &timezone_str)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObCDCTenantTimeZoneInfo not init", KR(ret), KPC(this));
} else if (OB_FAIL(tz_info_wrap_.init_time_zone(timezone_str, timezone_info_version_, tz_info_map_))) {
LOG_ERROR("tz_info_wrap set_time_zone failed", KR(ret), K(timezone_str), KPC(this));
}
return ret;
}
int get_tenant_tz_map_function(
const uint64_t tenant_id,
common::ObTZMapWrap &tz_map_wrap)
{
return ObCDCTimeZoneInfoGetter::get_instance().get_tenant_timezone_map(tenant_id, tz_map_wrap);
}
ObCDCTimeZoneInfoGetter& ObCDCTimeZoneInfoGetter::get_instance()
{
static ObCDCTimeZoneInfoGetter instance;
return instance;
}
ObCDCTimeZoneInfoGetter::ObCDCTimeZoneInfoGetter()
: inited_(false),
tz_tid_(0),
tz_cond_(),
@ -36,24 +92,26 @@ ObLogTimeZoneInfoGetter::ObLogTimeZoneInfoGetter()
systable_helper_(NULL),
err_handler_(NULL),
lock_(ObLatchIds::OBCDC_TIMEZONE_GETTER_LOCK),
tenant_mgr_(NULL),
timezone_str_(NULL)
timezone_str_(NULL),
oblog_tz_info_map_(),
allocator_()
{
}
ObLogTimeZoneInfoGetter::~ObLogTimeZoneInfoGetter()
ObCDCTimeZoneInfoGetter::~ObCDCTimeZoneInfoGetter()
{
destroy();
}
int ObLogTimeZoneInfoGetter::init(
int ObCDCTimeZoneInfoGetter::init(
const char *timezone_str,
common::ObMySQLProxy &mysql_proxy,
IObLogSysTableHelper &systable_helper,
IObLogTenantMgr &tenant_mgr,
IObLogErrHandler &err_handler)
{
int ret = OB_SUCCESS;
lib::ObLabel label = ObModIds::OB_HASH_BUCKET_TIME_ZONE_INFO_MAP;
lib::ObMemAttr tz_info_attr(OB_SYS_TENANT_ID, label);
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
@ -61,48 +119,61 @@ int ObLogTimeZoneInfoGetter::init(
} else if (OB_ISNULL(timezone_str)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret), K(timezone_str));
} else if (OB_FAIL(allocator_.init(TENANT_TZ_INFO_VALUE_SIZE,
OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, tz_info_attr))) {
LOG_ERROR("init allocator failed", KR(ret));
} else if (OB_FAIL(oblog_tz_info_map_.create(MAP_BUCKET_NUM, label))) {
LOG_ERROR("init tz_info_map_ failed", KR(ret));
} else {
tz_tid_ = 0;
stop_flag_ = false;
timezone_str_ = timezone_str;
mysql_proxy_ = &mysql_proxy;
systable_helper_ = &systable_helper;
tenant_mgr_ = &tenant_mgr;
err_handler_ = &err_handler;
allocator_.set_nway(NWAY);
inited_ = true;
LOG_INFO("init timezone info getter succ", "is_online_tz_info_available", is_online_tz_info_available());
LOG_INFO("init timezone info getter succ",
"enable_refresh_timezone_info_by_sql", need_fetch_tz_info_online_());
}
return ret;
}
void ObLogTimeZoneInfoGetter::destroy()
void ObCDCTimeZoneInfoGetter::destroy()
{
stop();
inited_ = false;
tz_tid_ = 0;
if (inited_) {
LOG_INFO("destroy ObCDCTimeZoneInfoGetter begin");
inited_ = false;
tz_tid_ = 0;
timezone_str_ = NULL;
mysql_proxy_ = NULL;
systable_helper_ = NULL;
tenant_mgr_ = NULL;
err_handler_ = NULL;
timezone_str_ = NULL;
mysql_proxy_ = NULL;
systable_helper_ = NULL;
err_handler_ = NULL;
oblog_tz_info_map_.clear();
oblog_tz_info_map_.destroy();
LOG_INFO("destroy ObCDCTimeZoneInfoGetter success");
}
}
int ObLogTimeZoneInfoGetter::start()
int ObCDCTimeZoneInfoGetter::start()
{
int ret = OB_SUCCESS;
int pthread_ret = 0;
if (is_online_tz_info_available()) {
if (!need_fetch_tz_info_online_()) {
// do nothing
} else {
if (OB_UNLIKELY(0 != tz_tid_)) {
LOG_ERROR("timezone info thread has been started", K(tz_tid_));
ret = OB_NOT_SUPPORTED;
LOG_ERROR("timezone info thread has been started", KR(ret), K(tz_tid_));
} else if (0 != (pthread_ret = pthread_create(&tz_tid_, NULL, tz_thread_func_, this))) {
LOG_ERROR("start timezone info thread fail", K(pthread_ret), KERRNOMSG(pthread_ret));
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("start timezone info thread fail", KR(ret), K(pthread_ret), KERRNOMSG(pthread_ret));
} else {
LOG_INFO("start timezone info thread succ");
}
@ -111,38 +182,166 @@ int ObLogTimeZoneInfoGetter::start()
return ret;
}
void ObLogTimeZoneInfoGetter::stop()
void ObCDCTimeZoneInfoGetter::stop()
{
stop_flag_ = true;
if (! stop_flag_) {
LOG_INFO("stop ObCDCTimeZoneInfoGetter begin");
stop_flag_ = true;
if (is_online_tz_info_available()) {
if (0 != tz_tid_) {
tz_cond_.signal();
if (! need_fetch_tz_info_online_()) {
// do nothing
} else {
if (0 != tz_tid_) {
tz_cond_.signal();
int pthread_ret = pthread_join(tz_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join timezone info thread fail", K(tz_tid_), K(pthread_ret),
KERRNOMSG(pthread_ret));
} else {
LOG_INFO("stop timezone info thread succ");
int pthread_ret = pthread_join(tz_tid_, NULL);
if (0 != pthread_ret) {
LOG_ERROR_RET(OB_ERR_SYS, "join timezone info thread fail", K(tz_tid_), K(pthread_ret),
KERRNOMSG(pthread_ret));
} else {
LOG_INFO("stop timezone info thread succ");
}
tz_tid_ = 0;
}
tz_tid_ = 0;
}
LOG_INFO("stop ObCDCTimeZoneInfoGetter end");
}
}
void *ObLogTimeZoneInfoGetter::tz_thread_func_(void *args)
void *ObCDCTimeZoneInfoGetter::tz_thread_func_(void *args)
{
if (NULL != args) {
ObLogTimeZoneInfoGetter *tz_info_getter = static_cast<ObLogTimeZoneInfoGetter*>(args);
ObCDCTimeZoneInfoGetter *tz_info_getter = static_cast<ObCDCTimeZoneInfoGetter*>(args);
tz_info_getter->tz_routine();
}
return NULL;
}
void ObLogTimeZoneInfoGetter::tz_routine()
int ObCDCTimeZoneInfoGetter::init_tenant_tz_info(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObCDCTenantTimeZoneInfo *tenant_tz_info = nullptr;
if (OB_FAIL(create_tenant_tz_info_(tenant_id, tenant_tz_info))) {
LOG_ERROR("create_tenant_tz_info failed", KR(ret), K(tenant_id));
} else {
// get or init success
}
return ret;
}
int ObCDCTimeZoneInfoGetter::get_tenant_tz_info(
const uint64_t tenant_id,
ObCDCTenantTimeZoneInfo *&tenant_tz_info)
{
int ret = OB_SUCCESS;
const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id);
SpinRLockGuard guard(lock_);
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_ERROR("ob_log_timezone_info_getter not initted", KR(ret), K(tenant_id), K(exec_tenant_id));
} else if (OB_FAIL(oblog_tz_info_map_.get_refactored(exec_tenant_id, tenant_tz_info))) {
if (OB_HASH_NOT_EXIST == ret) {
LOG_INFO("tenant_tz_info not exist", KR(ret), K(tenant_id), K(exec_tenant_id));
} else {
LOG_ERROR("get_oblog_tz_info failed", KR(ret), K(tenant_id), K(exec_tenant_id));
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(tenant_tz_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("oblog_tz_info should not be null", KR(ret), K(tenant_id), K(exec_tenant_id));
} else if (OB_UNLIKELY(!tenant_tz_info->is_inited())) {
ret = OB_NOT_INIT;
LOG_WARN("oblog_tz_info not init", KR(ret), K(tenant_id), K(exec_tenant_id));
} else {
// success
}
return ret;
}
int ObCDCTimeZoneInfoGetter::refresh_tenant_timezone_info_until_succ(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObCDCTenantTimeZoneInfo *tenant_tz_info = nullptr;
if (OB_FAIL(get_tenant_tz_info(tenant_id, tenant_tz_info))) {
LOG_ERROR("get_tenant_tz_info failed", KR(ret), K(tenant_id));
} else {
SpinWLockGuard guard(lock_);
const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id);
if (OB_FAIL(refresh_tenant_timezone_info_until_succ_(exec_tenant_id, *tenant_tz_info))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("refresh_tenant_timezone_info_until_succ_ failed", KR(ret), K(tenant_id), K(exec_tenant_id), KPC(tenant_tz_info));
}
} else {
LOG_TRACE("refresh_tenant_timezone_info succ", K(tenant_id));
}
}
return ret;
}
int ObCDCTimeZoneInfoGetter::get_tenant_timezone_map(
const uint64_t tenant_id,
ObTZMapWrap &tz_mgr_wrap)
{
int ret = OB_SUCCESS;
ObCDCTenantTimeZoneInfo *tenant_tz_info = nullptr;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_ERROR("ObCDCTimeZoneInfoGetter not inited", KR(ret), K(tenant_id));
} else if (OB_FAIL(get_tenant_tz_info(tenant_id, tenant_tz_info))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(create_tenant_tz_info_(tenant_id, tenant_tz_info))) {
LOG_WARN("tenant_tz_info not exist, tenant may already dropped, will ignore", KR(ret), K(tenant_id));
}
} else {
LOG_ERROR("get_tenant_tz_info failed", KR(ret), K(tenant_id));
}
}
if (OB_SUCC(ret)) {
tz_mgr_wrap.set_tz_map(&tenant_tz_info->get_tz_map());
}
return ret;
}
void ObCDCTimeZoneInfoGetter::remove_tenant_tz_info(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObCDCTenantTimeZoneInfo *tenant_tz_info = nullptr;
if (OB_UNLIKELY(! inited_)) {
// ignore
} else if (OB_FAIL(oblog_tz_info_map_.get_refactored(tenant_id, tenant_tz_info))) {
// ignore
} else if (OB_ISNULL(tenant_tz_info)) {
// ignore
} else {
SpinWLockGuard guard(lock_);
if (OB_FAIL(oblog_tz_info_map_.erase_refactored(tenant_id))) {
LOG_WARN("erase_refactored tenant_tz_info from oblog_tz_info_map_ failed", KR(ret), K(tenant_id));
}
allocator_.free(tenant_tz_info);
tenant_tz_info = nullptr;
}
LOG_INFO("remove_tenant_tz_info", KR(ret), K(tenant_id));
}
void ObCDCTimeZoneInfoGetter::tz_routine()
{
int ret = OB_SUCCESS;
@ -150,13 +349,14 @@ void ObLogTimeZoneInfoGetter::tz_routine()
LOG_ERROR("instance has not been initialized");
ret = OB_NOT_INIT;
} else {
while (! stop_flag_ && OB_SUCCESS == ret && tenant_mgr_->is_inited()) {
if (OB_FAIL(query_timezone_info_version_and_update_())) {
LOG_ERROR("query_timezone_info_version_and_update_ fail", KR(ret));
}
if (OB_NEED_RETRY == ret) {
ret = OB_SUCCESS;
while (! stop_flag_ && OB_SUCC(ret)) {
if (OB_FAIL(refresh_all_tenant_timezone_info_())) {
if (OB_IN_STOP_STATE != ret) {
LOG_WARN("timezone_info_getter_ refresh_timezone_info_ fail", KR(ret));
ret = OB_SUCCESS;
}
} else {
LOG_INFO("timezone_info_getter_ refresh_timezone_info_ succ");
}
tz_cond_.timedwait(QUERY_TIMEZONE_INFO_VERSION_INTERVAL);
@ -177,177 +377,158 @@ void ObLogTimeZoneInfoGetter::tz_routine()
LOG_INFO("timezone info thread exits", KR(ret), K_(stop_flag));
}
int ObLogTimeZoneInfoGetter::query_timezone_info_version_and_update_()
bool ObCDCTimeZoneInfoGetter::need_fetch_tz_info_online_() const
{
return ! is_data_dict_refresh_mode(TCTX.refresh_mode_);
}
int ObCDCTimeZoneInfoGetter::create_tenant_tz_info_(
const uint64_t tenant_id,
ObCDCTenantTimeZoneInfo *&tenant_tz_info)
{
int ret = OB_SUCCESS;
const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id);
SpinWLockGuard guard(lock_);
// Version change, active refresh
if (OB_FAIL(refresh_timezone_info_())) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("timezone_info_getter_ refresh_timezone_info_ fail", KR(ret));
ret = OB_SUCCESS;
} else {
LOG_ERROR("timezone_info_getter_ refresh_timezone_info_ fail", KR(ret));
// double check in case of create_tenant_tz_info_ invoked multi-times
if (OB_FAIL(oblog_tz_info_map_.get_refactored(exec_tenant_id, tenant_tz_info))) {
if (OB_HASH_NOT_EXIST == ret) {
// 1. query the initial timezone_info_version
// 2. refresh timezone_info until successful
// 3. initialize tz_info_wrap_
const int64_t start_ts = get_timestamp();
if (OB_ISNULL(tenant_tz_info = static_cast<ObCDCTenantTimeZoneInfo*>(allocator_.alloc()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("oblog_tz_info is not valid", KR(ret), K(tenant_id), KP(tenant_tz_info));
} else if (OB_ISNULL(timezone_str_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("timezone_str is null", KR(ret), K(tenant_id), K(timezone_str_));
} else {
new(tenant_tz_info) ObCDCTenantTimeZoneInfo();
if (OB_FAIL(tenant_tz_info->init(exec_tenant_id))) {
LOG_ERROR("tenant_tz_info init failed", KR(ret), K(tenant_id), K(exec_tenant_id), KPC(tenant_tz_info));
} else if (need_fetch_tz_info_online_()) {
if (OB_FAIL(query_timezone_info_version_(exec_tenant_id, tenant_tz_info->timezone_info_version_))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// Not present, normal, tenant has not imported time zone table
LOG_TRACE("query_timezone_info_version_, timezone_info_version not exist", KR(ret), K(tenant_id), K(exec_tenant_id));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query_timezone_info_version_ fail", KR(ret), K(tenant_tz_info));
}
} else if (OB_FAIL(refresh_tenant_timezone_info_until_succ_(exec_tenant_id, *tenant_tz_info))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("refresh_tenant_timezone_info_util_succ fail", KR(ret), K(tenant_id));
}
}
} else {
if (OB_FAIL(refresh_tenant_timezone_info_from_local_file_(tenant_id, tenant_tz_info->tz_info_map_))) {
if (OB_IO_ERROR == ret) {
LOG_INFO("refresh_tenant_timezone_info_from_local_file_ tz_info may not exist "
"or tenant is not oracle mode, ignore.", KR(ret), K(tenant_id));
ret = OB_SUCCESS;
} else {
LOG_ERROR("refresh_tenant_timezone_info_from_local_file_ failed", KR(ret), K(tenant_id));
}
} else {
LOG_INFO("refresh_tenant_timezone_info_from_local_file_ success", K(tenant_id));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tenant_tz_info->set_time_zone(ObString(timezone_str_)))) {
LOG_ERROR("set tenant_tz_info failed", KR(ret), K(tenant_id), KPC(tenant_tz_info));
// regist into tz_info_map even if OB_ENTRY_NOT_EXIST(tenant doesn't have timezone info)
} else if (OB_FAIL(oblog_tz_info_map_.set_refactored(exec_tenant_id, tenant_tz_info))) {
LOG_ERROR("insert obcdc_tenant_tz_info into tz_info_map failed", KR(ret), K(tenant_id));
} else {
const int64_t cost_time_usec = get_timestamp() - start_ts;
LOG_INFO("create tenant timezone info for obcdc success", KR(ret), K(tenant_id), K(exec_tenant_id), K(cost_time_usec));
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(tenant_tz_info)) {
allocator_.free(tenant_tz_info);
tenant_tz_info = nullptr;
}
}
} else {
LOG_INFO("timezone_info_getter_ refresh_timezone_info_ succ");
}
if (OB_SUCC(ret) && OB_ISNULL(tenant_tz_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_tz_info should no be null", KR(ret), K(tenant_id));
}
return ret;
}
int ObLogTimeZoneInfoGetter::refresh_timezone_info_()
{
int ret = OB_SUCCESS;
// Requires locking to prevent multi-threaded access: formatter and ObLogTimeZoneInfoGetter query threads themselves
ObSpinLockGuard guard(lock_);
const bool fetch_timezone_info_by_tennat = need_fetch_timezone_info_by_tennat_();
if (! fetch_timezone_info_by_tennat) {
// Global use of a time zone table
if (OB_FAIL(refresh_tenant_timezone_info_(OB_SYS_TENANT_ID))) {
LOG_WARN("refresh_sys_tenant_timezone_info fail", KR(ret));
}
} else {
// refresh by tenant
if (OB_FAIL(refresh_all_tenant_timezone_info_())) {
LOG_WARN("fail to refresh all tenant timezone info", KR(ret));
}
}
return ret;
}
int ObLogTimeZoneInfoGetter::refresh_tenant_timezone_info_(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (! is_online_tz_info_available()) {
ret = OB_NOT_SUPPORTED;
LOG_ERROR("refresh tenant_timezone_info only avaliable when obcdc is using online schema", KR(ret));
} else {
ret = refresh_tenant_timezone_info_based_on_version_(tenant_id);
}
return ret;
}
int ObLogTimeZoneInfoGetter::refresh_tenant_timezone_info_based_on_version_(const uint64_t tenant_id)
int ObCDCTimeZoneInfoGetter::refresh_tenant_timezone_info_based_on_version_(
const uint64_t tenant_id,
ObCDCTenantTimeZoneInfo &oblog_tz_info)
{
int ret = OB_SUCCESS;
int64_t tz_info_version = OB_INVALID_TIMESTAMP;
ObLogTenantGuard guard;
ObLogTenant *tenant = NULL;
if (OB_UNLIKELY(! inited_)) {
if (! inited_) {
ret = OB_NOT_INIT;
LOG_ERROR("timezone_info_getter_ not inited", KR(ret), K_(inited));
} else if (OB_ISNULL(tenant_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_mgr_ is NULL", KR(ret));
LOG_ERROR("oblog_timezone_info_getter not inited");
} else if (OB_FAIL(query_timezone_info_version_(tenant_id, tz_info_version))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// Not present, normal, tenant has not imported time zone table
LOG_INFO("query_timezone_info_version_, timezone_info_version not exist", K(tenant_id));
LOG_TRACE("query_timezone_info_version_, timezone_info_version not exist", K(tenant_id));
ret = OB_SUCCESS;
} else {
LOG_WARN("query_timezone_info_version_ fail", KR(ret), K(tz_info_version));
}
} else if (OB_FAIL(tenant_mgr_->get_tenant_guard(tenant_id, guard))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// No need to deal with tenant non-existence, deletion
LOG_INFO("tenant not exist, do nothing", K(tenant_id));
ret = OB_SUCCESS;
} else {
LOG_ERROR("get tenant fail", KR(ret), K(tenant_id));
}
} else if (OB_ISNULL(tenant = guard.get_tenant())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid tenant", KR(ret), K(tenant_id), K(tenant));
} else if (tz_info_version <= tenant->get_timezone_info_version()) {
} else if (! oblog_tz_info.need_update_tz_info(tz_info_version)) {
// do nothing
LOG_INFO("timezone_info_version is unchanged, don't need to update timezone info", K(tenant_id),
"current_tz_info_version", tenant->get_timezone_info_version(), K(tz_info_version));
LOG_INFO("timezone_info_version is unchanged, don't need to update timezone info", K(oblog_tz_info));
} else {
// Version change, active refresh
if (OB_FAIL(fetch_tenant_timezone_info_(tenant_id, tenant->get_tz_info_map()))) {
LOG_ERROR("fetch_tenant_timezone_info_ fail", KR(ret), K(tenant_id));
if (OB_FAIL(refresh_tenant_timezone_info_map_(tenant_id, oblog_tz_info))) {
LOG_ERROR("refresh_tenant_timezone_info_map_ fail", KR(ret), K(oblog_tz_info));
} else {
// update version
tenant->update_timezone_info_version(tz_info_version);
oblog_tz_info.update_timezone_info_version(tz_info_version);
LOG_TRACE("update tenant timezone info for obcdc success", KR(ret), K(tenant_id), K(tz_info_version));
}
}
return ret;
}
int ObLogTimeZoneInfoGetter::refresh_tenant_timezone_info_from_local_file_(
int ObCDCTimeZoneInfoGetter::refresh_tenant_timezone_info_map_(
const uint64_t tenant_id,
common::ObTZInfoMap &tz_info_map)
ObCDCTenantTimeZoneInfo &tenant_tz_info)
{
int ret = OB_SUCCESS;
ObDictTenantInfoGuard dict_tenant_info_guard;
ObDictTenantInfo *tenant_info = nullptr;
ObTZInfoMap &tz_info_map = tenant_tz_info.tz_info_map_;
if (is_online_tz_info_available()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("only effect in data_dict mode)", KR(ret));
} else if (OB_FAIL(GLOGMETADATASERVICE.get_tenant_info_guard(
tenant_id,
dict_tenant_info_guard))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_ERROR("get_tenant_info_guard failed", KR(ret), K(tenant_id));
} else {
LOG_INFO("get_tenant_info_guard failed cause tenant_meta not exist, ignore.", KR(ret), K(tenant_id));
}
} else if (OB_ISNULL(tenant_info = dict_tenant_info_guard.get_tenant_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_info is nullptr", KR(ret), K(tenant_id));
} else if (common::ObCompatibilityMode::MYSQL_MODE == tenant_info->get_compatibility_mode()) {
// ignore if mysql mode
} else if (OB_FAIL(import_timezone_info_(tz_info_map))) {
LOG_ERROR("import_timezone_info failed", KR(ret), K(tenant_id));
}
return ret;
}
int ObLogTimeZoneInfoGetter::fetch_tenant_timezone_info_(
const uint64_t tenant_id,
common::ObTZInfoMap *tz_info_map)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(tz_info_map)) {
LOG_WARN("get tenant timezone info map fail", KR(ret), K(tenant_id), K(tz_info_map));
ret = OB_ERR_UNEXPECTED;
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = nullptr;
if (OB_ISNULL(mysql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("mysql_proxy_ is null", KR(ret), K(mysql_proxy_));
} else if (! need_fetch_timezone_info_by_tennat_()) {
if (OB_FAIL(mysql_proxy_->read(res, ObTimeZoneInfoManager::FETCH_TZ_INFO_SQL))) {
LOG_WARN("fail to execute sql", KR(ret));
ret = OB_NEED_RETRY;
}
} else {
if (OB_FAIL(mysql_proxy_->read(res, tenant_id, ObTimeZoneInfoManager::FETCH_TENANT_TZ_INFO_SQL))) {
LOG_WARN("fail to execute sql", KR(ret));
ret = OB_NEED_RETRY;
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(result = res.get_result())) {
LOG_WARN("fail to get result", K(result));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
sqlclient::ObMySQLResult *result = nullptr;
if (OB_ISNULL(mysql_proxy_)) {
LOG_ERROR("mysql_proxy_ is null", K(mysql_proxy_));
ret = OB_ERR_UNEXPECTED;
} else if (! need_fetch_timezone_info_by_tennat_()) {
if (OB_FAIL(mysql_proxy_->read(res, ObTimeZoneInfoManager::FETCH_TZ_INFO_SQL))) {
LOG_WARN("fail to execute sql", KR(ret));
ret = OB_NEED_RETRY;
} else if (OB_FAIL(ObTimeZoneInfoManager::fill_tz_info_map(*result, *tz_info_map))) {
LOG_ERROR("fill_tz_info_map fail", KR(ret), K(tenant_id));
} else if (OB_FAIL(export_timezone_info_(*tz_info_map))) {
LOG_ERROR("export_timezone_info failed", KR(ret), K(tenant_id));
}
} else if (OB_FAIL(mysql_proxy_->read(res, tenant_id, ObTimeZoneInfoManager::FETCH_TENANT_TZ_INFO_SQL))) {
LOG_WARN("fail to execute sql", KR(ret));
ret = OB_NEED_RETRY;
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(result = res.get_result())) {
LOG_WARN("fail to get result", K(result));
ret = OB_NEED_RETRY;
} else if (OB_FAIL(ObTimeZoneInfoManager::fill_tz_info_map(*result, tz_info_map))) {
LOG_ERROR("fill_tz_info_map fail", KR(ret), K(tenant_id));
} else if (OB_FAIL(export_timezone_info_(tz_info_map))) {
LOG_ERROR("export_timezone_info failed", KR(ret), K(tenant_id));
} else {
// success
}
}
@ -355,93 +536,30 @@ int ObLogTimeZoneInfoGetter::fetch_tenant_timezone_info_(
}
// 226 does a tenant split of the time zone table and needs to maintain a tz_info_map for each tenant
int ObLogTimeZoneInfoGetter::refresh_all_tenant_timezone_info_()
int ObCDCTimeZoneInfoGetter::refresh_all_tenant_timezone_info_()
{
int ret = OB_SUCCESS;
std::vector<uint64_t> all_tenant_ids;
if (OB_FAIL(tenant_mgr_->get_all_tenant_ids(all_tenant_ids))) {
LOG_WARN("fail to get all tenant ids", KR(ret));
} else if (OB_ISNULL(mysql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("mysql_proxy_ is null", KR(ret), K(mysql_proxy_));
if (OB_ISNULL(mysql_proxy_)) {
LOG_ERROR("mysql_proxy_ is null", K(mysql_proxy_));
} else {
for (int64_t idx = 0; OB_SUCC(ret) && idx < all_tenant_ids.size(); idx++) {
const uint64_t tenant_id = all_tenant_ids[idx];
// Requires locking to prevent multi-threaded access: formatter and ObCDCTimeZoneInfoGetter query threads themselves
SpinWLockGuard guard(lock_);
if (OB_FAIL(refresh_tenant_timezone_info_(tenant_id))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("refresh_tenant_timezone_info_ fail", KR(ret), K(tenant_id));
} else {
// tenant not exist, reset ret
ret = OB_SUCCESS;
}
for (ObLogTZInfoMap::iterator iter = oblog_tz_info_map_.begin(); iter != oblog_tz_info_map_.end(); iter++) {
const uint64_t tenant_id = iter->first;
ObCDCTenantTimeZoneInfo *tenant_tz_info = iter->second;
if (OB_FAIL(refresh_tenant_timezone_info_based_on_version_(tenant_id, *tenant_tz_info))) {
LOG_WARN("refresh_tenant_timezone_info_based_on_version_ failed", KR(ret), K(tenant_id));
}
} // for
}
return ret;
}
int ObLogTimeZoneInfoGetter::init_tz_info_wrap(
const uint64_t tenant_id,
int64_t &tz_info_version,
ObTZInfoMap &tz_info_map,
ObTimeZoneInfoWrap &tz_info_wrap)
{
int ret = OB_SUCCESS;
// 1. query the initial timezone_info_version
// 2. refresh timezone_info until successful
// 3. initialize tz_info_wrap_
tz_info_version = OB_INVALID_TIMESTAMP;
if (OB_ISNULL(timezone_str_)) {
LOG_ERROR("timezone_str is null", K(timezone_str_));
ret = OB_ERR_UNEXPECTED;
} else if (is_online_tz_info_available()) {
if (OB_FAIL(query_timezone_info_version_(tenant_id, tz_info_version))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// Not present, normal, tenant has not imported time zone table
LOG_INFO("query_timezone_info_version_, timezone_info_version not exist", K(tenant_id));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query_timezone_info_version_ fail", KR(ret), K(tenant_id), K(tz_info_version));
}
} else if (OB_FAIL(fetch_tenant_timezone_info_util_succ(tenant_id, &tz_info_map))) {
LOG_ERROR("fetch_tenant_timezone_info_util_succ fail", KR(ret), K(tenant_id));
} else {
// succ
}
} else {
if (OB_FAIL(refresh_tenant_timezone_info_from_local_file_(tenant_id, tz_info_map))) {
if (OB_IO_ERROR == ret) {
LOG_INFO("refresh_tenant_timezone_info_from_local_file_ tz_info may not exist "
"or tenant is not oracle mode, ignore.", KR(ret), K(tenant_id));
ret = OB_SUCCESS;
} else {
LOG_ERROR("refresh_tenant_timezone_info_from_local_file_ failed", KR(ret), K(tenant_id));
}
} else {
LOG_INFO("refresh_tenant_timezone_info_from_local_file_ success", K(tenant_id));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(tz_info_wrap.init_time_zone(ObString(timezone_str_), tz_info_version, tz_info_map))) {
LOG_ERROR("tz_info_wrap init_time_zone fail", KR(ret), K(tenant_id), "timezone", timezone_str_,
K(tz_info_version), K(tz_info_wrap));
} else {
LOG_INFO("tz_info_wrap init_time_zone succ", K(tenant_id), "timezone", timezone_str_,
K(tz_info_version), K(tz_info_wrap));
}
}
return ret;
}
int ObLogTimeZoneInfoGetter::query_timezone_info_version_(
int ObCDCTimeZoneInfoGetter::query_timezone_info_version_(
const uint64_t tenant_id,
int64_t &timezone_info_version)
{
@ -452,11 +570,14 @@ int ObLogTimeZoneInfoGetter::query_timezone_info_version_(
LOG_ERROR("systable_helper_ is null", K(systable_helper_));
ret = OB_ERR_UNEXPECTED;
} else {
while (! done && OB_SUCC(ret)) {
while (! done && OB_SUCC(ret) && ! stop_flag_) {
if (OB_FAIL(systable_helper_->query_timezone_info_version(tenant_id, timezone_info_version))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("systable_helper_ query_timezone_info_version fail", KR(ret), K(tenant_id),
K(timezone_info_version));
} else {
LOG_TRACE("query_timezone_info_version failed, tenant may not has timezone info(mysql mode), \
or not exist(may be already dropped)", KR(ret), K(tenant_id));
}
} else if (OB_UNLIKELY(OB_INVALID_TIMESTAMP == timezone_info_version)) {
LOG_ERROR("timezone_info_version is not valid", K(tenant_id), K(timezone_info_version));
@ -475,56 +596,46 @@ int ObLogTimeZoneInfoGetter::query_timezone_info_version_(
return ret;
}
int ObLogTimeZoneInfoGetter::fetch_tenant_timezone_info_util_succ(
int ObCDCTimeZoneInfoGetter::refresh_tenant_timezone_info_until_succ_(
const uint64_t tenant_id,
ObTZInfoMap *tz_info_map)
ObCDCTenantTimeZoneInfo &tenant_tz_info)
{
int ret = OB_SUCCESS;
bool done = false;
if (! is_online_tz_info_available()) {
int64_t retry_cnt = 0;
if (! need_fetch_tz_info_online_()) {
ret = OB_NOT_SUPPORTED;
LOG_ERROR("not support update timezone_info cause online schema is not support in current mode", KR(ret),
"refresh_mode", TCTX.refresh_mode_,
"fetch_log_mode", TCTX.fetching_mode_);
}
while (! done && OB_SUCC(ret)) {
if (OB_FAIL(fetch_tenant_timezone_info_(tenant_id, tz_info_map))) {
LOG_WARN("fetch_tenant_timezone_info_ fail", KR(ret), K(tenant_id));
while (! done && OB_SUCC(ret) && ! stop_flag_) {
if (OB_FAIL(refresh_tenant_timezone_info_map_(tenant_id, tenant_tz_info))) {
LOG_WARN("refresh_tenant_timezone_info_map_ fail", KR(ret), K(tenant_id));
} else {
done = true;
}
if (OB_NEED_RETRY == ret) {
retry_cnt++;
if (retry_cnt % 1000) {
// LOG retry info every 10 sec.
LOG_WARN("retry to refresh tenant_timezone_info", KR(ret), K(tenant_id), K(retry_cnt));
}
ret = OB_SUCCESS;
ob_usleep(100L * 1000L);
ob_usleep(10L * 1000L); // retry interval 10 ms
}
}
return ret;
}
int ObLogTimeZoneInfoGetter::get_tenant_timezone_map(
const uint64_t tenant_id,
ObTZMapWrap &tz_mgr_wrap)
{
int ret = OB_SUCCESS;
IObLogTenantMgr *log_tenant_mgr = nullptr;
ObTZInfoMap *tz_info_map = nullptr;
if (OB_ISNULL(log_tenant_mgr = TCTX.tenant_mgr_)) {
ret = OB_NOT_INIT;
LOG_WARN("log tenant mgr not init", K(ret));
} else if (OB_FAIL(log_tenant_mgr->get_tenant_tz_map(tenant_id, tz_info_map))) {
LOG_WARN("log tenant mgr get tenant tz map failed", KR(ret), K(tenant_id));
} else {
tz_mgr_wrap.set_tz_map(tz_info_map);
if (stop_flag_) {
ret = OB_IN_STOP_STATE;
}
return ret;
}
int ObLogTimeZoneInfoGetter::export_timezone_info_(common::ObTZInfoMap &tz_info_map)
int ObCDCTimeZoneInfoGetter::export_timezone_info_(common::ObTZInfoMap &tz_info_map)
{
int ret = OB_SUCCESS;
char *buf = nullptr;
@ -572,7 +683,38 @@ int ObLogTimeZoneInfoGetter::export_timezone_info_(common::ObTZInfoMap &tz_info_
return ret;
}
int ObLogTimeZoneInfoGetter::import_timezone_info_(common::ObTZInfoMap &tz_info_map)
int ObCDCTimeZoneInfoGetter::refresh_tenant_timezone_info_from_local_file_(
const uint64_t tenant_id,
common::ObTZInfoMap &tz_info_map)
{
int ret = OB_SUCCESS;
ObDictTenantInfoGuard dict_tenant_info_guard;
ObDictTenantInfo *tenant_info = nullptr;
if (need_fetch_tz_info_online_()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("only effect while online schema not available)", KR(ret));
} else if (OB_FAIL(GLOGMETADATASERVICE.get_tenant_info_guard(
tenant_id,
dict_tenant_info_guard))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_ERROR("get_tenant_info_guard failed", KR(ret), K(tenant_id));
} else {
LOG_INFO("get_tenant_info_guard failed cause tenant_meta not exist, ignore.", KR(ret), K(tenant_id));
}
} else if (OB_ISNULL(tenant_info = dict_tenant_info_guard.get_tenant_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_info is nullptr", KR(ret), K(tenant_id));
} else if (common::ObCompatibilityMode::MYSQL_MODE == tenant_info->get_compatibility_mode()) {
// ignore if mysql mode
} else if (OB_FAIL(import_timezone_info_(tz_info_map))) {
LOG_ERROR("import_timezone_info failed", KR(ret), K(tenant_id));
}
return ret;
}
int ObCDCTimeZoneInfoGetter::import_timezone_info_(common::ObTZInfoMap &tz_info_map)
{
int ret = OB_SUCCESS;
char *buf = nullptr;

View File

@ -17,9 +17,8 @@
#include "lib/mysqlclient/ob_mysql_proxy.h" // ObMySQLProxy
#include "lib/timezone/ob_timezone_info.h" // ObTZInfoMap
#include "lib/lock/ob_spin_lock.h" // ObSpinLock
#include "lib/lock/ob_spin_rwlock.h" // ObSpinRWLock
#include "common/ob_queue_thread.h" // ObCond
#include "ob_log_tenant_mgr.h" // ObLogTenantMgr
#include "ob_log_instance.h" //TCTX
namespace oceanbase
@ -30,52 +29,93 @@ class ObTenantTimezoneGuard;
}
namespace libobcdc
{
///////////////////////////////////// IObLogTimeZoneInfoGetter /////////////////////////////////
class IObLogTimeZoneInfoGetter
class ObCDCTenantTimeZoneInfo;
///////////////////////////////////// IObCDCTimeZoneInfoGetter /////////////////////////////////
class IObCDCTimeZoneInfoGetter
{
public:
virtual ~IObLogTimeZoneInfoGetter() {}
virtual ~IObCDCTimeZoneInfoGetter() {}
public:
virtual int start() = 0;
virtual void stop() = 0;
virtual void mark_stop_flag() = 0;
// Init by ObLogTenant initialisation
virtual int init_tz_info_wrap(
const uint64_t tenant_id,
int64_t &tz_info_map_version,
common::ObTZInfoMap &tz_info_map,
common::ObTimeZoneInfoWrap &tz_info_wrap) = 0;
// Init by ObLogTenant
virtual int init_tenant_tz_info(const uint64_t tenant_id) = 0;
/// Refresh timezone info until successful (try refreshing several times)
virtual int fetch_tenant_timezone_info_util_succ(
virtual int get_tenant_tz_info(
const uint64_t tenant_id,
common::ObTZInfoMap *tz_info_map) = 0;
ObCDCTenantTimeZoneInfo *&tenant_tz_info) = 0;
virtual void remove_tenant_tz_info(const uint64_t tenant_id) = 0;
/// Refresh timezone info until successful (try refreshing several times)
virtual int refresh_tenant_timezone_info_until_succ(const uint64_t tenant_id) = 0;
};
///////////////////////////////////// ObLogTimeZoneInfoGetter /////////////////////////////////
///////////////////////////////////// ObCDCTimeZoneInfoGetter /////////////////////////////////
// OP(init/refresh/get) of ObCDCTenantTimeZoneInfo should be thread safe by invoker.
struct ObCDCTenantTimeZoneInfo
{
ObCDCTenantTimeZoneInfo() :
is_inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
timezone_info_version_(OB_INVALID_VERSION),
tz_info_wrap_(),
tz_info_map_() {}
~ObCDCTenantTimeZoneInfo() { destroy(); }
int init(const uint64_t tenant_id);
void destroy();
int set_time_zone(const ObString &timezone_str);
OB_INLINE bool is_inited() const { return is_inited_; }
OB_INLINE bool is_valid() const
{ return is_inited_ && timezone_info_version_ > OB_INVALID_VERSION; }
OB_INLINE bool need_update_tz_info(int64_t target_tz_info_version) const
{ return target_tz_info_version > timezone_info_version_; }
void update_timezone_info_version(const int64_t timezone_info_version)
{ ATOMIC_STORE(&timezone_info_version_, timezone_info_version); }
OB_INLINE const common::ObTimeZoneInfoWrap &get_tz_wrap() const { return tz_info_wrap_; }
OB_INLINE const common::ObTZInfoMap &get_tz_map() const { return tz_info_map_; }
OB_INLINE const common::ObTimeZoneInfo *get_timezone_info() const
{ return tz_info_wrap_.get_time_zone_info(); }
TO_STRING_KV(K_(tenant_id), K_(is_inited), K_(timezone_info_version), K_(tz_info_wrap));
bool is_inited_;
uint64_t tenant_id_;
// 2_2_6 branch start: Oracle time zone related data types: internal table dependency split to tenant
// 1. If the low version of OB upgrades to 226, if the low version imports a time zone table, then the post script will split the time zone related table under the tenant
// 2. If the low version does not import the time zone table, do nothing
int64_t timezone_info_version_;
common::ObTimeZoneInfoWrap tz_info_wrap_;
common::ObTZInfoMap tz_info_map_;
};
// for init interface OTTZ_MGR.tenant_tz_map_getter_
// return an refreshed tz_map_wrap
int get_tenant_tz_map_function(
const uint64_t tenant_id,
common::ObTZMapWrap &tz_map_wrap);
class IObLogErrHandler;
class IObLogSysTableHelper;
class IObLogTenantMgr;
class ObLogTimeZoneInfoGetter : public IObLogTimeZoneInfoGetter
typedef common::hash::ObHashMap<uint64_t, ObCDCTenantTimeZoneInfo*> ObLogTZInfoMap;
class ObCDCTimeZoneInfoGetter : public IObCDCTimeZoneInfoGetter
{
static const int64_t SLEEP_TIME_ON_SCHEMA_FAIL = 500 * 1000;
static const int64_t QUERY_TIMEZONE_INFO_VERSION_INTERVAL = 100 * 1000 * 1000;
public:
ObLogTimeZoneInfoGetter();
virtual ~ObLogTimeZoneInfoGetter();
virtual ~ObCDCTimeZoneInfoGetter();
static ObCDCTimeZoneInfoGetter &get_instance();
public:
int init(
const char *timezone_str,
common::ObMySQLProxy &mysql_proxy,
IObLogSysTableHelper &systable_helper,
IObLogTenantMgr &tenant_mgr,
IObLogErrHandler &err_handler);
void destroy();
@ -85,41 +125,47 @@ public:
virtual void mark_stop_flag() { ATOMIC_STORE(&stop_flag_, true); }
// Init by ObLogTenant initialisation
virtual int init_tz_info_wrap(
const uint64_t tenant_id,
int64_t &tz_info_map_version,
common::ObTZInfoMap &tz_info_map,
common::ObTimeZoneInfoWrap &tz_info_wrap);
virtual int init_tenant_tz_info(const uint64_t tenant_id) override;
virtual int fetch_tenant_timezone_info_util_succ(
virtual int get_tenant_tz_info(
const uint64_t tenant_id,
common::ObTZInfoMap *tz_info_map);
ObCDCTenantTimeZoneInfo *&tenant_tz_info) override;
// for init interface OTTZ_MGR.tenant_tz_map_getter_
static int get_tenant_timezone_map(
const uint64_t tenant_id,
virtual void remove_tenant_tz_info(const uint64_t tenant_id);
virtual int refresh_tenant_timezone_info_until_succ(const uint64_t tenant_id) override;
// for OTTZ_MGR, may accquire tenant not in obcdc whitelist
int get_tenant_timezone_map(const uint64_t tenant_id,
common::ObTZMapWrap &tz_map_wrap);
void revert_tenant_tz_info_(ObCDCTenantTimeZoneInfo *tenant_tz_info);
private:
ObCDCTimeZoneInfoGetter();
static void *tz_thread_func_(void *args);
void tz_routine();
// 1. local maintenance of timezone info version
// 2. Periodically query all_zone table - time_zone_info_version:
// update timezone info when changes occur
// otherwise not updated (updating timezone info involves multiple table joins)
int query_timezone_info_version_and_update_();
OB_INLINE bool need_fetch_timezone_info_by_tennat_() const
{ return GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2260; }
OB_INLINE uint64_t get_exec_tenant_id(const uint64_t tenant_id) const
{ return need_fetch_timezone_info_by_tennat_() ? tenant_id : OB_SYS_TENANT_ID; }
bool need_fetch_tz_info_online_() const;
OB_INLINE bool need_fetch_timezone_info_by_tennat_() const { return true; }
int create_tenant_tz_info_(
const uint64_t tenant_id,
ObCDCTenantTimeZoneInfo *&tenant_tz_info);
int refresh_all_tenant_timezone_info_();
int refresh_tenant_timezone_info_until_succ_(const uint64_t tenant_id, ObCDCTenantTimeZoneInfo &tenant_tz_info);
int refresh_tenant_timezone_info_(const uint64_t tenant_id);
// 1. Check the version first, if the version has not changed, then do not refresh
// 2. Refresh only if the version has changed
//
// @retval OB_SUCCESS Success
// @retval OB_ENTRY_NOT_EXIST tenant not exist
// @retval other_error_code Fail
int refresh_tenant_timezone_info_based_on_version_(const uint64_t tenant_id);
int refresh_tenant_timezone_info_based_on_version_(const uint64_t tenant_id, ObCDCTenantTimeZoneInfo &oblog_tz_info);
// refresh tenant timezone_info from local timezone.info file.
int refresh_tenant_timezone_info_from_local_file_(
@ -132,24 +178,20 @@ private:
// @retval OB_SUCCESS Success
// @retval OB_ENTRY_NOT_EXIST tenant not exist
// @retval other_error_code Fail
int query_timezone_info_version_(
const uint64_t tenant_id,
int query_timezone_info_version_(const uint64_t tenant_id,
int64_t &timezone_info_version);
// refresh timezone info
int refresh_timezone_info_();
// fetch timezone info by SQL
int fetch_tenant_timezone_info_(const uint64_t tenant_id, common::ObTZInfoMap *tz_info_map);
int refresh_all_tenant_timezone_info_();
int refresh_tenant_timezone_info_map_(const uint64_t tenant_id, ObCDCTenantTimeZoneInfo &tenant_tz_info);
// export timezone_info_map and demp to local file.
int export_timezone_info_(common::ObTZInfoMap &tz_info_map);
// import timezone_info from local file and convert to ObTZInfoMap
int import_timezone_info_(common::ObTZInfoMap &tz_info_map);
int load_tzinfo_from_file_(char *buf, const int64_t buf_len);
// currently refresh tz_info while using online_refresh_mode(data_dict won't refresh tz_info even
// if in intergate_mode)
bool is_online_tz_info_available() const { return is_online_refresh_mode(TCTX.refresh_mode_); };
private:
static const int64_t TENANT_TZ_INFO_VALUE_SIZE = sizeof(ObCDCTenantTimeZoneInfo);
static const int NWAY = 4;
static const int MAP_BUCKET_NUM = 4;
private:
bool inited_;
pthread_t tz_tid_;
@ -161,12 +203,13 @@ private:
IObLogSysTableHelper *systable_helper_;
IObLogErrHandler *err_handler_;
common::ObSpinLock lock_;
IObLogTenantMgr *tenant_mgr_;
common::SpinRWLock lock_;
// save for init tz_info_wrap
const char *timezone_str_;
ObLogTZInfoMap oblog_tz_info_map_;
ObSliceAlloc allocator_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogTimeZoneInfoGetter);
DISALLOW_COPY_AND_ASSIGN(ObCDCTimeZoneInfoGetter);
};
} // namespace libobcdc
} // namespace oceanbase

View File

@ -16,6 +16,8 @@
#include "ob_log_trans_dispatch_ctx.h"
#include "ob_log_config.h" // TCONF
#include "ob_log_instance.h" // TCTX
#include "ob_log_trans_msg_sorter.h" // IObLogTransMsgSorter
namespace oceanbase
{
@ -112,14 +114,18 @@ void TransDispatchCtx::set_normal_priority_budget_(const int64_t &average_budget
for(int64_t i = 0; i < normal_priority_part_budget_arr_.count(); i++) {
PartTransDispatchBudget &budget = normal_priority_part_budget_arr_[i];
PartTransTask *part_trans_task = budget.part_trans_task_;
const static int64_t PRINT_STAT_INTERVAL = 10 * _SEC_;
IObLogTransMsgSorter *msg_sorter = TCTX.trans_msg_sorter_;
if (average_budget <= 0
&& OB_NOT_NULL(part_trans_task)
&& OB_NOT_NULL(msg_sorter)
&& (part_trans_task->get_trans_id() == msg_sorter->get_cur_sort_trans_id()) // wait last trans handled in sorter
&& part_trans_task->is_dispatched_redo_be_sorted()) {
const int64_t extra_redo_dispatch_size = TCONF.extra_redo_dispatch_memory_size;
if (REACH_TIME_INTERVAL(1 * _SEC_)) {
LOG_INFO("[NOTICE][REDO_DISPATCH] budget usedup but dispatched_redo all sorted, use extra_redo budget",
if (REACH_TIME_INTERVAL(PRINT_STAT_INTERVAL)) {
LOG_INFO("[NOTICE][REDO_DISPATCH][DATA_SKEW] budget usedup but dispatched_redo all sorted, use extra_redo budget",
K(budget),
K(average_budget),
"extra_redo_dispatch_size", SIZE_TO_STR(extra_redo_dispatch_size),

View File

@ -222,10 +222,10 @@ public:
const ObLink *get_row_tail() const { return row_tail_; }
void set_row_tail(ObLink *row_tail) { row_tail_ = row_tail; }
void inc_valid_row_num() { ++valid_row_num_; }
void set_valid_row_num(const int64_t row_num) { valid_row_num_ = row_num; }
int64_t get_valid_row_num() const { return valid_row_num_; }
bool is_contain_valid_row() const { return 0 != valid_row_num_; }
void inc_valid_row_num() { ATOMIC_INC(&valid_row_num_); }
void set_valid_row_num(const int64_t row_num) { ATOMIC_SET(&valid_row_num_, row_num); }
int64_t get_valid_row_num() const { return ATOMIC_LOAD(&valid_row_num_); }
bool is_contain_valid_row() const { return get_valid_row_num() > 0; }
// Retrieve the last digit of reserve_field_
bool is_stored() const { return reserve_field_ & 0x01; }

View File

@ -60,6 +60,7 @@ ObLogTransMsgSorter::ObLogTransMsgSorter() :
total_task_count_(0),
trans_stat_mgr_(NULL),
enable_sort_by_seq_no_(false),
cur_sort_trans_id_(),
err_handler_(NULL)
{
br_sort_func_ = NULL;
@ -116,6 +117,7 @@ void ObLogTransMsgSorter::destroy()
task_limit_ = 0;
total_task_count_ = 0;
trans_stat_mgr_ = NULL;
cur_sort_trans_id_.reset();
err_handler_ = NULL;
br_sort_func_ = NULL;
LOG_INFO("TransMsgSorter destroy succ");
@ -222,6 +224,7 @@ int ObLogTransMsgSorter::sort_br_by_part_order_(TransCtx &trans)
int ret = OB_SUCCESS;
LOG_DEBUG("br sorter handle trans begin", K(trans));
PartTransTask *part_trans_task = trans.get_participant_objs();
cur_sort_trans_id_ = trans.get_trans_id();
if (OB_ISNULL(part_trans_task)) {
ret = OB_ERR_UNEXPECTED;
@ -250,6 +253,7 @@ int ObLogTransMsgSorter::sort_br_by_seq_no_(TransCtx &trans)
int ret = OB_SUCCESS;
LOG_DEBUG("br sorter handle trans begin", K(trans));
DmlStmtTask *dml_stmt_task = NULL;
cur_sort_trans_id_ = trans.get_trans_id();
// 1. build a min-top heap
std::priority_queue<DmlStmtTask*, std::vector<DmlStmtTask*>, StmtSequerenceCompFunc> heap;
PartTransTask *part_trans_task = trans.get_participant_objs();

View File

@ -38,6 +38,7 @@ public:
virtual int submit(TransCtx *trans) = 0;
virtual void mark_stop_flag() = 0;
virtual int get_task_count(int64_t &task_count) = 0;
virtual const transaction::ObTransID &get_cur_sort_trans_id() const = 0;
};
/**
@ -69,6 +70,8 @@ public:
IObLogTransStatMgr &trans_stat_mgr,
IObLogErrHandler *err_handler);
void destroy();
OB_INLINE const transaction::ObTransID &get_cur_sort_trans_id() const
{ return cur_sort_trans_id_; }
private:
struct StmtSequerenceCompFunc
@ -106,6 +109,7 @@ private:
int64_t total_task_count_;
IObLogTransStatMgr *trans_stat_mgr_;
bool enable_sort_by_seq_no_;
transaction::ObTransID cur_sort_trans_id_;
IObLogErrHandler *err_handler_;
private:

View File

@ -98,20 +98,21 @@ void ObLSWorker::destroy()
{
stop();
inited_ = false;
stream_paused_ = false;
fetcher_resume_time_ = OB_INVALID_TIMESTAMP;
StreamWorkerThread::destroy();
if (inited_) {
LOG_INFO("destroy stream worker begin");
inited_ = false;
stream_paused_ = false;
fetcher_resume_time_ = OB_INVALID_TIMESTAMP;
StreamWorkerThread::destroy();
timer_.destroy();
fetcher_host_ = nullptr;
idle_pool_ = NULL;
dead_pool_ = NULL;
err_handler_ = NULL;
stream_task_seq_ = 0;
timer_.destroy();
fetcher_host_ = nullptr;
idle_pool_ = NULL;
dead_pool_ = NULL;
err_handler_ = NULL;
stream_task_seq_ = 0;
LOG_INFO("destroy stream worker succ");
LOG_INFO("destroy stream worker succ");
}
}
int ObLSWorker::start()
@ -133,6 +134,9 @@ int ObLSWorker::start()
void ObLSWorker::stop()
{
if (OB_LIKELY(inited_)) {
LOG_INFO("stop stream worker begin");
mark_stop_flag();
timer_.stop();
StreamWorkerThread::stop();
LOG_INFO("stop stream worker succ");
}
@ -140,8 +144,10 @@ void ObLSWorker::stop()
void ObLSWorker::mark_stop_flag()
{
LOG_INFO("stream worker mark_stop_flag begin");
timer_.mark_stop_flag();
StreamWorkerThread::mark_stop_flag();
LOG_INFO("stream worker mark_stop_flag end");
}
void ObLSWorker::pause()

View File

@ -83,7 +83,7 @@ public:
void destroy();
int start();
void stop();
void mark_stop_flag() { ATOMIC_STORE(&stop_flag_, false); }
void mark_stop_flag() { ATOMIC_STORE(&stop_flag_, true); }
bool is_stoped() const { return ATOMIC_LOAD(&stop_flag_); }
int64_t get_thread_num() const { return thread_num_; }
@ -340,6 +340,9 @@ int ObMapQueueThread<MAX_THREAD_NUM>::push(void *data, const uint64_t hash_val)
if (OB_UNLIKELY(! inited_)) {
LIB_LOG(ERROR, "not init");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(is_stoped())) {
ret = OB_IN_STOP_STATE;
LIB_LOG(INFO, "thread pool is not running", KR(ret), K_(stop_flag));
} else if (OB_ISNULL(data)) {
LIB_LOG(ERROR, "invalid argument", K(data));
ret = OB_INVALID_ARGUMENT;

View File

@ -52,7 +52,7 @@ ObObj2strHelper::~ObObj2strHelper()
destroy();
}
int ObObj2strHelper::init(IObLogTimeZoneInfoGetter &timezone_info_getter,
int ObObj2strHelper::init(IObCDCTimeZoneInfoGetter &timezone_info_getter,
ObLogHbaseUtil &hbase_util,
const bool enable_hbase_mode,
const bool enable_convert_timestamp_to_unix_timestamp,
@ -224,14 +224,19 @@ int ObObj2strHelper::obj2str(const uint64_t tenant_id,
if (OB_SUCC(ret)) {
common::ObObj str_obj;
common::ObObjType target_type = common::ObMaxType;
ObCDCTenantTimeZoneInfo *obcdc_tenant_tz_info = nullptr;
// OBCDC need use_standard_format
const common::ObTimeZoneInfo *tz_info = nullptr;
if (OB_ISNULL(tz_info_wrap)) {
if (OB_ISNULL(timezone_info_getter_)) {
ret = OB_ERR_UNEXPECTED;
OBLOG_LOG(ERROR, "tz_info_wrap is null", KR(ret), K(tenant_id));
OBLOG_LOG(ERROR, "timezone_info_getter_ is null", K(timezone_info_getter_));
} else if (OB_FAIL(timezone_info_getter_->get_tenant_tz_info(tenant_id, obcdc_tenant_tz_info))) {
OBLOG_LOG(ERROR, "get_tenant_tz_wrap failed", KR(ret), K(tenant_id));
} else if (OB_ISNULL(obcdc_tenant_tz_info)) {
ret = OB_ERR_UNEXPECTED;
OBLOG_LOG(ERROR, "tenant_tz_info not valid", KR(ret), K(tenant_id));
} else {
tz_info = tz_info_wrap->get_time_zone_info();
// obcdc need use_standard_format
const common::ObTimeZoneInfo *tz_info = obcdc_tenant_tz_info->get_timezone_info();
const ObDataTypeCastParams dtc_params(tz_info);
ObObjCastParams cast_param(&allocator, &dtc_params, CM_NONE, collation_type);
cast_param.format_number_with_limit_ = false;//here need no limit format number for libobcdc
@ -318,16 +323,13 @@ int ObObj2strHelper::convert_timestamp_with_timezone_data_util_succ_(const commo
{
int ret = OB_SUCCESS;
bool done = false;
ObTZInfoMap *tz_info_map = NULL;
if (OB_ISNULL(timezone_info_getter_)) {
OBLOG_LOG(ERROR, "timezone_info_getter_ is null", K(timezone_info_getter_));
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(tenant_mgr_->get_tenant_tz_map(tenant_id, tz_info_map))) {
OBLOG_LOG(ERROR, "get_tenant_tz_map failed", KR(ret), K(tenant_id));
OBLOG_LOG(ERROR, "timezone_info_getter_ is null", KR(ret), K(timezone_info_getter_));
} else {
while (! done && OB_SUCCESS == ret) {
if (OB_FAIL(timezone_info_getter_->fetch_tenant_timezone_info_util_succ(tenant_id, tz_info_map))) {
while (! done && OB_SUCC(ret)) {
if (OB_FAIL(timezone_info_getter_->refresh_tenant_timezone_info_until_succ(tenant_id))) {
OBLOG_LOG(ERROR, "fetch_tenant_timezone_info_util_succ fail", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObObjCaster::to_type(target_type, cast_param, in_obj, str_obj))) {
if (OB_ERR_INVALID_TIMEZONE_REGION_ID == ret) {

View File

@ -33,7 +33,7 @@ class ObTimeZoneInfo;
namespace libobcdc
{
class IObLogTimeZoneInfoGetter;
class IObCDCTimeZoneInfoGetter;
class ObObj2strHelper
{
public:
@ -62,7 +62,7 @@ public:
const ObTimeZoneInfoWrap *tz_info_wrap) const;
public:
int init(IObLogTimeZoneInfoGetter &timezone_info_getter,
int init(IObCDCTimeZoneInfoGetter &timezone_info_getter,
ObLogHbaseUtil &hbase_util,
const bool enable_hbase_mode,
const bool enable_convert_timestamp_to_unix_timestamp,
@ -127,7 +127,7 @@ private:
private:
bool inited_;
IObLogTimeZoneInfoGetter *timezone_info_getter_;
IObCDCTimeZoneInfoGetter *timezone_info_getter_;
ObLogHbaseUtil *hbase_util_;
bool enable_hbase_mode_;
bool enable_convert_timestamp_to_unix_timestamp_;

View File

@ -60,7 +60,7 @@ ObLogMain::ObLogMain() : inited_(false),
start_timestamp_usec_(0),
tenant_id_(OB_INVALID_TENANT_ID),
tg_match_pattern_(NULL),
last_heartbeat_timestamp_micro_sec_(0),
last_heartbeat_timestamp_usec_(OB_INVALID_VERSION),
stop_flag_(true)
{
}
@ -89,7 +89,7 @@ int ObLogMain::init(int argc, char **argv)
} else {
stop_flag_ = true;
inited_ = true;
last_heartbeat_timestamp_micro_sec_ = start_timestamp_usec_;
last_heartbeat_timestamp_usec_ = OB_INVALID_VERSION;
}
return ret;
@ -116,7 +116,7 @@ void ObLogMain::destroy()
start_timestamp_usec_ = 0;
tenant_id_ = OB_INVALID_TENANT_ID;
tg_match_pattern_ = NULL;
last_heartbeat_timestamp_micro_sec_ = 0;
last_heartbeat_timestamp_usec_ = OB_INVALID_VERSION;
stop_flag_ = true;
output_br_detail_ = false;
output_br_special_detail_ = false;
@ -438,55 +438,63 @@ int ObLogMain::verify_record_info_(IBinlogRecord *br)
LOG_ERROR("get user data fail", K(br), K(oblog_br));
ret = OB_INVALID_ARGUMENT;
} else {
// heartbeat, updtae last_heartbeat_timestamp_micro_sec_
if (HEARTBEAT == br->recordType()) {
int64_t timestamp_usec = OB_INVALID_TIMESTAMP;
if (is_first_br) {
// oblog_tailf -f $CONFIG -t 0 means start at current time
// The libobcdc start timestamp is not available
// So the first BinlogRecord is obtained based on the checkpoint
timestamp_usec = br->getCheckpoint1() * 1000000 + br->getCheckpoint2();
is_first_br = false;
} else {
timestamp_usec = br->getTimestamp() * 1000000 + br->getRecordUsec();
// heartbeat, updtae last_heartbeat_timestamp_usec_
int64_t checkpoint_timestamp_usec = OB_INVALID_TIMESTAMP;
if (is_first_br) {
// oblog_tailf -f $CONFIG -t 0 means start at current time
// The libobcdc start timestamp is not available
// So the first BinlogRecord is obtained based on the checkpoint
checkpoint_timestamp_usec = br->getCheckpoint1() * 1000000 + br->getCheckpoint2();
is_first_br = false;
} else if (HEARTBEAT == br->recordType()) {
checkpoint_timestamp_usec = br->getTimestamp() * 1000000 + br->getRecordUsec();
if (checkpoint_timestamp_usec < last_heartbeat_timestamp_usec_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("checkpoint rollbacked", KR(ret), K(checkpoint_timestamp_usec), K_(last_heartbeat_timestamp_usec));
}
last_heartbeat_timestamp_micro_sec_ = std::max(timestamp_usec, last_heartbeat_timestamp_micro_sec_);
}
// Calibration timestamp and checkpoint
int64_t precise_timestamp = ObBinlogRecordPrinter::get_precise_timestamp(*br);
int64_t timestamp_sec = precise_timestamp / 1000000;
int64_t timestamp_usec = precise_timestamp % 1000000;
int64_t expect_checkpoint1 = last_heartbeat_timestamp_micro_sec_ / 1000000;
int64_t expect_checkpoint2 = last_heartbeat_timestamp_micro_sec_ % 1000000;
if (OB_SUCC(ret)) {
last_heartbeat_timestamp_usec_ = std::max(checkpoint_timestamp_usec, last_heartbeat_timestamp_usec_);
if (OB_UNLIKELY(timestamp_sec != br->getTimestamp())
|| OB_UNLIKELY(timestamp_usec != br->getRecordUsec())) {
LOG_ERROR("timestamp is not right", K(precise_timestamp), "br_sec", br->getTimestamp(),
"br_usec", br->getRecordUsec());
ret = OB_ERR_UNEXPECTED;
} else if (OB_UNLIKELY(expect_checkpoint1 != br->getCheckpoint1())
|| OB_UNLIKELY(expect_checkpoint2 != br->getCheckpoint2())) {
LOG_ERROR("checkpoint is not right", K(br), K(last_heartbeat_timestamp_micro_sec_),
K(expect_checkpoint1), "br_checkpoint1", br->getCheckpoint1(),
K(expect_checkpoint2), "br_checkpoint2", br->getCheckpoint2(),
"getTimestamp", br->getTimestamp(), "getRecordUsec", br->getRecordUsec(),
K(is_first_br));
ret = OB_ERR_UNEXPECTED;
} else {
// succ
// Calibration timestamp and checkpoint
int64_t precise_timestamp = ObBinlogRecordPrinter::get_precise_timestamp(*br);
int64_t timestamp_sec = precise_timestamp / 1000000;
int64_t timestamp_usec = precise_timestamp % 1000000;
int64_t expect_checkpoint1 = last_heartbeat_timestamp_usec_ / 1000000;
int64_t expect_checkpoint2 = last_heartbeat_timestamp_usec_ % 1000000;
if (OB_UNLIKELY(timestamp_sec != br->getTimestamp())
|| OB_UNLIKELY(timestamp_usec != br->getRecordUsec())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("timestamp is not right", KR(ret), K(precise_timestamp), "br_sec", br->getTimestamp(),
"br_usec", br->getRecordUsec());
} else if (OB_UNLIKELY(expect_checkpoint1 != br->getCheckpoint1())
|| OB_UNLIKELY(expect_checkpoint2 != br->getCheckpoint2())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("checkpoint is not right", KR(ret), K(br), K(last_heartbeat_timestamp_usec_),
K(expect_checkpoint1), "br_checkpoint1", br->getCheckpoint1(),
K(expect_checkpoint2), "br_checkpoint2", br->getCheckpoint2(),
"getTimestamp", br->getTimestamp(), "getRecordUsec", br->getRecordUsec(),
K(is_first_br));
} else {
// succ
}
}
}
return ret;
}
// use tools/import_time_zone_info.py to import tools/timezone_V1.log into server and will dump
// timezone_info.conf in obcdc online schema mode
int ObLogMain::parse_timezone_info_(const char *tzinfo_fpath)
{
int ret = OB_SUCCESS;
char *buf = nullptr;
char *print_buf = nullptr;
const int64_t buf_len = 24 * _M_;
const int64_t buf_len = 128 * _M_;
int64_t pos = 0;
int64_t str_len = 0;
common::ObRequestTZInfoResult tz_info_res;
@ -503,13 +511,14 @@ int ObLogMain::parse_timezone_info_(const char *tzinfo_fpath)
} else if (OB_FAIL(tz_info_res.deserialize(buf, buf_len, pos))) {
LOG_ERROR("deserialize tz_info_res failed", KR(ret), K(buf_len), K(pos), KP(buf));
} else {
LOG_STD("prepare parse timezone_info\n");
const int64_t tz_info_cnt = tz_info_res.tz_array_.count();
common::databuff_printf(print_buf, buf_len, str_len, "package: %s\r\n", PACKAGE_STRING);
common::databuff_printf(print_buf, buf_len, str_len, "timezone_info_version: %ld\r\n", tz_info_res.last_version_);
for(int64_t i = 0; OB_SUCC(ret) && i < tz_info_cnt; i++) {
const common::ObTimeZoneInfoPos &pos = tz_info_res.tz_array_[i];
int64_t tmp_buf_len = 1 * _M_;
int64_t tmp_buf_len = 2 * _M_;
char tmp_buf[tmp_buf_len];
int64_t tmp_pos = pos.to_string(tmp_buf, tmp_buf_len);
tmp_buf[tmp_pos] = '\0';
@ -517,6 +526,8 @@ int ObLogMain::parse_timezone_info_(const char *tzinfo_fpath)
if (OB_FAIL(common::databuff_printf(print_buf, buf_len, str_len,
"timezone_info_pos[%ld/%ld]: %s\r\n",
i + 1, tz_info_cnt, tmp_buf))) {
LOG_STD("print timezone_info_pos failed, buf_len:%ld, str_len:%ld, tz_info_cnt:%ld/%ld, tmp_buf_len:%ld\n",
buf_len, str_len, i, tz_info_cnt, tmp_buf_len);
LOG_ERROR("print timezone_info_pos failed", KR(ret), K(buf_len), K(str_len), K(i), K(tz_info_cnt));
}
}
@ -536,6 +547,10 @@ int ObLogMain::parse_timezone_info_(const char *tzinfo_fpath)
}
}
if (OB_FAIL(ret)) {
LOG_STD("parse_timezone_info done, ret:%d", ret);
}
if (OB_NOT_NULL(print_buf)) {
ob_cdc_free(print_buf);
print_buf = nullptr;

View File

@ -85,7 +85,7 @@ private:
const char *tg_match_pattern_;
// Record heartbeat microsecond time stamps
int64_t last_heartbeat_timestamp_micro_sec_;
int64_t last_heartbeat_timestamp_usec_;
volatile bool stop_flag_ CACHE_ALIGNED;

File diff suppressed because one or more lines are too long

View File

@ -276,11 +276,17 @@ void ObLogStartLSNLocator::run(const int64_t thread_index)
while (! stop_flag_ && OB_SUCCESS == ret) {
if (OB_FAIL(do_retrieve_(thread_index, data))) {
LOG_ERROR("retrieve request fail", KR(ret), K(thread_index));
} else if (! data.has_valid_req()) {
if (REACH_TIME_INTERVAL(30 * _SEC_)) {
LOG_INFO("no request should be launch, ignore");
}
} else if (OB_FAIL(do_request_(data))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("do request fail", KR(ret));
}
} else {
}
if (OB_SUCC(ret)) {
cond_timedwait(thread_index, DATA_OP_TIMEOUT);
}
}
@ -318,7 +324,7 @@ int ObLogStartLSNLocator::do_retrieve_(const int64_t thread_index, WorkerData &w
int ret = OB_SUCCESS;
int64_t batch_count = ATOMIC_LOAD(&g_batch_count);
for (int64_t cnt = 0; OB_SUCCESS == ret && (cnt < batch_count); ++cnt) {
for (int64_t cnt = 0; OB_SUCC(ret) && ! stop_flag_ && (cnt < batch_count); ++cnt) {
StartLSNLocateReq *request = NULL;
StartLSNLocateReq::SvrItem *item = NULL;
SvrReq *svr_req = NULL;
@ -481,7 +487,7 @@ int ObLogStartLSNLocator::do_integrated_request_(WorkerData &data)
// 2. Each partition request is removed from the request list as soon as it completes, so each request is split into multiple requests, each starting with the first element
// 3. Partition request completion condition: regardless of success, as long as no breakpoint message is returned, the request is considered completed
while (! stop_flag_ && OB_SUCCESS == ret && svr_req.locate_req_list_.count() > 0) {
// 一次请求的最大个数
// maximum request count
int64_t item_cnt_limit = RpcReq::ITEM_CNT_LMT;
int64_t req_cnt = std::min(svr_req.locate_req_list_.count(), item_cnt_limit);
@ -499,6 +505,10 @@ int ObLogStartLSNLocator::do_integrated_request_(WorkerData &data)
// Build request parameters
if (OB_FAIL(build_request_params_(rpc_req, svr_req, req_cnt))) {
LOG_ERROR("build request params fail", KR(ret), K(rpc_req), K(req_cnt), K(svr_req));
} else if (svr_req.locate_req_list_.count() <= 0) {
if (REACH_TIME_INTERVAL(30 * _SEC_)) {
LOG_INFO("no svr_req to request, ignore", K(svr_req));
}
}
// Executing RPC requests
else if (OB_FAIL(do_rpc_and_dispatch_(*(rpc_), rpc_req, svr_req, succ_req_cnt))) {
@ -572,7 +582,7 @@ int ObLogStartLSNLocator::build_request_params_(RpcReq &req,
int64_t total_cnt = svr_req.locate_req_list_.count();
req.reset();
for (int64_t index = 0; OB_SUCCESS == ret && index < req_cnt && index < total_cnt; ++index) {
for (int64_t index = 0; OB_SUCC(ret) && ! stop_flag_ && index < req_cnt && index < total_cnt; ++index) {
StartLSNLocateReq *request = svr_req.locate_req_list_.at(index);
StartLSNLocateReq::SvrItem *svr_item = NULL;
@ -643,7 +653,7 @@ int ObLogStartLSNLocator::do_rpc_and_dispatch_(
if (OB_SUCCESS == ret) {
// Scanning of arrays in reverse order to support deletion of completed ls requests
for (int64_t idx = request_cnt - 1; OB_SUCCESS == ret && idx >= 0; idx--) {
for (int64_t idx = request_cnt - 1; OB_SUCC(ret) && ! stop_flag_ && idx >= 0; idx--) {
int ls_err = OB_SUCCESS;
palf::LSN start_lsn;
int64_t start_log_tstamp = OB_INVALID_TIMESTAMP;

View File

@ -225,6 +225,10 @@ private:
svr_req_map_.reset();
archive_req_list_.reset();
}
bool has_valid_req() const
{
return svr_req_list_.count() > 0 || archive_req_list_.count() > 0;
}
};
// member variables

View File

@ -176,11 +176,19 @@ int ObLogAllSvrCache::get_svr_item_(const common::ObAddr &svr, SvrItem &item)
// succ
}
LOG_DEBUG("[STAT] [ALL_SVR_CACHE] [GET_SVR_ITEM]", KR(ret), K(svr),
"status", OB_SUCCESS == ret ? print_svr_status_(item.status_) : "NOT_EXIST",
"svr_ver", item.version_, K(cur_ver),
"zone", item.zone_,
"region_priority", item.region_priority_);
if (OB_SUCC(ret)) {
LOG_DEBUG("[STAT] [ALL_SVR_CACHE] [GET_SVR_ITEM]", K(svr),
"status", OB_SUCCESS == ret ? print_svr_status_(item.status_) : "NOT_EXIST",
"svr_ver", item.version_, K(cur_ver),
"zone", item.zone_,
"region_priority", item.region_priority_);
} else {
LOG_INFO("[STAT] [ALL_SVR_CACHE] [GET_SVR_ITEM]", KR(ret), K(svr),
"status", OB_SUCCESS == ret ? print_svr_status_(item.status_) : "NOT_EXIST",
"svr_ver", item.version_, K(cur_ver),
"zone", item.zone_,
"region_priority", item.region_priority_);
}
return ret;
}

View File

@ -147,8 +147,7 @@ int ObLogRouteService::start()
if (OB_FAIL(ls_route_timer_task_.init(lib::TGDefIDs::LogRouterTimer))) {
LOG_WARN("ObLSRouteTimerTask init failed", KR(ret));
} else if (OB_FAIL(timer_.schedule(ls_route_timer_task_, ObLSRouteTimerTask::REFRESH_INTERVAL,
true/*repeat*/))) {
} else if (OB_FAIL(timer_.schedule_repeate_task_immediately(ls_route_timer_task_, ObLSRouteTimerTask::REFRESH_INTERVAL))) {
LOG_WARN("fail to schedule min minor sstable gc task", K(ret));
} else {
is_stopped_ = false;
@ -976,7 +975,7 @@ int ObLogRouteService::update_server_list_(
ret = OB_NEED_RETRY;
}
LOG_INFO("update server list succ", KR(ret), K(router_key), K(ls_log_info), K(ls_svr_list));
LOG_INFO("update server list done", KR(ret), K(router_key), K(ls_log_info), K(ls_svr_list), K_(tg_id));
}
return ret;