[OBCDC] ignore rollbacked schema_version after tenant start service
This commit is contained in:
parent
c6e23516d2
commit
03ff66996e
@ -2236,7 +2236,9 @@ void ObLogInstance::global_flow_control_()
|
||||
int64_t dml_parser_part_trans_task_count = 0;
|
||||
int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count();
|
||||
int64_t out_part_trans_task_count = get_out_part_trans_task_count_();
|
||||
int64_t resource_collector_part_trans_task_count = resource_collector_->get_part_trans_task_count();
|
||||
int64_t resource_collector_part_trans_task_count = 0;
|
||||
int64_t resource_collector_br_count = 0;
|
||||
resource_collector_->get_task_count(resource_collector_part_trans_task_count, resource_collector_br_count);
|
||||
int64_t committer_ddl_part_trans_task_count = 0;
|
||||
int64_t committer_dml_part_trans_task_count = 0;
|
||||
committer_->get_part_trans_task_count(committer_ddl_part_trans_task_count,
|
||||
@ -2521,7 +2523,9 @@ int ObLogInstance::get_task_count_(int64_t &ready_to_seq_task_count,
|
||||
int64_t sys_ls_handle_part_trans_task_count = sys_ls_handler_->get_part_trans_task_count();
|
||||
int64_t br_queue_part_trans_task_count = br_queue_.get_part_trans_task_count();
|
||||
int64_t out_part_trans_task_count = get_out_part_trans_task_count_();
|
||||
int64_t resource_collector_part_trans_task_count = resource_collector_->get_part_trans_task_count();
|
||||
int64_t resource_collector_part_trans_task_count = 0;
|
||||
int64_t resource_collector_br_count = 0;
|
||||
resource_collector_->get_task_count(resource_collector_part_trans_task_count, resource_collector_br_count);
|
||||
int64_t dml_br_count_in_user_queue = br_queue_.get_dml_br_count();
|
||||
int64_t dml_br_count_output = output_dml_br_count_;
|
||||
|
||||
@ -2558,8 +2562,8 @@ int ObLogInstance::get_task_count_(int64_t &ready_to_seq_task_count,
|
||||
dml_br_count_in_user_queue);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [OUT] [PART_TRANS_TASK=%ld] [DDL_BR=%ld] [DML_BR=%ld]", out_part_trans_task_count,
|
||||
ddl_br_count_output, dml_br_count_output);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [RESOURCE_COLLECTOR] [PART_TRANS_TASK=%ld]",
|
||||
resource_collector_part_trans_task_count);
|
||||
_LOG_INFO("[TASK_COUNT_STAT] [RESOURCE_COLLECTOR] [PART_TRANS_TASK=%ld] [BR=%ld]",
|
||||
resource_collector_part_trans_task_count, resource_collector_br_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,10 +38,14 @@
|
||||
#define CHECK_SCHEMA_VERSION(check_schema_version, fmt, arg...) \
|
||||
do { \
|
||||
if (OB_UNLIKELY(check_schema_version < ATOMIC_LOAD(&cur_schema_version_))) { \
|
||||
LOG_ERROR(fmt, K(tenant_id_), K(cur_schema_version_), K(check_schema_version), ##arg); \
|
||||
if (!TCONF.skip_reversed_schema_verison) { \
|
||||
ret = OB_INVALID_ARGUMENT; \
|
||||
if (ATOMIC_LOAD(&enable_check_schema_version_)) { \
|
||||
LOG_ERROR(fmt, K(tenant_id_), K(cur_schema_version_), K(check_schema_version), ##arg); \
|
||||
if (!TCONF.skip_reversed_schema_verison) { \
|
||||
ret = OB_INVALID_ARGUMENT; \
|
||||
} \
|
||||
} \
|
||||
} else if (OB_UNLIKELY(! ATOMIC_LOAD(&enable_check_schema_version_))) { \
|
||||
ATOMIC_SET(&enable_check_schema_version_, true); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
@ -94,6 +98,7 @@ int ObLogPartMgr::init(const uint64_t tenant_id,
|
||||
table_id_cache_ = &table_id_cache;
|
||||
cur_schema_version_ = start_schema_version;
|
||||
enable_oracle_mode_match_case_sensitive_ = enable_oracle_mode_match_case_sensitive;
|
||||
enable_check_schema_version_ = false;
|
||||
|
||||
inited_ = true;
|
||||
LOG_INFO("init PartMgr succ", K(tenant_id), K(start_schema_version));
|
||||
@ -111,6 +116,7 @@ void ObLogPartMgr::reset()
|
||||
tablet_to_table_info_.destroy();
|
||||
cur_schema_version_ = OB_INVALID_VERSION;
|
||||
enable_oracle_mode_match_case_sensitive_ = false;
|
||||
enable_check_schema_version_ = false;
|
||||
schema_cond_.destroy();
|
||||
}
|
||||
|
||||
|
@ -485,6 +485,7 @@ private:
|
||||
|
||||
// Default whitelist match insensitive
|
||||
bool enable_oracle_mode_match_case_sensitive_;
|
||||
bool enable_check_schema_version_;
|
||||
|
||||
// Conditional
|
||||
common::ObThreadCond schema_cond_;
|
||||
|
@ -785,9 +785,10 @@ int ObLogResourceCollector::revert_single_binlog_record_(ObLogBR *br)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObLogResourceCollector::get_part_trans_task_count() const
|
||||
void ObLogResourceCollector::get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const
|
||||
{
|
||||
return ATOMIC_LOAD(&total_part_trans_task_count_);
|
||||
part_trans_task_count = ATOMIC_LOAD(&total_part_trans_task_count_);
|
||||
br_count = ATOMIC_LOAD(&br_count_);
|
||||
}
|
||||
|
||||
int ObLogResourceCollector::revert_unserved_part_trans_task_(const int64_t thread_idx, PartTransTask &task)
|
||||
|
@ -77,7 +77,7 @@ public:
|
||||
virtual int start() = 0;
|
||||
virtual void stop() = 0;
|
||||
virtual void mark_stop_flag() = 0;
|
||||
virtual int64_t get_part_trans_task_count() const = 0;
|
||||
virtual void get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const = 0;
|
||||
virtual void print_stat_info() const = 0;
|
||||
};
|
||||
|
||||
@ -120,7 +120,7 @@ public:
|
||||
void stop();
|
||||
void mark_stop_flag();
|
||||
int handle(void *data, const int64_t thread_index, volatile bool &stop_flag);
|
||||
int64_t get_part_trans_task_count() const;
|
||||
void get_task_count(int64_t &part_trans_task_count, int64_t &br_count) const;
|
||||
void print_stat_info() const;
|
||||
|
||||
private:
|
||||
|
Loading…
x
Reference in New Issue
Block a user