diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index 05a8194e85..baf3d185a0 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -550,31 +550,8 @@ int ObTenantDirectLoadMgr::calc_range( } else { LOG_WARN("get table mgr failed", K(ret), K(tablet_id)); } - } else { - ObStorageSchema *storage_schema = nullptr; - ObLSHandle ls_handle; - ObTabletHandle tablet_handle; - ObArenaAllocator arena_allocator("DIRECT_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); - bool is_column_store = false; - if (OB_FAIL(MTL(ObLSService *)->get_ls(handle.get_obj()->get_ls_id(), ls_handle, ObLSGetMod::DDL_MOD))) { - LOG_WARN("failed to get log stream", K(ret), K(handle), "ls_id", handle.get_obj()->get_ls_id()); - } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, - tablet_id, - tablet_handle, - ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN("failed to get tablet", K(ret), "ls_id", handle.get_obj()->get_ls_id(), K(tablet_id)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(arena_allocator, storage_schema))) { - LOG_WARN("load storage schema failed", K(ret), K(tablet_id)); - } else if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_store))) { - LOG_WARN("fail to check need column group", K(ret)); - } else if (OB_UNLIKELY(!is_column_store)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("table withou cg", K(ret)); - } else if (OB_FAIL(handle.get_obj()->calc_range(storage_schema, tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils(), thread_cnt))) { - LOG_WARN("calc range failed", K(ret), K(thread_cnt)); - } - ObTabletObjLoadHelper::free(arena_allocator, storage_schema); - arena_allocator.reset(); + } else if (OB_FAIL(handle.get_obj()->calc_range(thread_cnt))) { + LOG_WARN("calc range failed", K(ret), K(thread_cnt)); } return ret; } @@ -1171,7 +1148,8 @@ private: ObTabletDirectLoadBuildCtx::ObTabletDirectLoadBuildCtx() : allocator_(), slice_writer_allocator_(), build_param_(), slice_mgr_map_(), data_block_desc_(true/*is ddl*/), index_builder_(nullptr), - column_stat_array_(), sorted_slice_writers_(), sorted_slices_idx_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0) + column_stat_array_(), sorted_slice_writers_(), sorted_slices_idx_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0), + commit_scn_(), schema_allocator_("TDL_schema", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), storage_schema_(nullptr) { column_stat_array_.set_attr(ObMemAttr(MTL_ID(), "TblDL_CSA")); sorted_slice_writers_.set_attr(ObMemAttr(MTL_ID(), "TblDL_SSR")); @@ -1186,6 +1164,10 @@ ObTabletDirectLoadBuildCtx::~ObTabletDirectLoadBuildCtx() allocator_.free(index_builder_); index_builder_ = nullptr; } + ObTabletObjLoadHelper::free(schema_allocator_, storage_schema_); + storage_schema_ = nullptr; + schema_allocator_.reset(); + commit_scn_.reset(); for (int64_t i = 0; i < column_stat_array_.count(); i++) { ObOptColumnStat *col_stat = column_stat_array_.at(i); col_stat->~ObOptColumnStat(); @@ -1220,7 +1202,7 @@ void ObTabletDirectLoadBuildCtx::reset_slice_ctx_on_demand() ObTabletDirectLoadMgr::ObTabletDirectLoadMgr() : is_inited_(false), is_schema_item_ready_(false), ls_id_(), tablet_id_(), table_key_(), data_format_version_(0), lock_(), ref_cnt_(0), direct_load_type_(ObDirectLoadType::DIRECT_LOAD_INVALID), sqc_build_ctx_(), - column_items_(), lob_column_idxs_(), lob_col_types_(), tablet_handle_(), schema_item_(), dir_id_(0) + column_items_(), lob_column_idxs_(), lob_col_types_(), schema_item_(), dir_id_(0) { column_items_.set_attr(ObMemAttr(MTL_ID(), "DL_schema")); lob_column_idxs_.set_attr(ObMemAttr(MTL_ID(), "DL_schema")); @@ -1241,7 +1223,6 @@ ObTabletDirectLoadMgr::~ObTabletDirectLoadMgr() column_items_.reset(); lob_column_idxs_.reset(); lob_col_types_.reset(); - tablet_handle_.reset(); schema_item_.reset(); is_schema_item_ready_ = false; } @@ -1261,6 +1242,7 @@ int ObTabletDirectLoadMgr::update( const int64_t memory_limit = 1024L * 1024L * 1024L * 10L; // 10GB ObLSService *ls_service = nullptr; ObLSHandle ls_handle; + ObTabletHandle tablet_handle; if (OB_UNLIKELY(!build_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(build_param)); @@ -1271,9 +1253,14 @@ int ObTabletDirectLoadMgr::update( LOG_WARN("failed to get log stream", K(ret), K(build_param)); } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, build_param.common_param_.tablet_id_, - tablet_handle_, + tablet_handle, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { LOG_WARN("get tablet handle failed", K(ret), K(build_param)); + } else if (OB_FAIL(prepare_storage_schema(tablet_handle))) { + LOG_WARN("fail to prepare storage schema", K(ret), K(tablet_handle)); + } else if (OB_ISNULL(sqc_build_ctx_.storage_schema_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null storage schema", K(ret)); } else if (nullptr != lob_tablet_mgr) { // has lob ObTabletDirectLoadInsertParam lob_param; @@ -1282,7 +1269,7 @@ int ObTabletDirectLoadMgr::update( const ObTableSchema *table_schema = nullptr; if (OB_FAIL(lob_param.assign(build_param))) { LOG_WARN("assign lob parameter failed", K(ret)); - } else if (OB_FAIL(tablet_handle_.get_obj()->ObITabletMdsInterface::get_ddl_data(share::SCN::max_scn(), ddl_data))) { + } else if (OB_FAIL(tablet_handle.get_obj()->ObITabletMdsInterface::get_ddl_data(share::SCN::max_scn(), ddl_data))) { LOG_WARN("get ddl data failed", K(ret)); } else if (OB_FALSE_IT(lob_param.common_param_.tablet_id_ = ddl_data.lob_meta_tablet_id_)) { } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( @@ -1495,18 +1482,12 @@ int ObTabletDirectLoadMgr::fill_sstable_slice( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), KPC(this)); } else if (is_full_direct_load(direct_load_type_)) { - if (OB_UNLIKELY(!tablet_handle_.is_valid())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_)); - } else { - commit_scn = get_commit_scn(tablet_handle_.get_obj()->get_tablet_meta()); - if (commit_scn.is_valid_and_not_min()) { - ret = OB_TRANS_COMMITED; - FLOG_INFO("already committed", K(commit_scn), KPC(this)); - } else if (start_scn != get_start_scn()) { - ret = OB_TASK_EXPIRED; - LOG_WARN("task expired", K(ret), "start_scn of current execution", start_scn, "start_scn latest", get_start_scn()); - } + if (sqc_build_ctx_.commit_scn_.is_valid_and_not_min()) { + ret = OB_TRANS_COMMITED; + FLOG_INFO("already committed", K(commit_scn), KPC(this)); + } else if (start_scn != get_start_scn()) { + ret = OB_TASK_EXPIRED; + LOG_WARN("task expired", K(ret), "start_scn of current execution", start_scn, "start_scn latest", get_start_scn()); } } if (OB_SUCC(ret)) { @@ -1517,7 +1498,7 @@ int ObTabletDirectLoadMgr::fill_sstable_slice( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected err", K(ret), K(slice_info), K(is_schema_item_ready_)); } else if (OB_FAIL(slice_writer->fill_sstable_slice(start_scn, sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, tablet_id_, - tablet_handle_, iter, schema_item_, direct_load_type_, column_items_, dir_id_, + sqc_build_ctx_.storage_schema_, iter, schema_item_, direct_load_type_, column_items_, dir_id_, sqc_build_ctx_.build_param_.runtime_only_param_.parallel_, affected_rows, insert_monitor))) { LOG_WARN("fill sstable slice failed", K(ret), KPC(this)); } @@ -1555,8 +1536,7 @@ int ObTabletDirectLoadMgr::fill_lob_sstable_slice( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(slice_info), "lob_direct_load_mgr is valid", lob_mgr_handle_.is_valid(), KPC(this), K(start_scn)); } else if (is_full_direct_load(direct_load_type_)) { - commit_scn = get_commit_scn(tablet_handle_.get_obj()->get_tablet_meta()); - if (commit_scn.is_valid_and_not_min()) { + if (sqc_build_ctx_.commit_scn_.is_valid_and_not_min()) { ret = OB_TRANS_COMMITED; FLOG_INFO("already committed", K(commit_scn), KPC(this)); } else if (start_scn != get_start_scn()) { @@ -1616,10 +1596,9 @@ int ObTabletDirectLoadMgr::fill_lob_sstable_slice( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(slice_info), "lob_direct_load_mgr is valid", lob_mgr_handle_.is_valid(), KPC(this), K(start_scn)); } else if (is_full_direct_load(direct_load_type_)) { - commit_scn = get_commit_scn(tablet_handle_.get_obj()->get_tablet_meta()); - if (commit_scn.is_valid_and_not_min()) { + if (sqc_build_ctx_.commit_scn_.is_valid_and_not_min()) { ret = OB_TRANS_COMMITED; - FLOG_INFO("already committed", K(commit_scn), KPC(this)); + FLOG_INFO("already committed", K(sqc_build_ctx_.commit_scn_), KPC(this)); } else if (start_scn != get_start_scn()) { ret = OB_TASK_EXPIRED; LOG_WARN("task expired", K(ret), "start_scn of current execution", start_scn, "start_scn latest", get_start_scn()); @@ -1758,14 +1737,27 @@ public: int ret_code_; }; -int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, const ObStorageDatumUtils &datum_utils, const int64_t thread_cnt) +int ObTabletDirectLoadMgr::calc_range(const int64_t thread_cnt) { int ret = OB_SUCCESS; ObArray sorted_slices; sorted_slices.set_attr(ObMemAttr(MTL_ID(), "DL_SortS_tmp")); - if (OB_UNLIKELY(nullptr == storage_schema || !datum_utils.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(storage_schema), K(datum_utils)); + ObLSService *ls_service = nullptr; + ObLSHandle ls_handle; + ObTabletHandle tablet_handle; + if (OB_UNLIKELY(nullptr == sqc_build_ctx_.storage_schema_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid argument", K(ret), KP(sqc_build_ctx_.storage_schema_)); + } else if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected err", K(ret), K(MTL_ID())); + } else if (OB_FAIL(ls_service->get_ls(ls_id_, ls_handle, ObLSGetMod::DDL_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); + } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, + tablet_id_, + tablet_handle, + ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { + LOG_WARN("get tablet handle failed", K(ret), K(tablet_id_)); } else if (OB_FAIL(sorted_slices.reserve(sqc_build_ctx_.slice_mgr_map_.size()))) { LOG_WARN("reserve slice array failed", K(ret), K(sqc_build_ctx_.slice_mgr_map_.size())); } else { @@ -1780,7 +1772,7 @@ int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, con } } if (OB_SUCC(ret)) { - SliceEndkeyCompareFunctor cmp(datum_utils); + SliceEndkeyCompareFunctor cmp(tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils()); std::sort(sorted_slices.begin(), sorted_slices.end(), cmp); ret = cmp.ret_code_; if (OB_FAIL(ret)) { @@ -1795,7 +1787,7 @@ int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, con } if (OB_SUCC(ret) && is_data_direct_load(direct_load_type_)) { bool is_column_store = false; - if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_store))) { + if (OB_FAIL(ObCODDLUtil::need_column_group_store(*sqc_build_ctx_.storage_schema_, is_column_store))) { LOG_WARN("fail to check need column group", K(ret)); } else if (is_column_store) { if (thread_cnt <= 0) { @@ -1929,19 +1921,11 @@ int ObTabletDirectLoadMgr::close_sstable_slice( } } LOG_INFO("inc task finish count", K(tablet_id_), K(execution_id), K(task_finish_count), K(sqc_build_ctx_.task_total_cnt_)); - ObTablet *tablet = nullptr; - ObStorageSchema *storage_schema = nullptr; - ObArenaAllocator arena_allocator("DDL_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); bool is_column_group_store = false; - if (OB_UNLIKELY(!tablet_handle_.is_valid())) { + if (OB_ISNULL(sqc_build_ctx_.storage_schema_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_)); - } else if (OB_ISNULL(tablet = tablet_handle_.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(tablet->load_storage_schema(arena_allocator, storage_schema))) { - LOG_WARN("load storage schema failed", K(ret), K(tablet_id_)); - } else if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_group_store))) { + LOG_WARN("invalid tablet handle", K(ret), KP(sqc_build_ctx_.storage_schema_)); + } else if (OB_FAIL(ObCODDLUtil::need_column_group_store(*sqc_build_ctx_.storage_schema_, is_column_group_store))) { LOG_WARN("fail to check is column group store", K(ret)); } else if (!is_column_group_store) { if (task_finish_count >= sqc_build_ctx_.task_total_cnt_) { @@ -1954,15 +1938,15 @@ int ObTabletDirectLoadMgr::close_sstable_slice( if (task_finish_count < sqc_build_ctx_.task_total_cnt_) { if (OB_FAIL(wait_notify(slice_writer, start_scn))) { LOG_WARN("wait notify failed", K(ret)); - } else if (OB_FAIL(slice_writer->fill_column_group(storage_schema, start_scn, insert_monitor))) { + } else if (OB_FAIL(slice_writer->fill_column_group(sqc_build_ctx_.storage_schema_, start_scn, insert_monitor))) { LOG_WARN("slice writer fill column group failed", K(ret)); } } else { - if (OB_FAIL(calc_range(storage_schema, tablet->get_rowkey_read_info().get_datum_utils(), 0))) { + if (OB_FAIL(calc_range(0))) { LOG_WARN("calc range failed", K(ret)); } else if (OB_FAIL(notify_all())) { LOG_WARN("notify all failed", K(ret)); - } else if (OB_FAIL(slice_writer->fill_column_group(storage_schema, start_scn, insert_monitor))) { + } else if (OB_FAIL(slice_writer->fill_column_group(sqc_build_ctx_.storage_schema_, start_scn, insert_monitor))) { LOG_WARN("slice fill column group failed", K(ret)); } } @@ -1983,7 +1967,6 @@ int ObTabletDirectLoadMgr::close_sstable_slice( } } } - ObTabletObjLoadHelper::free(arena_allocator, storage_schema); } if (OB_NOT_NULL(slice_writer)) { if (OB_SUCC(ret) && is_data_direct_load(direct_load_type_) && slice_writer->need_column_store()) { @@ -2047,23 +2030,13 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int const int64_t last_idx = sqc_build_ctx_.sorted_slices_idx_.at(thread_id).last_idx_; ObArenaAllocator arena_allocator("DIRECT_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); - ObTablet *tablet = nullptr; - ObStorageSchema *storage_schema = nullptr; int64_t fill_cg_finish_count = -1; int64_t row_cnt = 0; - if (OB_UNLIKELY(!tablet_handle_.is_valid())) { + if (OB_ISNULL(sqc_build_ctx_.storage_schema_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_)); - } else if (OB_ISNULL(tablet = tablet_handle_.get_obj())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(tablet->load_storage_schema(arena_allocator, storage_schema))) { - LOG_WARN("load storage schema failed", K(ret), K(tablet_id_)); - } else if (OB_UNLIKELY(nullptr == storage_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid storage_schema", K(ret), KP(storage_schema)); + LOG_WARN("invalid tablet handle", K(ret), KP(sqc_build_ctx_.storage_schema_)); } else { - const ObIArray &cg_schemas = storage_schema->get_column_groups(); + const ObIArray &cg_schemas = sqc_build_ctx_.storage_schema_->get_column_groups(); FLOG_INFO("[DIRECT_LOAD_FILL_CG] start fill cg", "tablet_id", tablet_id_, "cg_cnt", cg_schemas.count(), @@ -2074,7 +2047,7 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int if (OB_ISNULL(cur_writer = OB_NEWx(ObCOSliceWriter, &arena_allocator))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory for co writer failed", K(ret)); - } else if (OB_FAIL(fill_aggregated_column_group(start_idx, last_idx, storage_schema, cur_writer, fill_cg_finish_count, row_cnt))) { + } else if (OB_FAIL(fill_aggregated_column_group(start_idx, last_idx, sqc_build_ctx_.storage_schema_, cur_writer, fill_cg_finish_count, row_cnt))) { LOG_WARN("fail to fill aggregated cg", K(ret), KPC(cur_writer)); } // free writer anyhow @@ -2083,7 +2056,6 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int arena_allocator.free(cur_writer); cur_writer = nullptr; } - ObTabletObjLoadHelper::free(arena_allocator, storage_schema); //arena cannot free arena_allocator.reset(); // after finish all slice, free slice_writer @@ -2275,6 +2247,20 @@ void ObTabletDirectLoadMgr::unlock(const uint32_t tid) } } +int ObTabletDirectLoadMgr::prepare_storage_schema(ObTabletHandle &tablet_handle) +{ + int ret = OB_SUCCESS; + sqc_build_ctx_.storage_schema_ = nullptr; + if (OB_UNLIKELY(!tablet_handle.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle)); + } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(sqc_build_ctx_.schema_allocator_, sqc_build_ctx_.storage_schema_))) { + LOG_WARN("load storage schema failed", K(ret)); + } else { + sqc_build_ctx_.commit_scn_ = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); + } + return ret; +} ObTabletFullDirectLoadMgr::ObTabletFullDirectLoadMgr() : ObTabletDirectLoadMgr(), start_scn_(share::SCN::min_scn()), @@ -2291,26 +2277,25 @@ int ObTabletFullDirectLoadMgr::update( const ObTabletDirectLoadInsertParam &build_param) { int ret = OB_SUCCESS; - ObStorageSchema *storage_schema = nullptr; - ObArenaAllocator arena_allocator("dl_mgr_update", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObLatchWGuard guard(lock_, ObLatchIds::TABLET_DIRECT_LOAD_MGR_LOCK); if (OB_UNLIKELY(!build_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(build_param)); } else if (OB_FAIL(ObTabletDirectLoadMgr::update(lob_tablet_mgr, build_param))) { LOG_WARN("init failed", K(ret), K(build_param)); - } else if (OB_FAIL(tablet_handle_.get_obj()->load_storage_schema(arena_allocator, storage_schema))) { - LOG_WARN("load storage schema failed", K(ret)); } else { table_key_.reset(); table_key_.tablet_id_ = build_param.common_param_.tablet_id_; bool is_column_group_store = false; - if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_group_store))) { + if (OB_ISNULL(sqc_build_ctx_.storage_schema_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null storage schema", K(ret)); + } else if (OB_FAIL(ObCODDLUtil::need_column_group_store(*sqc_build_ctx_.storage_schema_, is_column_group_store))) { LOG_WARN("fail to get schema is column group store", K(ret)); } else if (is_column_group_store) { table_key_.table_type_ = ObITable::COLUMN_ORIENTED_SSTABLE; int64_t base_cg_idx = -1; - if (OB_FAIL(ObCODDLUtil::get_base_cg_idx(storage_schema, base_cg_idx))) { + if (OB_FAIL(ObCODDLUtil::get_base_cg_idx(sqc_build_ctx_.storage_schema_, base_cg_idx))) { LOG_WARN("get base cg idx failed", K(ret)); } else { table_key_.column_group_idx_ = static_cast(base_cg_idx); @@ -2320,7 +2305,6 @@ int ObTabletFullDirectLoadMgr::update( } table_key_.version_range_.snapshot_version_ = build_param.common_param_.read_snapshot_; } - ObTabletObjLoadHelper::free(arena_allocator, storage_schema); LOG_INFO("init tablet direct load mgr finished", K(ret), K(build_param), KPC(this)); return ret; } diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h index 0700cafe78..65cbc12462 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h @@ -274,7 +274,7 @@ public: return common::murmurhash(&slice_id, sizeof(slice_id), 0L); } void reset_slice_ctx_on_demand(); - TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt), K_(sorted_slices_idx)); + TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt), K_(sorted_slices_idx), K_(commit_scn), KPC(storage_schema_)); struct AggregatedCGInfo final { public: AggregatedCGInfo() @@ -303,6 +303,9 @@ public: int64_t task_finish_count_; // reach the parallel slice cnt, means the tablet data finished. int64_t task_total_cnt_; // parallelism of the PX. int64_t fill_column_group_finish_count_; + share::SCN commit_scn_; + ObArenaAllocator schema_allocator_; + ObStorageSchema *storage_schema_; }; class ObTabletDirectLoadMgr @@ -378,9 +381,10 @@ public: virtual int wait_notify(const ObDirectLoadSliceWriter *slice_writer, const share::SCN &start_scn); int fill_column_group(const int64_t thread_cnt, const int64_t thread_id); virtual int notify_all(); - virtual int calc_range(const ObStorageSchema *storage_schema, const blocksstable::ObStorageDatumUtils &datum_utils, const int64_t thread_cnt); + virtual int calc_range(const int64_t thread_cnt); int calc_cg_range(ObArray &sorted_slices, const int64_t thread_cnt); const ObIArray &get_column_info() const { return column_items_; }; + int prepare_storage_schema(ObTabletHandle &tablet_handle); VIRTUAL_TO_STRING_KV(K_(is_inited), K_(is_schema_item_ready), K_(ls_id), K_(tablet_id), K_(table_key), K_(data_format_version), K_(ref_cnt), K_(direct_load_type), K_(sqc_build_ctx), KPC(lob_mgr_handle_.get_obj()), K_(schema_item), K_(column_items), K_(lob_column_idxs)); @@ -424,7 +428,6 @@ protected: ObArray column_items_; ObArray lob_column_idxs_; ObArray lob_col_types_; - ObTabletHandle tablet_handle_; ObTableSchemaItem schema_item_; int64_t dir_id_; }; diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp index aa9913bd71..e11fc81a12 100644 --- a/src/storage/ddl/ob_direct_load_struct.cpp +++ b/src/storage/ddl/ob_direct_load_struct.cpp @@ -420,7 +420,7 @@ ObTabletDDLParam::~ObTabletDDLParam() } -int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &tablet_handle, +int ObChunkSliceStore::init(const int64_t rowkey_column_count, const ObStorageSchema *storage_schema, ObArenaAllocator &allocator, const ObIArray &col_array, const int64_t dir_id, const int64_t parallelism) { @@ -428,10 +428,13 @@ int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &t if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); + } else if (OB_ISNULL(storage_schema)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("null schema", K(ret), K(*this)); } else if (OB_UNLIKELY(rowkey_column_count <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalida argument", K(ret), K(rowkey_column_count)); - } else if (OB_FAIL(prepare_datum_stores(MTL_ID(), tablet_handle, allocator, col_array, dir_id, parallelism))) { + } else if (OB_FAIL(prepare_datum_stores(MTL_ID(), storage_schema, allocator, col_array, dir_id, parallelism))) { LOG_WARN("fail to prepare datum stores"); } else { arena_allocator_ = &allocator; @@ -469,86 +472,77 @@ int64_t ObChunkSliceStore::calc_chunk_limit(const ObStorageColumnGroupSchema &cg return ((cg_schema.column_cnt_ / basic_column_cnt) + 1) * basic_chunk_memory_limit; } -int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, ObTabletHandle &tablet_handle, ObIAllocator &allocator, +int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, const ObStorageSchema *storage_schema, ObIAllocator &allocator, const ObIArray &col_array, const int64_t dir_id, const int64_t parallelism) { int ret = OB_SUCCESS; const int64_t chunk_mem_limit = 64 * 1024L; // 64K ObCompactStore *datum_store = nullptr; void *buf = nullptr; - if (OB_UNLIKELY(tenant_id <= 0)) { + if (OB_UNLIKELY(tenant_id <= 0 || nullptr == storage_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(tenant_id)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), KP(storage_schema)); } else { - - ObStorageSchema *storage_schema = nullptr; - if (OB_UNLIKELY(!tablet_handle.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle)); - } else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema))) { - LOG_WARN("load storage schema failed", K(ret), K(tablet_handle)); - } else { - const ObIArray &cg_schemas = storage_schema->get_column_groups(); - for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) { - const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i); - ObCompressorType compressor_type = cur_cg_schema.compressor_type_; - compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type; - if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(compressor_type, - parallelism, - compressor_type))) { - LOG_WARN("fail to get temp store compress type", K(ret)); - } - if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) { - target_store_idx_ = i; - } - if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate memory failed", K(ret)); - } else { - datum_store = new (buf) ObCompactStore(); - ObArray cur_column_items; - cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item")); - for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) { - int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null - if (column_idx >= col_array.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema)); - } else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) { - LOG_WARN("fail to push_back col_item", K(ret)); - } - } - - if (OB_FAIL(ret)) { - } else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID, - "DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/, - compressor_type))) { - LOG_WARN("failed to init chunk datum store", K(ret)); - } else { - datum_store->set_dir_id(dir_id); - datum_store->get_inner_allocator().set_tenant_id(tenant_id); - LOG_INFO("set dir id", K(dir_id)); - } - if (OB_SUCC(ret)) { - if (OB_FAIL(datum_stores_.push_back(datum_store))) { - LOG_WARN("fail to push back datum_store", K(ret)); - } - } - if (OB_FAIL(ret)) { - if (OB_NOT_NULL(datum_store)) { - datum_store->~ObCompactStore(); - allocator.free(datum_store); - datum_store = nullptr; - } - } - } + const ObIArray &cg_schemas = storage_schema->get_column_groups(); + for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) { + const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i); + ObCompressorType compressor_type = cur_cg_schema.compressor_type_; + compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type; + if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(compressor_type, + parallelism, + compressor_type))) { + LOG_WARN("fail to get temp store compress type", K(ret)); } - if (OB_SUCC(ret)) { - if (OB_FAIL(cg_schemas_.assign(cg_schemas))) { - LOG_WARN("fail to copy cg schemas", K(ret)); + if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) { + target_store_idx_ = i; + } + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate memory failed", K(ret)); + } else { + datum_store = new (buf) ObCompactStore(); + ObArray cur_column_items; + cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item")); + for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) { + int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null + if (column_idx >= col_array.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema)); + } else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) { + LOG_WARN("fail to push_back col_item", K(ret)); + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID, + "DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/, + compressor_type))) { + LOG_WARN("failed to init chunk datum store", K(ret)); + } else { + datum_store->set_dir_id(dir_id); + datum_store->get_inner_allocator().set_tenant_id(tenant_id); + LOG_INFO("set dir id", K(dir_id)); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(datum_stores_.push_back(datum_store))) { + LOG_WARN("fail to push back datum_store", K(ret)); + } + } + if (OB_FAIL(ret)) { + if (OB_NOT_NULL(datum_store)) { + datum_store->~ObCompactStore(); + allocator.free(datum_store); + datum_store = nullptr; + } } } } - ObTabletObjLoadHelper::free(allocator, storage_schema); + if (OB_SUCC(ret)) { + if (OB_FAIL(cg_schemas_.assign(cg_schemas))) { + LOG_WARN("fail to copy cg schemas", K(ret)); + } + } } LOG_INFO("init ObChunkSliceStore", K(*this)); return ret; @@ -766,7 +760,7 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need( const bool is_column_store, const int64_t dir_id, const int64_t parallelism, - ObTabletHandle &tablet_handle, + const ObStorageSchema *storage_schema, const SCN &start_scn) { int ret = OB_SUCCESS; @@ -778,15 +772,15 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need( } else if (is_full_direct_load(tablet_direct_load_mgr_->get_direct_load_type()) && is_column_store) { need_column_store_ = true; ObChunkSliceStore *chunk_slice_store = nullptr; - if (OB_UNLIKELY(!tablet_handle.is_valid())) { + if (OB_ISNULL(storage_schema)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle)); + LOG_WARN("null schema", K(ret), K(*this)); } else if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory for chunk slice store failed", K(ret)); } else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(), - tablet_handle, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id, parallelism))) { - LOG_WARN("init chunk slice store failed", K(ret)); + storage_schema, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id, parallelism))) { + LOG_WARN("init chunk slice store failed", K(ret), KPC(storage_schema)); } else { slice_store_ = chunk_slice_store; } @@ -979,7 +973,6 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block( info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) { LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum)); } else { - ObTabletHandle unused_tablet_handle; //lob no need to get storageschema with handle while (OB_SUCC(ret)) { const blocksstable::ObDatumRow *cur_row = nullptr; if (OB_FAIL(THIS_WORKER.check_status())) { @@ -1000,7 +993,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block( } else if (OB_FAIL(check_null(false/*is_index_table*/, ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT, *cur_row))) { LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row)); } else if (OB_FAIL(prepare_slice_store_if_need(ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT, - false/*is_column_store*/, 1L/*unsued*/, 1L/*unused*/, unused_tablet_handle, start_scn))) { + false/*is_column_store*/, 1L/*unsued*/, 1L/*unused*/, nullptr /*storage_schema*/, start_scn))) { LOG_WARN("prepare macro block writer failed", K(ret)); } else if (OB_FAIL(slice_store_->append_row(*cur_row))) { LOG_WARN("macro block writer append row failed", K(ret), KPC(cur_row)); @@ -1029,7 +1022,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice( const SCN &start_scn, const uint64_t table_id, const ObTabletID &tablet_id, - ObTabletHandle &tablet_handle, + const ObStorageSchema *storage_schema, ObIStoreRowIterator *row_iter, const ObTableSchemaItem &schema_item, const ObDirectLoadType &direct_load_type, @@ -1045,6 +1038,9 @@ int ObDirectLoadSliceWriter::fill_sstable_slice( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObDirectLoadSliceWriter not init", KR(ret), KP(this)); + } else if (OB_ISNULL(storage_schema)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("null schema", K(ret), K(*this)); } else { ObArenaAllocator arena("SliceW_sst", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); const ObDataStoreDesc &data_desc = tablet_direct_load_mgr_->get_sqc_build_ctx().data_block_desc_.get_desc(); @@ -1086,7 +1082,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice( if (OB_FAIL(ret)) { } else if (OB_FAIL(check_null(schema_item.is_index_table_, schema_item.rowkey_column_num_, *cur_row))) { LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row)); - } else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, parallelism, tablet_handle, start_scn))) { + } else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, parallelism, storage_schema, start_scn))) { LOG_WARN("prepare macro block writer failed", K(ret)); } else if (OB_FAIL(slice_store_->append_row(*cur_row))) { if (is_full_direct_load_task && OB_ERR_PRIMARY_KEY_DUPLICATE == ret && schema_item.is_unique_index_) { diff --git a/src/storage/ddl/ob_direct_load_struct.h b/src/storage/ddl/ob_direct_load_struct.h index 4a3669de49..b8cd786f2a 100644 --- a/src/storage/ddl/ob_direct_load_struct.h +++ b/src/storage/ddl/ob_direct_load_struct.h @@ -419,7 +419,7 @@ public: datum_stores_.set_attr(ObMemAttr(MTL_ID(), "ChunkSlicStoreD")); } virtual ~ObChunkSliceStore() { reset(); } - int init(const int64_t rowkey_column_count, ObTabletHandle &tablet_handle, ObArenaAllocator &allocator, + int init(const int64_t rowkey_column_count, const ObStorageSchema *storage_schema, ObArenaAllocator &allocator, const ObIArray &col_schema, const int64_t dir_id, const int64_t parallelism); virtual int append_row(const blocksstable::ObDatumRow &datum_row) override; virtual int close() override; @@ -427,7 +427,7 @@ public: virtual int64_t get_row_count() const { return row_cnt_; } TO_STRING_KV(K(is_inited_), K(target_store_idx_), K(row_cnt_), KP(arena_allocator_), K(datum_stores_), K(endkey_), K(rowkey_column_count_), K(cg_schemas_)); private: - int prepare_datum_stores(const uint64_t tenant_id, ObTabletHandle &tablet_handle, ObIAllocator &allocator, + int prepare_datum_stores(const uint64_t tenant_id, const ObStorageSchema *storage_schema, ObIAllocator &allocator, const ObIArray &col_array, const int64_t dir_id, const int64_t parallelism); int64_t calc_chunk_limit(const ObStorageColumnGroupSchema &cg_schema); public: @@ -492,7 +492,7 @@ public: const share::SCN &start_scn, const uint64_t table_id, const ObTabletID &curr_tablet_id, - ObTabletHandle &tablet_handle, + const ObStorageSchema *storage_schema, ObIStoreRowIterator *row_iter, const ObTableSchemaItem &schema_item, const ObDirectLoadType &direct_load_type, @@ -559,7 +559,7 @@ private: const bool is_slice_store, const int64_t dir_id, const int64_t parallelism, - ObTabletHandle &tablet_handle, + const ObStorageSchema *storage_schema, const share::SCN &start_scn); int report_unique_key_dumplicated( const int ret_code,