fix scn range of compact ddl sstable
This commit is contained in:
@ -373,20 +373,10 @@ int ObDDLTableMergeTask::process()
|
|||||||
} else if (merge_param_.start_scn_ > SCN::min_scn() && merge_param_.start_scn_ < ddl_param.start_scn_) {
|
} else if (merge_param_.start_scn_ > SCN::min_scn() && merge_param_.start_scn_ < ddl_param.start_scn_) {
|
||||||
ret = OB_TASK_EXPIRED;
|
ret = OB_TASK_EXPIRED;
|
||||||
LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_scn", ddl_param.start_scn_);
|
LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_scn", ddl_param.start_scn_);
|
||||||
} else if (merge_param_.is_commit_ && OB_FAIL(ObTabletDDLUtil::check_data_integrity(ddl_sstable_handles,
|
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(ddl_sstable_handles,
|
||||||
ddl_param.start_scn_,
|
|
||||||
merge_param_.rec_scn_,
|
|
||||||
is_data_complete))) {
|
|
||||||
LOG_WARN("check ddl sstable integrity failed", K(ret), K(ddl_sstable_handles), K(ddl_param), K(merge_param_));
|
|
||||||
} else if (merge_param_.is_commit_ && !is_data_complete) {
|
|
||||||
ret = OB_EAGAIN;
|
|
||||||
if (TC_REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
|
||||||
LOG_WARN("current ddl sstables not contain all data", K(ddl_sstable_handles), K(ddl_param), K(merge_param_));
|
|
||||||
}
|
|
||||||
} else if (FALSE_IT(ddl_param.table_key_.table_type_ = merge_param_.is_commit_ ?
|
|
||||||
ObITable::TableType::MAJOR_SSTABLE : ObITable::TableType::DDL_DUMP_SSTABLE)) {
|
|
||||||
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(ddl_sstable_handles.get_tables(),
|
|
||||||
tablet_handle.get_obj()->get_index_read_info(),
|
tablet_handle.get_obj()->get_index_read_info(),
|
||||||
|
merge_param_.is_commit_,
|
||||||
|
merge_param_.rec_scn_,
|
||||||
ddl_param,
|
ddl_param,
|
||||||
table_handle))) {
|
table_handle))) {
|
||||||
LOG_WARN("compact sstables failed", K(ret));
|
LOG_WARN("compact sstables failed", K(ret));
|
||||||
@ -769,9 +759,11 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstables,
|
int ObTabletDDLUtil::compact_ddl_sstable(const ObTablesHandleArray &ddl_sstables,
|
||||||
const ObTableReadInfo &read_info,
|
const ObTableReadInfo &read_info,
|
||||||
const ObTabletDDLParam &ddl_param,
|
const bool is_commit,
|
||||||
|
const share::SCN &rec_scn,
|
||||||
|
ObTabletDDLParam &ddl_param,
|
||||||
ObTableHandleV2 &table_handle)
|
ObTableHandleV2 &table_handle)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -779,9 +771,20 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
|
|||||||
ObArenaAllocator arena;
|
ObArenaAllocator arena;
|
||||||
ObBlockMetaTree meta_tree;
|
ObBlockMetaTree meta_tree;
|
||||||
ObArray<const ObDataMacroBlockMeta *> sorted_metas;
|
ObArray<const ObDataMacroBlockMeta *> sorted_metas;
|
||||||
if (OB_UNLIKELY(!ddl_param.is_valid() || ddl_sstables.empty())) {
|
bool is_data_complete = false;
|
||||||
|
if (OB_UNLIKELY(!ddl_param.is_valid() || ddl_sstables.empty() || (is_commit && !rec_scn.is_valid_and_not_min()))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), K(ddl_param), K(ddl_sstables.count()));
|
LOG_WARN("invalid argument", K(ret), K(ddl_param), K(ddl_sstables.get_count()), K(is_commit), K(rec_scn));
|
||||||
|
} else if (OB_FAIL(ObTabletDDLUtil::check_data_integrity(ddl_sstables,
|
||||||
|
ddl_param.start_scn_,
|
||||||
|
is_commit ? rec_scn : ddl_sstables.get_table(ddl_sstables.get_count() - 1)->get_end_scn(),
|
||||||
|
is_data_complete))) {
|
||||||
|
LOG_WARN("check ddl sstable integrity failed", K(ret), K(ddl_sstables), K(ddl_param));
|
||||||
|
} else if (!is_data_complete) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
if (TC_REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||||
|
LOG_WARN("current ddl sstables not contain all data", K(ddl_sstables), K(ddl_param));
|
||||||
|
}
|
||||||
} else if (OB_FAIL(meta_tree.init(ddl_param.ls_id_, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) {
|
} else if (OB_FAIL(meta_tree.init(ddl_param.ls_id_, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) {
|
||||||
LOG_WARN("init meta tree failed", K(ret), K(ddl_param));
|
LOG_WARN("init meta tree failed", K(ret), K(ddl_param));
|
||||||
} else {
|
} else {
|
||||||
@ -790,8 +793,8 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
|
|||||||
ObDatumRange query_range;
|
ObDatumRange query_range;
|
||||||
query_range.set_whole_range();
|
query_range.set_whole_range();
|
||||||
ObDataMacroBlockMeta data_macro_meta;
|
ObDataMacroBlockMeta data_macro_meta;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_sstables.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_sstables.get_count(); ++i) {
|
||||||
const ObSSTable *cur_sstable = static_cast<const ObSSTable *>(ddl_sstables.at(i));
|
const ObSSTable *cur_sstable = static_cast<const ObSSTable *>(ddl_sstables.get_table(i));
|
||||||
meta_iter.reset();
|
meta_iter.reset();
|
||||||
if (OB_FAIL(meta_iter.open(query_range,
|
if (OB_FAIL(meta_iter.open(query_range,
|
||||||
ObMacroBlockMetaType::DATA_BLOCK_META,
|
ObMacroBlockMetaType::DATA_BLOCK_META,
|
||||||
@ -843,13 +846,22 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
|
|||||||
}
|
}
|
||||||
// close
|
// close
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
if (is_commit) {
|
||||||
|
ddl_param.table_key_.table_type_ = ObITable::TableType::MAJOR_SSTABLE;
|
||||||
|
ddl_param.table_key_.version_range_.base_version_ = 0;
|
||||||
|
ddl_param.table_key_.version_range_.snapshot_version_ = ddl_param.snapshot_version_;
|
||||||
|
} else {
|
||||||
|
ddl_param.table_key_.table_type_ = ObITable::TableType::DDL_DUMP_SSTABLE;
|
||||||
|
ddl_param.table_key_.scn_range_.start_scn_ = ddl_sstables.get_table(0)->get_start_scn();
|
||||||
|
ddl_param.table_key_.scn_range_.end_scn_ = ddl_sstables.get_table(ddl_sstables.get_count() - 1)->get_end_scn();
|
||||||
|
}
|
||||||
if (OB_FAIL(meta_tree.build_sorted_rowkeys())) {
|
if (OB_FAIL(meta_tree.build_sorted_rowkeys())) {
|
||||||
LOG_WARN("build sorted rowkey failed", K(ret));
|
LOG_WARN("build sorted rowkey failed", K(ret));
|
||||||
} else if (OB_FAIL(meta_tree.get_sorted_meta_array(sorted_metas))) {
|
} else if (OB_FAIL(meta_tree.get_sorted_meta_array(sorted_metas))) {
|
||||||
LOG_WARN("get sorted metas failed", K(ret));
|
LOG_WARN("get sorted metas failed", K(ret));
|
||||||
} else if (OB_FAIL(create_ddl_sstable(ddl_param,
|
} else if (OB_FAIL(create_ddl_sstable(ddl_param,
|
||||||
sorted_metas,
|
sorted_metas,
|
||||||
static_cast<ObSSTable *>(ddl_sstables.at(0)),
|
static_cast<ObSSTable *>(ddl_sstables.get_table(0)),
|
||||||
table_handle))) {
|
table_handle))) {
|
||||||
LOG_WARN("create ddl sstable failed", K(ret));
|
LOG_WARN("create ddl sstable failed", K(ret));
|
||||||
} else if (OB_FAIL(update_ddl_table_store(ddl_param, table_handle))) {
|
} else if (OB_FAIL(update_ddl_table_store(ddl_param, table_handle))) {
|
||||||
|
|||||||
@ -165,9 +165,11 @@ public:
|
|||||||
static int update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
static int update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
||||||
const ObTableHandleV2 &table_handle);
|
const ObTableHandleV2 &table_handle);
|
||||||
|
|
||||||
static int compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstables,
|
static int compact_ddl_sstable(const ObTablesHandleArray &ddl_sstables,
|
||||||
const ObTableReadInfo &read_info,
|
const ObTableReadInfo &read_info,
|
||||||
const ObTabletDDLParam &ddl_param,
|
const bool is_commit,
|
||||||
|
const share::SCN &rec_scn,
|
||||||
|
ObTabletDDLParam &ddl_param,
|
||||||
ObTableHandleV2 &table_handle);
|
ObTableHandleV2 &table_handle);
|
||||||
|
|
||||||
static int report_ddl_checksum(const share::ObLSID &ls_id,
|
static int report_ddl_checksum(const share::ObLSID &ls_id,
|
||||||
|
|||||||
Reference in New Issue
Block a user