shrink memory use in ddl

This commit is contained in:
simonjoylet
2023-02-15 14:42:18 +00:00
committed by ob-robot
parent facae95a7d
commit 076558b3cc
6 changed files with 185 additions and 193 deletions

View File

@ -359,7 +359,6 @@ int ObDDLTableMergeTask::process()
ObTabletDDLParam ddl_param;
ObTableHandleV2 table_handle;
bool is_data_complete = false;
bool is_scn_overlap = false;
const ObSSTable *latest_major_sstable = nullptr;
if (OB_FAIL(ObTabletDDLUtil::check_and_get_major_sstable(merge_param_.ls_id_, merge_param_.tablet_id_, latest_major_sstable))) {
LOG_WARN("check if major sstable exist failed", K(ret));
@ -377,8 +376,7 @@ int ObDDLTableMergeTask::process()
} else if (merge_param_.is_commit_ && OB_FAIL(ObTabletDDLUtil::check_data_integrity(ddl_sstable_handles,
ddl_param.start_scn_,
merge_param_.rec_scn_,
is_data_complete,
is_scn_overlap))) {
is_data_complete))) {
LOG_WARN("check ddl sstable integrity failed", K(ret), K(ddl_sstable_handles), K(ddl_param), K(merge_param_));
} else if (merge_param_.is_commit_ && !is_data_complete) {
ret = OB_EAGAIN;
@ -390,7 +388,6 @@ int ObDDLTableMergeTask::process()
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(ddl_sstable_handles.get_tables(),
tablet_handle.get_obj()->get_index_read_info(),
ddl_param,
is_scn_overlap,
table_handle))) {
LOG_WARN("compact sstables failed", K(ret));
} else {
@ -439,18 +436,15 @@ int ObDDLTableMergeTask::process()
int ObTabletDDLUtil::check_data_integrity(const ObTablesHandleArray &ddl_sstables,
const SCN &start_scn,
const SCN &prepare_scn,
bool &is_data_complete,
bool &is_scn_overlap)
bool &is_data_complete)
{
int ret = OB_SUCCESS;
is_data_complete = false;
is_scn_overlap = false;
if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !prepare_scn.is_valid_and_not_min() || prepare_scn <= start_scn)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ddl_sstables.get_count()), K(start_scn), K(prepare_scn));
} else if (ddl_sstables.empty()) {
is_data_complete = false;
is_scn_overlap = false;
} else {
const ObSSTable *first_ddl_sstable = static_cast<const ObSSTable *>(ddl_sstables.get_tables().at(0));
const ObSSTable *last_ddl_sstable = static_cast<const ObSSTable *>(ddl_sstables.get_tables().at(ddl_sstables.get_count() - 1));
@ -464,7 +458,6 @@ int ObTabletDDLUtil::check_data_integrity(const ObTablesHandleArray &ddl_sstable
for (int64_t i = 1; OB_SUCC(ret) && i < ddl_sstables.get_count(); ++i) {
const ObSSTable *cur_ddl_sstable = static_cast<const ObSSTable *>(ddl_sstables.get_tables().at(i));
if (cur_ddl_sstable->get_start_scn() < last_end_scn) {
is_scn_overlap = true;
last_end_scn = SCN::max(last_end_scn, cur_ddl_sstable->get_end_scn());
} else if (cur_ddl_sstable->get_start_scn() == last_end_scn) {
last_end_scn = SCN::max(last_end_scn, cur_ddl_sstable->get_end_scn());
@ -572,53 +565,68 @@ int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id,
return ret;
}
int ObTabletDDLUtil::prepare_index_builder(const ObTabletDDLParam &ddl_param,
ObIAllocator &allocator,
const ObSSTableIndexBuilder::ObSpaceOptimizationMode mode,
const blocksstable::ObSSTable *first_ddl_sstable,
ObSSTableIndexBuilder *&sstable_index_builder,
ObIndexBlockRebuilder *&index_block_rebuilder)
int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param,
const ObIArray<const ObDataMacroBlockMeta *> &meta_array,
const ObSSTable *first_ddl_sstable,
ObTableHandleV2 &table_handle)
{
int ret = OB_SUCCESS;
sstable_index_builder = nullptr;
index_block_rebuilder = nullptr;
ObDataStoreDesc data_desc;
table_handle.reset();
ObArenaAllocator arena("DdlCreateSST", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
void *buf = nullptr;
if (OB_FAIL(prepare_index_data_desc(ddl_param.ls_id_,
ddl_param.table_key_.get_tablet_id(),
ddl_param.table_key_.version_range_.snapshot_version_,
ddl_param.cluster_version_,
first_ddl_sstable,
data_desc))) {
LOG_WARN("prepare data desc failed", K(ret), K(ddl_param));
} else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObSSTableIndexBuilder)))) {
ObSSTableIndexBuilder *sstable_index_builder = nullptr;
ObIndexBlockRebuilder *index_block_rebuilder = nullptr;
ObDataStoreDesc data_desc;
if (OB_UNLIKELY(!ddl_param.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ddl_param));
} else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(ddl_param.ls_id_,
ddl_param.table_key_.tablet_id_,
ddl_param.table_key_.version_range_.snapshot_version_,
ddl_param.cluster_version_,
first_ddl_sstable,
data_desc))) {
LOG_WARN("prepare data store desc failed", K(ret), K(ddl_param));
} else if (OB_ISNULL(buf = arena.alloc(sizeof(ObSSTableIndexBuilder)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
LOG_WARN("allocate memory for sstable index builder failed", K(ret));
} else if (FALSE_IT(sstable_index_builder = new (buf) ObSSTableIndexBuilder)) {
} else if (OB_FAIL(sstable_index_builder->init(
data_desc,
nullptr, // this param is flush macro call back, nullptr is default val
mode))) {
LOG_WARN("init sstable index builder failed", K(ret));
} else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObIndexBlockRebuilder)))) {
} else if (OB_FAIL(sstable_index_builder->init(data_desc,
nullptr, // macro block flush callback
ObSSTableIndexBuilder::DISABLE))) {
LOG_WARN("init sstable index builder failed", K(ret), K(data_desc));
} else if (OB_ISNULL(buf = arena.alloc(sizeof(ObIndexBlockRebuilder)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (FALSE_IT(index_block_rebuilder = new (buf) ObIndexBlockRebuilder)) {
} else if (OB_FAIL(index_block_rebuilder->init(*sstable_index_builder))) {
LOG_WARN("fail to alloc index builder", K(ret));
} else if (meta_array.empty()) {
// do nothing
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < meta_array.count(); ++i) {
if (OB_FAIL(index_block_rebuilder->append_macro_row(*meta_array.at(i)))) {
LOG_WARN("append block meta failed", K(ret), K(i));
}
}
}
if (OB_FAIL(ret)) {
if (nullptr != index_block_rebuilder) {
index_block_rebuilder->~ObIndexBlockRebuilder();
allocator.free(index_block_rebuilder);
index_block_rebuilder = nullptr;
}
if (nullptr != sstable_index_builder) {
sstable_index_builder->~ObSSTableIndexBuilder();
allocator.free(sstable_index_builder);
sstable_index_builder = nullptr;
if (OB_SUCC(ret)) {
if (OB_FAIL(index_block_rebuilder->close())) {
LOG_WARN("close index block rebuilder failed", K(ret));
} else if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(sstable_index_builder, ddl_param, nullptr/*first_ddl_sstable*/, table_handle))) {
LOG_WARN("create ddl sstable failed", K(ret), K(ddl_param));
}
}
if (nullptr != index_block_rebuilder) {
index_block_rebuilder->~ObIndexBlockRebuilder();
arena.free(index_block_rebuilder);
index_block_rebuilder = nullptr;
}
if (nullptr != sstable_index_builder) {
sstable_index_builder->~ObSSTableIndexBuilder();
arena.free(sstable_index_builder);
sstable_index_builder = nullptr;
}
return ret;
}
@ -685,7 +693,7 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
param.data_checksum_ = res.data_checksum_;
param.occupy_size_ = res.occupy_size_;
param.original_size_ = res.original_size_;
param.max_merged_trans_version_ = res.max_merged_trans_version_;
param.max_merged_trans_version_ = ddl_param.snapshot_version_;
param.contain_uncommitted_row_ = res.contain_uncommitted_row_;
param.compressor_type_ = res.compressor_type_;
param.encrypt_id_ = res.encrypt_id_;
@ -715,14 +723,12 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
return ret;
}
int ObTabletDDLUtil::update_ddl_table_store(ObSSTableIndexBuilder *sstable_index_builder,
const ObTabletDDLParam &ddl_param,
const ObSSTable *first_ddl_sstable,
ObTableHandleV2 &table_handle)
int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
const ObTableHandleV2 &table_handle)
{
int ret = OB_SUCCESS;
if (OB_FAIL(create_ddl_sstable(sstable_index_builder, ddl_param, first_ddl_sstable, table_handle))) {
LOG_WARN("create ddl sstable failed", K(ret));
if (OB_UNLIKELY(!ddl_param.is_valid() || !table_handle.is_valid())) {
LOG_WARN("invalid argument", K(ret), K(ddl_param), K(table_handle));
} else {
ObLSService *ls_service = MTL(ObLSService *);
ObLSHandle ls_handle;
@ -759,24 +765,18 @@ int ObTabletDDLUtil::update_ddl_table_store(ObSSTableIndexBuilder *sstable_index
int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstables,
const ObTableReadInfo &read_info,
const ObTabletDDLParam &ddl_param,
const bool is_scn_overlap,
ObTableHandleV2 &table_handle)
{
int ret = OB_SUCCESS;
table_handle.reset();
ObArenaAllocator arena;
ObSSTableIndexBuilder *sstable_index_builder = nullptr;
ObIndexBlockRebuilder *index_block_rebuilder = nullptr;
const ObSSTableIndexBuilder::ObSpaceOptimizationMode mode = ddl_param.table_key_.is_ddl_sstable()
? ObSSTableIndexBuilder::DISABLE : ObSSTableIndexBuilder::AUTO;
ObBlockMetaTree meta_tree;
ObArray<const ObDataMacroBlockMeta *> sorted_metas;
if (OB_UNLIKELY(!ddl_param.is_valid() || ddl_sstables.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ddl_param), K(ddl_sstables.count()));
} else if (OB_FAIL(prepare_index_builder(ddl_param, arena, mode,
static_cast<ObSSTable *>(ddl_sstables.at(0)),
sstable_index_builder, index_block_rebuilder))) {
LOG_WARN("prepare sstable index builder failed", K(ret));
} else if (OB_FAIL(meta_tree.init(ddl_param.ls_id_, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.cluster_version_))) {
LOG_WARN("init meta tree failed", K(ret), K(ddl_param));
} else {
ObDatumRowkey last_rowkey;
SMART_VAR(ObSSTableSecMetaIterator, meta_iter) {
@ -785,6 +785,7 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
ObDataMacroBlockMeta data_macro_meta;
for (int64_t i = 0; OB_SUCC(ret) && i < ddl_sstables.count(); ++i) {
const ObSSTable *cur_sstable = static_cast<const ObSSTable *>(ddl_sstables.at(i));
meta_iter.reset();
if (OB_FAIL(meta_iter.open(query_range,
ObMacroBlockMetaType::DATA_BLOCK_META,
*cur_sstable,
@ -796,30 +797,31 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
if (OB_FAIL(meta_iter.get_next(data_macro_meta))) {
if (OB_ITER_END != ret) {
LOG_WARN("get data macro meta failed", K(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else {
int cmp_ret = 1; // defaut 1 for last_rowkey is invalid
const ObDatumRowkey &cur_rowkey = data_macro_meta.end_key_;
if (is_scn_overlap && last_rowkey.is_valid()
&& OB_FAIL(cur_rowkey.compare(last_rowkey,
sstable_index_builder->get_index_store_desc().datum_utils_,
cmp_ret))) {
LOG_WARN("compare rowkey failed", K(ret), K(cur_rowkey), K(last_rowkey));
} else if (cmp_ret <= 0) { // cur_rowkey <= last_rowkey
// exist rowkey, skip
} else if (OB_FAIL(index_block_rebuilder->append_macro_row(data_macro_meta))) {
LOG_WARN("append macro row failed", K(ret));
}
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (is_scn_overlap && data_macro_meta.end_key_.is_valid()) {
if (OB_FAIL(data_macro_meta.end_key_.deep_copy(last_rowkey, arena))) {
LOG_WARN("deep copy to last rowkey failed", K(ret));
ObDataMacroBlockMeta *copied_meta = nullptr; // copied meta will destruct in the meta tree
ObDDLMacroHandle macro_handle;
bool is_exist = false;
if (OB_FAIL(meta_tree.exist(&data_macro_meta.end_key_, is_exist))) {
LOG_WARN("check block meta exist failed", K(ret), K(data_macro_meta));
} else if (is_exist) {
// skip
} else if (OB_FAIL(macro_handle.set_block_id(data_macro_meta.get_macro_id()))) {
LOG_WARN("hold macro block failed", K(ret));
} else if (OB_FAIL(data_macro_meta.deep_copy(copied_meta, arena))) {
LOG_WARN("deep copy macro block meta failed", K(ret));
} else if (OB_FAIL(meta_tree.insert_macro_block(macro_handle, &copied_meta->end_key_, copied_meta))) {
LOG_WARN("insert meta tree failed", K(ret), K(macro_handle), KPC(copied_meta));
copied_meta->~ObDataMacroBlockMeta();
}
}
}
LOG_INFO("append meta tree finished", K(ret), K(i),
"data_macro_block_cnt_in_sstable", cur_sstable->get_meta().get_basic_meta().get_data_macro_block_count(),
K(meta_tree.get_macro_block_cnt()));
#ifdef ERRSIM
if (OB_SUCC(ret) && ddl_param.table_key_.is_major_sstable()) {
ret = OB_E(EventTable::EN_DDL_COMPACT_FAIL) OB_SUCCESS;
@ -829,35 +831,26 @@ int ObTabletDDLUtil::compact_ddl_sstable(const ObIArray<ObITable *> &ddl_sstable
}
#endif
}
if (OB_SUCC(ret)) {
meta_iter.reset();
}
}
}
}
// close
if (OB_SUCC(ret)) {
if (OB_FAIL(index_block_rebuilder->close())) {
LOG_WARN("index block rebuilder close failed", K(ret));
} else if (OB_FAIL(update_ddl_table_store(sstable_index_builder,
ddl_param,
static_cast<ObSSTable *>(ddl_sstables.at(0)),
table_handle))) {
if (OB_FAIL(meta_tree.build_sorted_rowkeys())) {
LOG_WARN("build sorted rowkey failed", K(ret));
} else if (OB_FAIL(meta_tree.get_sorted_meta_array(sorted_metas))) {
LOG_WARN("get sorted metas failed", K(ret));
} else if (OB_FAIL(create_ddl_sstable(ddl_param,
sorted_metas,
static_cast<ObSSTable *>(ddl_sstables.at(0)),
table_handle))) {
LOG_WARN("create ddl sstable failed", K(ret));
} else if (OB_FAIL(update_ddl_table_store(ddl_param, table_handle))) {
LOG_WARN("update ddl table store failed", K(ret));
} else {
LOG_INFO("compact ddl sstable success", K(ddl_param));
}
}
if (nullptr != index_block_rebuilder) {
index_block_rebuilder->~ObIndexBlockRebuilder();
arena.free(index_block_rebuilder);
index_block_rebuilder = nullptr;
}
if (nullptr != sstable_index_builder) {
sstable_index_builder->~ObSSTableIndexBuilder();
arena.free(sstable_index_builder);
sstable_index_builder = nullptr;
}
return ret;
}