add defensive code in build table store && get report info
This commit is contained in:
parent
a003a7def8
commit
ece07a478e
@ -1946,8 +1946,8 @@ int ObService::inner_fill_tablet_info_(
|
||||
int64_t data_size = 0;
|
||||
int64_t required_size = 0;
|
||||
ObArray<int64_t> column_checksums;
|
||||
if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(column_checksums, data_size,
|
||||
required_size, need_checksum))) {
|
||||
if (OB_FAIL(tablet_handle.get_obj()->get_tablet_report_info(snapshot_version, column_checksums,
|
||||
data_size, required_size, need_checksum))) {
|
||||
LOG_WARN("fail to get tablet report info from tablet", KR(ret), K(tenant_id), K(tablet_id));
|
||||
} else if (OB_FAIL(tablet_replica.init(
|
||||
tenant_id,
|
||||
|
@ -102,12 +102,15 @@ int ObPartitionMergePolicy::get_mini_merge_tables(
|
||||
LOG_WARN("get unexpected null memtable mgr from tablet or invalid table store", K(ret), K(tablet), K(table_store));
|
||||
} else if (table_store.get_minor_sstables().count() >= MAX_SSTABLE_CNT_IN_STORAGE) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_ERROR("Too many sstables, delay mini merge until sstable count falls below MAX_SSTABLE_CNT", K(ret));
|
||||
LOG_ERROR("Too many sstables, delay mini merge until sstable count falls below MAX_SSTABLE_CNT",
|
||||
K(ret), K(table_store), K(tablet));
|
||||
// add compaction diagnose info
|
||||
diagnose_table_count_unsafe(MINI_MERGE, tablet);
|
||||
} else if (OB_FAIL(tablet.get_memtable_mgr()->get_all_memtables(memtable_handles))) {
|
||||
LOG_WARN("failed to get all memtables from memtable mgr", K(ret));
|
||||
} else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version, table_store.get_major_sstables().get_boundary_table(true), freeze_info))) {
|
||||
} else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version,
|
||||
table_store.get_major_sstables().get_boundary_table(true),
|
||||
freeze_info))) {
|
||||
LOG_WARN("failed to get next major freeze", K(ret), K(merge_inc_base_version), K(table_store));
|
||||
} else if (OB_FAIL(find_mini_merge_tables(param, freeze_info, tablet, memtable_handles, result))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
@ -132,7 +135,6 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
||||
// TODO: @dengzhi.ldz, remove max_snapshot_version, merge all forzen memtables
|
||||
// Keep max_snapshot_version currently because major merge must be done step by step
|
||||
int64_t max_snapshot_version = freeze_info.next.freeze_ts;
|
||||
ObITable *last_table = tablet.get_table_store().get_minor_sstables().get_boundary_table(true/*last*/);
|
||||
const int64_t clog_checkpoint_ts = tablet.get_clog_checkpoint_ts();
|
||||
|
||||
// Freezing in the restart phase may not satisfy end >= last_max_sstable,
|
||||
@ -163,8 +165,8 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
||||
}
|
||||
} else if (result.handle_.get_count() > 0) {
|
||||
if (result.log_ts_range_.end_log_ts_ < memtable->get_start_log_ts()) {
|
||||
FLOG_INFO("log id not continues, reset previous minor merge tables",
|
||||
"last_end_log_ts", result.log_ts_range_.end_log_ts_, KPC(memtable));
|
||||
FLOG_INFO("log ts range not continues, reset previous minor merge tables",
|
||||
"last_end_log_ts", result.log_ts_range_.end_log_ts_, KPC(memtable), K(tablet));
|
||||
// mini merge always use the oldest memtable to dump
|
||||
break;
|
||||
} else if (memtable->get_snapshot_version() > max_snapshot_version) {
|
||||
@ -1023,16 +1025,18 @@ int ObPartitionMergePolicy::refine_mini_merge_result(
|
||||
// no minor sstable, skip to cut memtable's boundary
|
||||
} else if (result.log_ts_range_.start_log_ts_ > last_table->get_end_log_ts()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("Unexpected uncontinuous log_ts_range in mini merge", K(ret), K(result), KPC(last_table));
|
||||
LOG_ERROR("Unexpected uncontinuous log_ts_range in mini merge",
|
||||
K(ret), K(result), KPC(last_table), K(table_store), K(tablet));
|
||||
} else if (result.log_ts_range_.start_log_ts_ < last_table->get_end_log_ts()
|
||||
&& !tablet.get_tablet_meta().tablet_id_.is_special_merge_tablet()) {
|
||||
// fix start_log_ts to make log_ts_range continuous in migrate phase for issue 42832934
|
||||
if (result.log_ts_range_.end_log_ts_ <= last_table->get_end_log_ts()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("No need mini merge memtable which is covered by existing sstable", K(ret), K(result), KPC(last_table));
|
||||
LOG_WARN("No need mini merge memtable which is covered by existing sstable",
|
||||
K(ret), K(result), KPC(last_table), K(table_store), K(tablet));
|
||||
} else {
|
||||
result.log_ts_range_.start_log_ts_ = last_table->get_end_log_ts();
|
||||
FLOG_INFO("Fix mini merge result log ts range", K(ret), K(result), KPC(last_table));
|
||||
FLOG_INFO("Fix mini merge result log ts range", K(ret), K(result), KPC(last_table), K(table_store), K(tablet));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -2339,6 +2339,7 @@ int ObTablet::replay_schema_version_change_log(const int64_t schema_version)
|
||||
}
|
||||
|
||||
int ObTablet::get_tablet_report_info(
|
||||
const int64_t snapshot_version,
|
||||
common::ObIArray<int64_t> &column_checksums,
|
||||
int64_t &data_size,
|
||||
int64_t &required_size,
|
||||
@ -2357,9 +2358,10 @@ int ObTablet::get_tablet_report_info(
|
||||
} else if (major_sstables.count_ == 0) {
|
||||
ret = OB_TABLET_NOT_EXIST;
|
||||
LOG_INFO("no major sstables in this tablet, cannot report", K(ret));
|
||||
} else if (OB_ISNULL(main_major = static_cast<ObSSTable *>(major_sstables.get_boundary_table(true)))) {
|
||||
} else if (FALSE_IT(main_major = static_cast<ObSSTable *>(major_sstables.get_boundary_table(true)))) {
|
||||
} else if (OB_UNLIKELY(nullptr == main_major || snapshot_version != main_major->get_snapshot_version())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("faild to get unexpected null major", K(ret));
|
||||
LOG_WARN("get unexpected main major", K(ret), K(snapshot_version), KPC(main_major), KPC(this));
|
||||
} else if (need_checksums && OB_FAIL(column_checksums.assign(main_major->get_meta().get_col_checksum()))) {
|
||||
LOG_WARN("failed to assign column checksums", K(ret));
|
||||
}
|
||||
|
@ -338,6 +338,7 @@ public:
|
||||
transaction::ObTransID &pending_tx_id);
|
||||
int replay_schema_version_change_log(const int64_t schema_version);
|
||||
int get_tablet_report_info(
|
||||
const int64_t snapshot_version,
|
||||
common::ObIArray<int64_t> &column_checksums,
|
||||
int64_t &data_size,
|
||||
int64_t &required_size,
|
||||
|
@ -954,9 +954,9 @@ int ObTabletTableStore::check_ready_for_read()
|
||||
int ret = OB_SUCCESS;
|
||||
is_ready_for_read_ = false;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("table store not init", K(ret), K(*this));
|
||||
if (OB_UNLIKELY(!is_inited_ || nullptr == tablet_ptr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("table store not init", K(ret), KPC(this), KPC(tablet_ptr_));
|
||||
} else if (major_tables_.empty()) {
|
||||
LOG_INFO("no valid major sstable, not ready for read", K(*this));
|
||||
} else if (OB_FAIL(check_continuous())) {
|
||||
@ -967,11 +967,19 @@ int ObTabletTableStore::check_ready_for_read()
|
||||
} else if (get_table_count() > ObTabletTableStore::MAX_SSTABLE_CNT) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_WARN("Too Many sstables, cannot add another sstable any more", K(ret), KPC(this), KPC(tablet_ptr_));
|
||||
if (OB_NOT_NULL(tablet_ptr_)) {
|
||||
ObPartitionMergePolicy::diagnose_table_count_unsafe(MAJOR_MERGE, *tablet_ptr_);
|
||||
}
|
||||
} else {
|
||||
ObPartitionMergePolicy::diagnose_table_count_unsafe(MAJOR_MERGE, *tablet_ptr_);
|
||||
} else if (minor_tables_.empty()) {
|
||||
is_ready_for_read_ = true;
|
||||
} else {
|
||||
const int64_t clog_checkpoint_ts = tablet_ptr_->get_clog_checkpoint_ts();
|
||||
const int64_t last_minor_end_log_ts = minor_tables_.get_boundary_table(true/*last*/)->get_end_log_ts();
|
||||
if (OB_UNLIKELY(clog_checkpoint_ts != last_minor_end_log_ts)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("last minor table's end_log_ts must be equal to clog_checkpoint_ts",
|
||||
K(ret), K(last_minor_end_log_ts), K(clog_checkpoint_ts), KPC(this), KPC(tablet_ptr_));
|
||||
} else {
|
||||
is_ready_for_read_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user