Avoid ddl_get_tablet on replay
This commit is contained in:
@ -46,8 +46,7 @@ namespace storage
|
||||
ObDDLTableMergeDag::ObDDLTableMergeDag()
|
||||
: ObIDag(ObDagType::DAG_TYPE_DDL_KV_MERGE),
|
||||
is_inited_(false),
|
||||
ddl_param_(),
|
||||
compat_mode_(lib::Worker::CompatMode::INVALID)
|
||||
ddl_param_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -69,20 +68,7 @@ int ObDDLTableMergeDag::init_by_param(const share::ObIDagInitParam *param)
|
||||
LOG_WARN("invalid arguments", K(ret), KP(param));
|
||||
} else {
|
||||
ddl_param_ = *static_cast<const ObDDLTableMergeDagParam *>(param);
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ddl_param_.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ddl_param_));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
ddl_param_.tablet_id_,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(ddl_param_));
|
||||
} else if (OB_ISNULL(tablet = tablet_handle.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet is null", K(ret), K(ddl_param_));
|
||||
} else {
|
||||
compat_mode_ = tablet->get_tablet_meta().compat_mode_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -90,32 +76,9 @@ int ObDDLTableMergeDag::init_by_param(const share::ObIDagInitParam *param)
|
||||
int ObDDLTableMergeDag::create_first_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_service = MTL(ObLSService *);
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObDDLKvMgrHandle ddl_kv_mgr_handle;
|
||||
ObTablesHandleArray ddl_kvs_handle;
|
||||
ObDDLTableMergeTask *merge_task = nullptr;
|
||||
if (OB_FAIL(ls_service->get_ls(ddl_param_.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ddl_param_));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
ddl_param_.tablet_id_,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("get tablet failed", K(ret), K(ddl_param_));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("ddl kv mgr not exist", K(ret), K(ddl_param_));
|
||||
} else {
|
||||
LOG_WARN("get ddl kv mgr failed", K(ret), K(ddl_param_));
|
||||
}
|
||||
} else if (ddl_param_.start_scn_ < ddl_kv_mgr_handle.get_obj()->get_start_scn()) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_WARN("ddl task expired, skip it", K(ret), K(ddl_param_), "new_start_scn", ddl_kv_mgr_handle.get_obj()->get_start_scn());
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->freeze_ddl_kv())) {
|
||||
LOG_WARN("ddl kv manager try freeze failed", K(ret), K(ddl_param_));
|
||||
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->get_ddl_kvs(true/*frozen_only*/, ddl_kvs_handle))) {
|
||||
if (OB_FAIL(ddl_param_.ddl_kv_mgr_handle_.get_obj()->get_ddl_kvs(true/*frozen_only*/, ddl_kvs_handle))) {
|
||||
LOG_WARN("get freezed ddl kv failed", K(ret), K(ddl_param_));
|
||||
} else if (OB_FAIL(alloc_task(merge_task))) {
|
||||
LOG_WARN("Fail to alloc task", K(ret), K(ddl_param_));
|
||||
@ -280,7 +243,7 @@ int ObDDLTableDumpTask::process()
|
||||
} else if (OB_ISNULL(ddl_kv = static_cast<ObDDLKV *>(ddl_kv_handle.get_table()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get ddl kv failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_kv->close())) {
|
||||
} else if (OB_FAIL(ddl_kv->close(*tablet_handle.get_obj()))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("close ddl kv failed", K(ret));
|
||||
}
|
||||
@ -393,7 +356,8 @@ int ObDDLTableMergeTask::process()
|
||||
} else if (merge_param_.start_scn_ > SCN::min_scn() && merge_param_.start_scn_ < ddl_param.start_scn_) {
|
||||
ret = OB_TASK_EXPIRED;
|
||||
LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_scn", ddl_param.start_scn_);
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(ddl_table_iter,
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::compact_ddl_sstable(*tablet_handle.get_obj(),
|
||||
ddl_table_iter,
|
||||
tablet_handle.get_obj()->get_rowkey_read_info(),
|
||||
merge_param_.is_commit_,
|
||||
merge_param_.rec_scn_,
|
||||
@ -508,8 +472,7 @@ bool ObTabletDDLParam::is_valid() const
|
||||
&& data_format_version_ >= 0;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id,
|
||||
const ObTabletID &tablet_id,
|
||||
int ObTabletDDLUtil::prepare_index_data_desc(ObTablet &tablet,
|
||||
const int64_t snapshot_version,
|
||||
const int64_t data_format_version,
|
||||
const ObSSTable *first_ddl_sstable,
|
||||
@ -518,24 +481,14 @@ int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id,
|
||||
int ret = OB_SUCCESS;
|
||||
data_desc.reset();
|
||||
ObLSService *ls_service = MTL(ObLSService *);
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObArenaAllocator tmp_arena("DDLIdxDescTmp");
|
||||
const ObStorageSchema *storage_schema = nullptr;
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
const ObLSID &ls_id = tablet.get_tablet_meta().ls_id_;
|
||||
if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_id.is_valid() || snapshot_version <= 0 || data_format_version < 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_id), K(snapshot_version), K(data_format_version));
|
||||
} else if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ls service is null", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(ls_service->get_ls(ls_id, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
tablet_id,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("get tablet failed", K(ret));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(tmp_arena, storage_schema))) {
|
||||
} else if (OB_FAIL(tablet.load_storage_schema(tmp_arena, storage_schema))) {
|
||||
LOG_WARN("fail to get storage schema", K(ret));
|
||||
} else if (OB_FAIL(data_desc.init_as_index(*storage_schema,
|
||||
ls_id,
|
||||
@ -586,7 +539,8 @@ int ObTabletDDLUtil::prepare_index_data_desc(const share::ObLSID &ls_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param,
|
||||
int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
const ObIArray<const ObDataMacroBlockMeta *> &meta_array,
|
||||
const ObSSTable *first_ddl_sstable,
|
||||
common::ObArenaAllocator &allocator,
|
||||
@ -601,8 +555,7 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param,
|
||||
if (OB_UNLIKELY(!ddl_param.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(ddl_param.ls_id_,
|
||||
ddl_param.table_key_.tablet_id_,
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::prepare_index_data_desc(tablet,
|
||||
ddl_param.table_key_.version_range_.snapshot_version_,
|
||||
ddl_param.data_format_version_,
|
||||
first_ddl_sstable,
|
||||
@ -634,7 +587,7 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param,
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(index_block_rebuilder->close())) {
|
||||
LOG_WARN("close index block rebuilder failed", K(ret));
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(sstable_index_builder, ddl_param, first_ddl_sstable, allocator, sstable))) {
|
||||
} else if (OB_FAIL(ObTabletDDLUtil::create_ddl_sstable(tablet, sstable_index_builder, ddl_param, first_ddl_sstable, allocator, sstable))) {
|
||||
LOG_WARN("create ddl sstable failed", K(ret), K(ddl_param));
|
||||
}
|
||||
}
|
||||
@ -651,34 +604,22 @@ int ObTabletDDLUtil::create_ddl_sstable(const ObTabletDDLParam &ddl_param,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_builder,
|
||||
int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet,
|
||||
ObSSTableIndexBuilder *sstable_index_builder,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
const ObSSTable *first_ddl_sstable,
|
||||
common::ObArenaAllocator &allocator,
|
||||
blocksstable::ObSSTable &sstable)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_service = MTL(ObLSService *);
|
||||
ObLSHandle ls_handle;
|
||||
ObTabletHandle tablet_handle;
|
||||
ObArenaAllocator tmp_arena("CreateDDLSstTmp");
|
||||
const ObStorageSchema *storage_schema = nullptr;
|
||||
SMART_VAR(ObSSTableMergeRes, res) {
|
||||
if (OB_UNLIKELY(nullptr == sstable_index_builder || !ddl_param.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), KP(sstable_index_builder), K(ddl_param));
|
||||
} else if (OB_ISNULL(ls_service)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ls service is null", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(ls_service->get_ls(ddl_param.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
ddl_param.table_key_.tablet_id_,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("get tablet failed", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(tmp_arena, storage_schema))) {
|
||||
LOG_WARN("failed to load storage schema", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet.load_storage_schema(tmp_arena, storage_schema))) {
|
||||
LOG_WARN("failed to load storage schema", K(ret), K(tablet));
|
||||
} else {
|
||||
int64_t column_count = 0;
|
||||
share::schema::ObTableMode table_mode = storage_schema->get_table_mode_struct();
|
||||
@ -709,7 +650,7 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
|
||||
LOG_WARN("max_merged_trans_version_ in res is different from ddl snapshot version", K(ret),
|
||||
K(res), K(ddl_param));
|
||||
} else {
|
||||
const int64_t create_schema_version_on_tablet = tablet_handle.get_obj()->get_tablet_meta().create_schema_version_;
|
||||
const int64_t create_schema_version_on_tablet = tablet.get_tablet_meta().create_schema_version_;
|
||||
ObTabletCreateSSTableParam param;
|
||||
param.table_key_ = ddl_param.table_key_;
|
||||
param.table_mode_ = table_mode;
|
||||
@ -765,7 +706,8 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
||||
int ObTabletDDLUtil::update_ddl_table_store(ObTablet &tablet,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
common::ObArenaAllocator &allocator,
|
||||
blocksstable::ObSSTable &sstable)
|
||||
{
|
||||
@ -781,19 +723,14 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
||||
const ObStorageSchema *tablet_storage_schema = nullptr;
|
||||
if (OB_FAIL(ls_service->get_ls(ddl_param.ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) {
|
||||
LOG_WARN("get ls failed", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle,
|
||||
ddl_param.table_key_.tablet_id_,
|
||||
tablet_handle,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("get tablet failed", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, tablet_storage_schema))) {
|
||||
} else if (OB_FAIL(tablet.load_storage_schema(allocator, tablet_storage_schema))) {
|
||||
LOG_WARN("fail to load storage schema failed", K(ret));
|
||||
} else {
|
||||
const bool is_major_sstable = ddl_param.table_key_.is_major_sstable();
|
||||
const int64_t rebuild_seq = ls_handle.get_ls()->get_rebuild_seq();
|
||||
const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet_handle.get_obj()->get_snapshot_version())
|
||||
: tablet_handle.get_obj()->get_snapshot_version();
|
||||
const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet_handle.get_obj()->get_multi_version_start())
|
||||
const int64_t snapshot_version = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_snapshot_version())
|
||||
: tablet.get_snapshot_version();
|
||||
const int64_t multi_version_start = is_major_sstable ? max(ddl_param.snapshot_version_, tablet.get_multi_version_start())
|
||||
: 0;
|
||||
ObTabletHandle new_tablet_handle;
|
||||
ObUpdateTableStoreParam table_store_param(&sstable,
|
||||
@ -821,7 +758,8 @@ int ObTabletDDLUtil::update_ddl_table_store(const ObTabletDDLParam &ddl_param,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter,
|
||||
int ObTabletDDLUtil::compact_ddl_sstable(ObTablet &tablet,
|
||||
ObTableStoreIterator &ddl_sstable_iter,
|
||||
const ObITableReadInfo &read_info,
|
||||
const bool is_commit,
|
||||
const share::SCN &rec_scn,
|
||||
@ -867,7 +805,7 @@ int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter,
|
||||
if (TC_REACH_TIME_INTERVAL(10L * 1000L * 1000L)) {
|
||||
LOG_WARN("current ddl sstables not contain all data", K(ddl_sstable_iter), K(ddl_param));
|
||||
}
|
||||
} else if (OB_FAIL(meta_tree.init(ddl_param.ls_id_, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) {
|
||||
} else if (OB_FAIL(meta_tree.init(tablet, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_))) {
|
||||
LOG_WARN("init meta tree failed", K(ret), K(ddl_param));
|
||||
} else if (FALSE_IT(ddl_sstable_iter.resume())) {
|
||||
} else {
|
||||
@ -957,13 +895,14 @@ int ObTabletDDLUtil::compact_ddl_sstable(ObTableStoreIterator &ddl_sstable_iter,
|
||||
LOG_WARN("build sorted rowkey failed", K(ret));
|
||||
} else if (OB_FAIL(meta_tree.get_sorted_meta_array(sorted_metas))) {
|
||||
LOG_WARN("get sorted metas failed", K(ret));
|
||||
} else if (OB_FAIL(create_ddl_sstable(ddl_param,
|
||||
} else if (OB_FAIL(create_ddl_sstable(tablet,
|
||||
ddl_param,
|
||||
sorted_metas,
|
||||
static_cast<ObSSTable *>(first_ddl_sstable),
|
||||
allocator,
|
||||
sstable))) {
|
||||
LOG_WARN("create ddl sstable failed", K(ret));
|
||||
} else if (OB_FAIL(update_ddl_table_store(ddl_param, allocator, sstable))) {
|
||||
} else if (OB_FAIL(update_ddl_table_store(tablet, ddl_param, allocator, sstable))) {
|
||||
LOG_WARN("update ddl table store failed", K(ret));
|
||||
} else {
|
||||
LOG_INFO("compact ddl sstable success", K(ddl_param));
|
||||
|
||||
Reference in New Issue
Block a user