diff --git a/mittest/mtlenv/storage/blocksstable/test_direct_load.cpp b/mittest/mtlenv/storage/blocksstable/test_direct_load.cpp
index 69ddccead9..d1a81a478a 100644
--- a/mittest/mtlenv/storage/blocksstable/test_direct_load.cpp
+++ b/mittest/mtlenv/storage/blocksstable/test_direct_load.cpp
@@ -78,6 +78,107 @@ TEST_F(TestDirectLoad, init_ddl_table_store)
 }
+TEST_F(TestDirectLoad, test_cg_aggregate) // checks how calc_cg_range() groups mocked slices into sorted_slices_idx_
+{
+  ObTabletFullDirectLoadMgr tablet_dl_mgr;
+  ObTabletDirectLoadInsertParam build_param;
+  build_param.common_param_.ls_id_ = ls_id_;
+  build_param.common_param_.tablet_id_ = tablet_id_;
+  build_param.common_param_.direct_load_type_ = ObDirectLoadType::DIRECT_LOAD_DDL;
+  build_param.common_param_.read_snapshot_ = SNAPSHOT_VERSION;
+  build_param.runtime_only_param_.task_cnt_ = 1;
+  build_param.runtime_only_param_.task_id_ = 1;
+  build_param.runtime_only_param_.table_id_ = TEST_TABLE_ID;
+  build_param.runtime_only_param_.schema_version_ = 1;
+  SCN ddl_start_scn;
+  ASSERT_EQ(OB_SUCCESS, ddl_start_scn.convert_from_ts(ObTimeUtility::current_time()));
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.update(nullptr, build_param));
+  tablet_dl_mgr.start_scn_ = ddl_start_scn;
+  tablet_dl_mgr.data_format_version_ = DATA_VERSION_4_0_0_0;
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.init_ddl_table_store(ddl_start_scn, SNAPSHOT_VERSION, ddl_start_scn));
+
+  common::ObArenaAllocator allocator;
+
+  ObArray<ObDirectLoadSliceWriter *> sorted_slices; // three mocked writers; only their row counts matter to calc_cg_range()
+  for (int64_t i = 0; i < 3; ++i) {
+    ObDirectLoadSliceWriter *slice_writer = nullptr;
+    slice_writer = OB_NEWx(ObDirectLoadSliceWriter, (&allocator));
+    ASSERT_NE(nullptr, slice_writer);
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.push_back(slice_writer));
+  }
+
+  // case 1: each slice holds EACH_MACRO_MIN_ROW_CNT / 2 - 1 rows, so the threshold is only reached after the third slice and all three land in one group [0, 3)
+  for (int64_t i = 0; i < sorted_slices.count(); ++i) {
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT / 2 - 1));
+  }
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
+  ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
+  for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
+    const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
+    const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
+    STORAGE_LOG(INFO, "case1", K(start_idx), K(last_idx));
+    ASSERT_EQ(start_idx, 0);
+    ASSERT_EQ(last_idx, sorted_slices.count());
+  }
+
+  // case 2: each slice holds EACH_MACRO_MIN_ROW_CNT / 2 + 1 rows, so two slices reach the threshold and the third forms its own group; 2 groups fit the 2 threads
+  for (int64_t i = 0; i < sorted_slices.count(); ++i) {
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT / 2 + 1));
+  }
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
+  ASSERT_EQ(2, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
+  for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
+    const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
+    const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
+    STORAGE_LOG(INFO, "case2", K(start_idx), K(last_idx)); // ranges are only logged here, not asserted
+  }
+
+  // case 3: each slice alone exceeds EACH_MACRO_MIN_ROW_CNT, giving 3 groups for 2 threads, so calc_cg_range() falls back to one range per thread
+  for (int64_t i = 0; i < sorted_slices.count(); ++i) {
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT + 1));
+  }
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
+  ASSERT_EQ(2, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
+  for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
+    const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
+    const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
+    STORAGE_LOG(INFO, "case3", K(start_idx), K(last_idx)); // ranges are only logged here, not asserted
+  }
+
+  for (int64_t i = 0; i < 2; ++i) { // shrink the input to a single slice for the remaining cases
+    ObDirectLoadSliceWriter *slice_writer = nullptr;
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.pop_back(slice_writer));
+  }
+
+  // case 4: a single slice above the threshold yields one group [0, 1) even though two threads are available
+  for (int64_t i = 0; i < sorted_slices.count(); ++i) {
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT + 1));
+  }
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
+  ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
+  for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
+    const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
+    const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
+    STORAGE_LOG(INFO, "case4", K(start_idx), K(last_idx));
+    ASSERT_EQ(start_idx, 0);
+    ASSERT_EQ(last_idx, sorted_slices.count());
+  }
+
+  // case 5: a single slice below the threshold still yields one group [0, 1); the trailing partial group is always emitted
+  for (int64_t i = 0; i < sorted_slices.count(); ++i) {
+    ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT - 1));
+  }
+  ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
+  ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
+  for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
+    const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
+    const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
+    STORAGE_LOG(INFO, "case5", K(start_idx), K(last_idx));
+    ASSERT_EQ(start_idx, 0);
+    ASSERT_EQ(last_idx, sorted_slices.count());
+  }
+
+}
 } // namespace oceanbase
diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp
index 6e9062fcb2..384f016ae4 100644
--- a/src/observer/table_load/ob_table_load_merger.cpp
+++ b/src/observer/table_load/ob_table_load_merger.cpp
@@ -282,6 +282,7 @@ int ObTableLoadMerger::build_merge_ctx()
     merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_;
     merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_;
     merge_param.is_column_store_ = store_ctx_->ctx_->schema_.is_column_store_;
+    merge_param.fill_cg_thread_cnt_ = param_.session_count_; // forwarded as thread_cnt to calc_range(); NOTE(review): presumably always > 0 here - confirm
     merge_param.px_mode_ = param_.px_mode_;
     merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_;
     merge_param.dml_row_handler_ = store_ctx_->error_row_handler_;
diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp
index 53113c4927..9a60e6c10a 100644
--- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp
+++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp
@@ -493,6 +493,7 @@ int ObTenantDirectLoadMgr::fill_lob_sstable_slice(
 int ObTenantDirectLoadMgr::calc_range(
     const share::ObLSID &ls_id,
     const common::ObTabletID &tablet_id,
+    const int64_t thread_cnt, // passed through to the tablet manager's calc_range()
     const bool is_full_direct_load)
 {
   int ret = OB_SUCCESS;
@@ -531,8 +532,8 @@ int ObTenantDirectLoadMgr::calc_range(
     } else if (OB_UNLIKELY(!is_column_store)) {
       ret = OB_ERR_UNEXPECTED;
       LOG_WARN("table withou cg", K(ret));
-    } else if (OB_FAIL(handle.get_obj()->calc_range(storage_schema, tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils()))) {
-      LOG_WARN("calc range failed", K(ret));
+    } else if (OB_FAIL(handle.get_obj()->calc_range(storage_schema, tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils(), thread_cnt))) {
+      LOG_WARN("calc range failed", K(ret), K(thread_cnt));
     }
     ObTabletObjLoadHelper::free(arena_allocator, storage_schema);
     arena_allocator.reset();
@@ -1066,10 +1067,11 @@ private:
 ObTabletDirectLoadBuildCtx::ObTabletDirectLoadBuildCtx()
   : allocator_(), slice_writer_allocator_(), build_param_(), slice_mgr_map_(), data_block_desc_(true/*is ddl*/), index_builder_(nullptr),
index_builder_(nullptr), - column_stat_array_(), sorted_slice_writers_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0) + column_stat_array_(), sorted_slice_writers_(), sorted_slices_idx_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0) { column_stat_array_.set_attr(ObMemAttr(MTL_ID(), "TblDL_CSA")); sorted_slice_writers_.set_attr(ObMemAttr(MTL_ID(), "TblDL_SSR")); + sorted_slices_idx_.set_attr(ObMemAttr(MTL_ID(), "TblDL_IDX")); } ObTabletDirectLoadBuildCtx::~ObTabletDirectLoadBuildCtx() @@ -1088,6 +1090,7 @@ ObTabletDirectLoadBuildCtx::~ObTabletDirectLoadBuildCtx() } column_stat_array_.reset(); sorted_slice_writers_.reset(); + sorted_slices_idx_.reset(); if (!slice_mgr_map_.empty()) { DestroySliceWriterMapFn destroy_map_fn(&slice_writer_allocator_); @@ -1603,7 +1606,7 @@ public: int ret_code_; }; -int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, const ObStorageDatumUtils &datum_utils) +int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, const ObStorageDatumUtils &datum_utils, const int64_t thread_cnt) { int ret = OB_SUCCESS; ObArray sorted_slices; @@ -1643,14 +1646,70 @@ int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, con if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_store))) { LOG_WARN("fail to check need column group", K(ret)); } else if (is_column_store) { - if (OB_FAIL(sqc_build_ctx_.sorted_slice_writers_.assign(sorted_slices))) { - LOG_WARN("copy slice array failed", K(ret), K(sorted_slices.count())); + if (thread_cnt <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invali thread cnt", K(ret), K(thread_cnt)); + } else if (OB_FAIL(calc_cg_range(sorted_slices, thread_cnt))) { + LOG_WARN("fail to calc cg range", K(ret), K(sorted_slices), K(thread_cnt)); } } } return ret; } +int ObTabletDirectLoadMgr::calc_cg_range(ObArray &sorted_slices, const int64_t thread_cnt) +{ + int ret = 
OB_SUCCESS; + if (thread_cnt <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invali thread cnt", K(ret), K(thread_cnt)); + } else { + common::ObArray sorted_slices_idx; + int64_t slice_idx = 0; + while (OB_SUCC(ret) && slice_idx < sorted_slices.count()) { + int64_t tmp_row_cnt = 0; + ObTabletDirectLoadBuildCtx::AggregatedCGInfo cur_info; + cur_info.start_idx_ = slice_idx; + while (OB_SUCC(ret) && slice_idx < sorted_slices.count()) { + tmp_row_cnt += sorted_slices.at(slice_idx)->get_row_count(); + ++slice_idx; + cur_info.last_idx_ = slice_idx; + if (tmp_row_cnt >= EACH_MACRO_MIN_ROW_CNT) { + break; + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(sorted_slices_idx.push_back(cur_info))) { + LOG_WARN("fail to push slice info", K(ret)); + } + } + } + + sqc_build_ctx_.sorted_slices_idx_.reset(); + if (OB_FAIL(ret)) { + } else if (sorted_slices_idx.count() > thread_cnt) { + // thread_cnt cannot handle aggregated group, re_calc by thread_cnt + for (int64_t i = 0; OB_SUCC(ret) && i < thread_cnt; ++i) { + ObTabletDirectLoadBuildCtx::AggregatedCGInfo cur_info; + calc_cg_idx(thread_cnt, i, cur_info.start_idx_, cur_info.last_idx_); + if (OB_FAIL(sqc_build_ctx_.sorted_slices_idx_.push_back(cur_info))) { + LOG_WARN("fail to push info", K(ret), K(i)); + } + } + } else if (OB_FAIL(sqc_build_ctx_.sorted_slices_idx_.assign(sorted_slices_idx))) { + LOG_WARN("fail to assign array", K(ret)); + } + + sqc_build_ctx_.sorted_slice_writers_.reset(); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(sqc_build_ctx_.sorted_slice_writers_.assign(sorted_slices))) { + LOG_WARN("copy slice array failed", K(ret), K(sorted_slices.count())); + } + } + FLOG_INFO("calc_cg_range", K(ret), K(sorted_slices.count()), K(thread_cnt), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(sqc_build_ctx_.sorted_slices_idx_.count())); + return ret; +} + int ObTabletDirectLoadMgr::cancel() { int ret = OB_SUCCESS; @@ -1740,7 +1799,7 @@ int ObTabletDirectLoadMgr::close_sstable_slice( LOG_WARN("slice writer fill column group 
failed", K(ret));
         }
       } else {
-        if (OB_FAIL(calc_range(storage_schema, tablet->get_rowkey_read_info().get_datum_utils()))) {
+        if (OB_FAIL(calc_range(storage_schema, tablet->get_rowkey_read_info().get_datum_utils(), 0))) { // NOTE(review): calc_range() returns OB_INVALID_ARGUMENT for thread_cnt <= 0 on column-store schemas - confirm 0 is safe on this path
           LOG_WARN("calc range failed", K(ret));
         } else if (OB_FAIL(notify_all())) {
           LOG_WARN("notify all failed", K(ret));
@@ -1787,7 +1846,7 @@ int ObTabletDirectLoadMgr::close_sstable_slice(
   return ret;
 }
-void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &strat_idx, int64_t &end_idx)
+void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &start_idx, int64_t &end_idx)
 {
   int ret = OB_SUCCESS;
   const int64_t each_thread_task_cnt = sqc_build_ctx_.sorted_slice_writers_.count() / thread_cnt;
@@ -1795,15 +1854,16 @@ void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t
   const int64_t pre_handle_cnt = need_plus_thread_cnt * (each_thread_task_cnt + 1);
   if (need_plus_thread_cnt != 0) {
     if (thread_id < need_plus_thread_cnt) {
-      strat_idx = (each_thread_task_cnt + 1) * thread_id;
-      end_idx = strat_idx + (each_thread_task_cnt + 1);
+      start_idx = (each_thread_task_cnt + 1) * thread_id; // the first need_plus_thread_cnt threads each take one extra slice
+      end_idx = start_idx + (each_thread_task_cnt + 1);
     } else {
-      strat_idx = pre_handle_cnt + (thread_id - need_plus_thread_cnt) * each_thread_task_cnt;
-      end_idx = strat_idx + each_thread_task_cnt;
+      start_idx = pre_handle_cnt + (thread_id - need_plus_thread_cnt) * each_thread_task_cnt;
+      end_idx = start_idx + each_thread_task_cnt;
+      // when slice_cnt < thread_cnt, idle thread start_idx = end_idx
     }
   } else {
-    strat_idx = each_thread_task_cnt * thread_id;
-    end_idx = strat_idx + each_thread_task_cnt;
+    start_idx = each_thread_task_cnt * thread_id; // even split: every thread gets each_thread_task_cnt slices
+    end_idx = start_idx + each_thread_task_cnt;
   }
 }
@@ -1816,46 +1876,57 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int
   } else if (OB_UNLIKELY(thread_cnt <= 0 || thread_id < 0 || thread_id > thread_cnt - 1)) {
     ret = OB_INVALID_ARGUMENT;
     LOG_WARN("invalid arguement", K(ret), K(thread_cnt), K(thread_id));
-  } else if (sqc_build_ctx_.sorted_slice_writers_.count() == 0) {
+  } else if (sqc_build_ctx_.sorted_slice_writers_.count() == 0 || thread_id > sqc_build_ctx_.sorted_slices_idx_.count() - 1) { // no writers, or fewer ranges than threads: this thread has nothing to do
     //ignore
+    FLOG_INFO("[DIRECT_LOAD_FILL_CG] idle thread", K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_id), K(sqc_build_ctx_.sorted_slices_idx_.count()));
   } else if (sqc_build_ctx_.sorted_slice_writers_.count() != sqc_build_ctx_.slice_mgr_map_.size()) {
     ret = OB_ERR_UNEXPECTED;
     LOG_WARN("wrong slice writer num", K(ret), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(sqc_build_ctx_.slice_mgr_map_.size()), K(common::lbt()));
   } else {
-    int64_t strat_idx = 0;
-    int64_t last_idx = 0;
-    calc_cg_idx(thread_cnt, thread_id, strat_idx, last_idx);
-    LOG_INFO("direct load start fill column group", K(tablet_id_), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_cnt), K(thread_id), K(strat_idx), K(last_idx));
-    if (strat_idx < 0 || strat_idx >= sqc_build_ctx_.sorted_slice_writers_.count() || last_idx > sqc_build_ctx_.sorted_slice_writers_.count()) {
-      //skip
+    const int64_t start_idx = sqc_build_ctx_.sorted_slices_idx_.at(thread_id).start_idx_; // range precomputed by calc_cg_range(), indexed by thread id
+    const int64_t last_idx = sqc_build_ctx_.sorted_slices_idx_.at(thread_id).last_idx_; // exclusive end of this thread's slice range
+
+    ObArenaAllocator arena_allocator("DIRECT_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
+    ObTablet *tablet = nullptr;
+    ObStorageSchema *storage_schema = nullptr;
+    int64_t fill_cg_finish_count = -1;
+    int64_t row_cnt = 0;
+    if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_));
+    } else if (OB_ISNULL(tablet = tablet_handle_.get_obj())) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("tablet is null", K(ret), K(ls_id_), K(tablet_id_));
+    } else if (OB_FAIL(tablet->load_storage_schema(arena_allocator, storage_schema))) {
+      LOG_WARN("load storage schema failed", K(ret), K(tablet_id_));
+    } else if (OB_UNLIKELY(nullptr == storage_schema)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("invalid storage_schema", K(ret), KP(storage_schema));
     } else {
-      ObArenaAllocator arena_allocator("DIRECT_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
-      ObTablet *tablet = nullptr;
-      ObStorageSchema *storage_schema = nullptr;
-      int64_t fill_cg_finish_count = -1;
-      if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
-        ret = OB_ERR_UNEXPECTED;
-        LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_));
-      } else if (OB_ISNULL(tablet = tablet_handle_.get_obj())) {
-        ret = OB_ERR_UNEXPECTED;
-        LOG_WARN("tablet is null", K(ret), K(ls_id_), K(tablet_id_));
-      } else if (OB_FAIL(tablet->load_storage_schema(arena_allocator, storage_schema))) {
-        LOG_WARN("load storage schema failed", K(ret), K(tablet_id_));
-      } else {
-        for (int64_t i = strat_idx; OB_SUCC(ret) && i < last_idx; ++i) {
-          ObDirectLoadSliceWriter *slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(i);
-          if (OB_ISNULL(slice_writer) || !slice_writer->need_column_store()) {
-            ret = OB_ERR_UNEXPECTED;
-            LOG_WARN("wrong slice writer", KPC(slice_writer));
-          } else if (OB_FAIL(slice_writer->fill_column_group(storage_schema, get_start_scn()))) {
-            LOG_WARN("slice writer rescan failed", K(ret), KP(storage_schema), K(get_start_scn()));
-          } else {
-            fill_cg_finish_count = ATOMIC_AAF(&sqc_build_ctx_.fill_column_group_finish_count_, 1);
-          }
-        }
+      const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
+      FLOG_INFO("[DIRECT_LOAD_FILL_CG] start fill cg",
+          "tablet_id", tablet_id_,
+          "cg_cnt", cg_schemas.count(),
+          "slice_cnt", sqc_build_ctx_.sorted_slice_writers_.count(),
+          K(thread_cnt), K(thread_id), K(start_idx), K(last_idx));
+
+      ObCOSliceWriter *cur_writer = nullptr; // one CO writer reused across all column groups of this range
+      if (OB_ISNULL(cur_writer = OB_NEWx(ObCOSliceWriter, &arena_allocator))) {
+        ret = OB_ALLOCATE_MEMORY_FAILED;
+        LOG_WARN("allocate memory for co writer failed", K(ret));
+      } else if (OB_FAIL(fill_aggregated_column_group(start_idx, last_idx, storage_schema, cur_writer, fill_cg_finish_count, row_cnt))) {
+        LOG_WARN("fail to fill aggregated cg", K(ret), KPC(cur_writer));
+      }
+      // free writer anyhow
+      if (OB_NOT_NULL(cur_writer)) {
+        cur_writer->~ObCOSliceWriter();
+        arena_allocator.free(cur_writer);
+        cur_writer = nullptr;
       }
       ObTabletObjLoadHelper::free(arena_allocator, storage_schema); //arena cannot free
       arena_allocator.reset();
+
+      // after finish all slice, free slice_writer
       if (OB_SUCC(ret)) {
         if (fill_cg_finish_count == sqc_build_ctx_.sorted_slice_writers_.count()) {
           sqc_build_ctx_.sorted_slice_writers_.reset();
@@ -1873,8 +1944,77 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int
       }
     }
     if (OB_SUCC(ret)) {
-      LOG_INFO("direct load finish fill column group", K(tablet_id_), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_cnt), K(thread_id), K(strat_idx), K(last_idx),
-          K(sqc_build_ctx_.slice_mgr_map_.size()));
+      FLOG_INFO("[DIRECT_LOAD_FILL_CG] finish fill cg",
+          "tablet_id", tablet_id_,
+          "row_cnt", row_cnt,
+          "slice_cnt", sqc_build_ctx_.sorted_slice_writers_.count(),
+          K(thread_cnt), K(thread_id), K(start_idx), K(last_idx), K(sqc_build_ctx_.slice_mgr_map_.size()));
+    }
+  }
+  return ret;
+}
+
+int ObTabletDirectLoadMgr::fill_aggregated_column_group( // for each column group, writes slices [start_idx, last_idx) through one shared cur_writer
+    const int64_t start_idx,
+    const int64_t last_idx,
+    const ObStorageSchema *storage_schema,
+    ObCOSliceWriter *cur_writer,
+    int64_t &fill_cg_finish_count,
+    int64_t &fill_row_cnt)
+{
+  int ret = OB_SUCCESS;
+  fill_cg_finish_count = -1; // stays -1 when no slice completes its last column group
+  fill_row_cnt = 0;
+  if (OB_ISNULL(cur_writer) || OB_ISNULL(storage_schema) || OB_UNLIKELY(start_idx < 0 || last_idx < 0)) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("invalid argument", K(ret), KP(cur_writer), KP(storage_schema), K(start_idx), K(last_idx));
+  } else {
+    const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
+    for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
+      cur_writer->reset();
+      common::ObArray<sql::ObCompactStore *> datum_stores; // stores handed back by the slice writers, destructed after this cg closes
+      if (start_idx == last_idx || start_idx >= sqc_build_ctx_.sorted_slice_writers_.count() || last_idx > sqc_build_ctx_.sorted_slice_writers_.count()) {
+        // skip
+      } else {
+        ObDirectLoadSliceWriter *first_slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(start_idx); // supplies start_seq and row_offset for cur_writer->init()
+        if (OB_ISNULL(first_slice_writer)) {
+          ret = OB_ERR_UNEXPECTED;
+          LOG_WARN("null slice writer", K(ret), KP(first_slice_writer));
+        } else if (OB_UNLIKELY(first_slice_writer->get_row_offset() < 0)) {
+          ret = OB_ERR_SYS;
+          LOG_WARN("invalid row offset", K(ret), K(first_slice_writer->get_row_offset()));
+        } else if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, this, first_slice_writer->get_start_seq(), first_slice_writer->get_row_offset(), get_start_scn()))) {
+          LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));
+        } else {
+          for (int64_t i = start_idx; OB_SUCC(ret) && i < last_idx; ++i) {
+            ObDirectLoadSliceWriter *slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(i);
+            if (OB_ISNULL(slice_writer) || !slice_writer->need_column_store()) {
+              ret = OB_ERR_UNEXPECTED;
+              LOG_WARN("wrong slice writer", K(ret), KPC(slice_writer));
+            } else if (OB_FAIL(slice_writer->fill_aggregated_column_group(cg_idx, cur_writer, datum_stores))) {
+              LOG_WARN("slice writer rescan failed", K(ret), K(cg_idx), KPC(cur_writer));
+            } else if (cg_idx == cg_schemas.count() - 1) {
+              // after fill last cg, inc finish cnt
+              fill_cg_finish_count = ATOMIC_AAF(&sqc_build_ctx_.fill_column_group_finish_count_, 1);
+              fill_row_cnt += slice_writer->get_row_count();
+            }
+          }
+        }
+      }
+
+      if (OB_SUCC(ret)) {
+        if (cur_writer->is_inited() && OB_FAIL(cur_writer->close())) {
+          LOG_WARN("close co ddl writer failed", K(ret));
+        } else {
+          for (int64_t i = 0; i < datum_stores.count(); ++i) {
+            if (OB_NOT_NULL(datum_stores.at(i))) {
+              datum_stores.at(i)->~ObCompactStore(); // NOTE(review): relies on each store appearing at most once in datum_stores - confirm against the slice writer
+            }
+          }
+          datum_stores.reset();
+        }
+      }
+      // next cg
     }
   }
   return ret;
diff --git 
a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h
index 957a0e7f54..30d60b690f 100644
--- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h
+++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.h
@@ -178,6 +178,7 @@ public:
   int calc_range(
       const share::ObLSID &ls_id,
       const common::ObTabletID &tablet_id,
+      const int64_t thread_cnt, // number of threads that will later call fill_column_group()
       const bool is_full_direct_load);
   int fill_column_group(
       const share::ObLSID &ls_id,
@@ -263,7 +264,18 @@ public:
     return common::murmurhash(&slice_id, sizeof(slice_id), 0L);
   }
   void reset_slice_ctx_on_demand();
-  TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt));
+  TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt), K_(sorted_slices_idx));
+  struct AggregatedCGInfo final { // half-open range [start_idx_, last_idx_) into sorted_slice_writers_
+  public:
+    AggregatedCGInfo()
+      : start_idx_(0),
+        last_idx_(0) {}
+    ~AggregatedCGInfo() {}
+    TO_STRING_KV(K_(start_idx), K_(last_idx));
+  public:
+    int64_t start_idx_; // index of the first slice in the group
+    int64_t last_idx_; // one past the last slice in the group; equal to start_idx_ for an empty range
+  };
 public:
   typedef common::hash::ObHashMap<
     int64_t,
@@ -276,6 +288,7 @@ public:
   blocksstable::ObSSTableIndexBuilder *index_builder_;
   common::ObArray column_stat_array_; // online column stat result.
   common::ObArray sorted_slice_writers_;
+  common::ObArray<AggregatedCGInfo> sorted_slices_idx_; // for cg aggregation: entry i is the slice range handled by fill thread i
   bool is_task_end_; // to avoid write commit log/freeze in memory index sstable again.
   int64_t task_finish_count_; // reach the parallel slice cnt, means the tablet data finished.
   int64_t task_total_cnt_; // parallelism of the PX.
@@ -353,7 +366,8 @@ public:
   virtual int wait_notify(const ObDirectLoadSliceWriter *slice_writer, const share::SCN &start_scn);
   int fill_column_group(const int64_t thread_cnt, const int64_t thread_id);
   virtual int notify_all();
-  virtual int calc_range(const ObStorageSchema *storage_schema, const blocksstable::ObStorageDatumUtils &datum_utils);
+  virtual int calc_range(const ObStorageSchema *storage_schema, const blocksstable::ObStorageDatumUtils &datum_utils, const int64_t thread_cnt);
+  int calc_cg_range(ObArray<ObDirectLoadSliceWriter *> &sorted_slices, const int64_t thread_cnt); // groups slices into per-thread fill ranges
   const ObIArray &get_column_info() const { return column_items_; };
   VIRTUAL_TO_STRING_KV(K_(is_inited), K_(is_schema_item_ready), K_(ls_id), K_(tablet_id), K_(table_key), K_(data_format_version), K_(ref_cnt),
@@ -362,7 +376,13 @@ public:
 private:
   int prepare_schema_item_on_demand(const uint64_t table_id);
-  void calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &strat_idx, int64_t &end_idx);
-
+  void calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &start_idx, int64_t &end_idx);
+  int fill_aggregated_column_group( // writes slices [start_idx, last_idx) of every column group through cur_writer
+      const int64_t start_idx,
+      const int64_t last_idx,
+      const ObStorageSchema *storage_schema,
+      ObCOSliceWriter *cur_writer,
+      int64_t &fill_cg_finish_count,
+      int64_t &fill_row_cnt);
   // private:
   /* +++++ online column stat collect +++++ */
   // virtual int init_sql_statistics_if_needed();
@@ -370,6 +390,7 @@ private:
   /* +++++ -------------------------- +++++ */
 public:
   static const int64_t TRY_LOCK_TIMEOUT = 1 * 1000000; // 1s
+  static const int64_t EACH_MACRO_MIN_ROW_CNT = 1000000; // 1,000,000 rows: calc_cg_range() keeps adding slices to a group until it holds at least this many
 protected:
   bool is_inited_;
   bool is_schema_item_ready_;
diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp
index cd65fa6215..f75a4a1518 100644
--- a/src/storage/ddl/ob_direct_load_struct.cpp
+++ b/src/storage/ddl/ob_direct_load_struct.cpp
@@ -648,6 +648,31 @@ ObDirectLoadSliceWriter::~ObDirectLoadSliceWriter()
   need_column_store_ = false;
 }
+//for test: installs a fresh ObChunkSliceStore whose row_cnt_ is row_cnt, so get_row_count() reports that value
+int ObDirectLoadSliceWriter::mock_chunk_store(const int64_t row_cnt)
+{
+  int ret = OB_SUCCESS;
+  if (row_cnt < 0) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("invalid row cnt", K(ret), K(row_cnt));
+  } else {
+    ObChunkSliceStore *chunk_slice_store = nullptr;
+    if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) {
+      ret = OB_ALLOCATE_MEMORY_FAILED;
+      LOG_WARN("allocate memory for chunk slice store failed", K(ret));
+    } else {
+      chunk_slice_store->row_cnt_ = row_cnt;
+      slice_store_ = chunk_slice_store; // NOTE(review): a previously installed slice_store_ is overwritten without being released here - confirm acceptable for tests
+
+    }
+    if (OB_FAIL(ret) && nullptr != chunk_slice_store) { // cleanup guard; in this body ret only fails while the pointer is still null
+      chunk_slice_store->~ObChunkSliceStore();
+      allocator_.free(chunk_slice_store);
+    }
+  }
+  return ret;
+}
+
 int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
     const int64_t schema_rowkey_column_num,
     const bool is_column_store,
@@ -1057,6 +1082,54 @@ int ObDirectLoadSliceWriter::check_null(
   return ret;
 }
+int ObDirectLoadSliceWriter::fill_aggregated_column_group( // appends this slice's rows for column group cg_idx to cur_writer and reports the store it read
+    const int64_t cg_idx,
+    ObCOSliceWriter *cur_writer,
+    ObIArray<sql::ObCompactStore *> &datum_stores)
+{
+  int ret = OB_SUCCESS;
+  datum_stores.reset(); // NOTE(review): also drops entries pushed for earlier slices by the same caller - confirm intended
+  ObChunkSliceStore *chunk_slice_store = static_cast<ObChunkSliceStore *>(slice_store_);
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("not init", K(ret));
+  } else if (nullptr == chunk_slice_store || is_empty()) {
+    // do nothing
+    LOG_INFO("chunk slice store is null or empty", K(ret),
+        KPC(chunk_slice_store), KPC(tablet_direct_load_mgr_));
+  } else if (ATOMIC_LOAD(&is_canceled_)) {
+    ret = OB_CANCELED;
+    LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
+  } else if (cg_idx < 0 || cg_idx >= chunk_slice_store->datum_stores_.count()) { // valid indexes are [0, count); count itself would overrun at() below
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("invalid cg idx", K(ret), K(cg_idx), K(chunk_slice_store->datum_stores_));
+  } else {
+    sql::ObCompactStore *cur_datum_store = chunk_slice_store->datum_stores_.at(cg_idx);
+    const ObChunkDatumStore::StoredRow *stored_row = nullptr;
+    bool has_next = false;
+    while (OB_SUCC(ret) && OB_SUCC(cur_datum_store->has_next(has_next)) && has_next) {
+      if (OB_FAIL(cur_datum_store->get_next_row(stored_row))) {
+        if (OB_ITER_END == ret) { // end of store is not an error
+          ret = OB_SUCCESS;
+          break;
+        } else {
+          LOG_WARN("get next row failed", K(ret));
+        }
+      } else {
+        if (OB_FAIL(cur_writer->append_row(stored_row))) {
+          LOG_WARN("append row failed", K(ret), KPC(stored_row));
+        }
+      }
+    }
+    if (OB_SUCC(ret)) { // report the store once, after the scan, so the caller destructs it a single time
+      if (OB_FAIL(datum_stores.push_back(cur_datum_store))) {
+        LOG_WARN("fail to push datum store", K(ret));
+      }
+    }
+  }
+  return ret;
+}
+
 int ObDirectLoadSliceWriter::close()
 {
   int ret = OB_SUCCESS;
@@ -1091,8 +1164,6 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
     LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
   } else {
     const ObIArray &cg_schemas = storage_schema->get_column_groups();
-    ObArray co_ddl_writers;
-    co_ddl_writers.set_attr(ObMemAttr(MTL_ID(), "DL_co_writers"));
     FLOG_INFO("[DDL_FILL_CG] fill column group start",
         "tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
         "row_count", chunk_slice_store->get_row_count(),
@@ -1106,7 +1177,6 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
     } else {
       // 2. 
rescan and write
       for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
-        const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
         cur_writer->reset();
         if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
           LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));
diff --git a/src/storage/ddl/ob_direct_load_struct.h b/src/storage/ddl/ob_direct_load_struct.h
index b5a131a715..7247ca4c1f 100644
--- a/src/storage/ddl/ob_direct_load_struct.h
+++ b/src/storage/ddl/ob_direct_load_struct.h
@@ -458,6 +458,7 @@ public:
   int64_t &inserted_cg_row_cnt_;
 };
+class ObCOSliceWriter; // forward declaration: used as a pointer parameter before its definition
 class ObDirectLoadSliceWriter final
 {
 public:
@@ -494,9 +495,14 @@ public:
       const ObStorageSchema *storage_schema,
       const share::SCN &start_scn,
       ObInsertMonitor *monitor_node = NULL);
+  int fill_aggregated_column_group( // appends this slice's rows for one column group to a writer shared across slices
+      const int64_t cg_idx,
+      ObCOSliceWriter *cur_writer,
+      ObIArray<sql::ObCompactStore *> &datum_stores);
   void set_row_offset(const int64_t row_offset) { row_offset_ = row_offset; }
   int64_t get_row_count() const { return nullptr == slice_store_ ? 0 : slice_store_->get_row_count(); }
   int64_t get_row_offset() const { return row_offset_; }
+  blocksstable::ObMacroDataSeq &get_start_seq() { return start_seq_; } // non-const reference to this writer's start sequence
   bool is_empty() const { return 0 == get_row_count(); }
   bool need_column_store() const { return need_column_store_; }
   ObTabletSliceStore *get_slice_store() const { return slice_store_; }
@@ -552,6 +558,7 @@ private:
       const int64_t lob_inrow_threshold,
       const uint64_t src_tenant_id,
       ObLobMetaRowIterator *&row_iter);
+  int mock_chunk_store(const int64_t row_cnt); // test-only helper: installs a slice store reporting row_cnt rows
 private:
   bool is_inited_;
   bool need_column_store_;
@@ -585,6 +592,7 @@ public:
       const sql::ObChunkDatumStore::StoredRow *stored_row,
       blocksstable::ObDatumRow &cg_row);
   int close();
+  bool is_inited() const { return is_inited_; } // read-only accessor; callers use it to decide whether close() is needed
   TO_STRING_KV(K(is_inited_), K(cg_idx_), KPC(cg_schema_), K(macro_block_writer_), K(data_desc_), K(cg_row_));
 private:
   bool is_inited_;
diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp
index fcd67e480a..c12eb72eed 100644
--- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp
+++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp
@@ -437,7 +437,7 @@ int ObDirectLoadInsertTabletContext::close_lob_sstable_slice(const int64_t slice
   return ret;
 }
-int ObDirectLoadInsertTabletContext::calc_range()
+int ObDirectLoadInsertTabletContext::calc_range(const int64_t thread_cnt)
 {
   int ret = OB_SUCCESS;
   if (IS_NOT_INIT) {
@@ -445,7 +445,7 @@ int ObDirectLoadInsertTabletContext::calc_range()
     LOG_WARN("ObDirectLoadInsertTableContext not init", KR(ret), KP(this));
   } else {
     ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
-    if (OB_FAIL(sstable_insert_mgr->calc_range(param_.ls_id_, param_.tablet_id_, true))) {
+    if (OB_FAIL(sstable_insert_mgr->calc_range(param_.ls_id_, param_.tablet_id_, thread_cnt, true))) {
       LOG_WARN("fail to calc range", KR(ret), K(param_.tablet_id_));
     } else {
       LOG_INFO("success to calc range", K(param_.tablet_id_));
diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h
index 50c3fda1bc..63e7b36a0a 100644
--- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.h
+++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.h
@@ -110,7 +110,7 @@ public:
       blocksstable::ObDatumRow &datum_row);
   int get_lob_write_ctx(ObDirectLoadInsertTabletWriteCtx &write_ctx);
-  int calc_range();
+  int calc_range(const int64_t thread_cnt); // thread_cnt: number of threads that will fill column groups
   int fill_column_group(const int64_t thread_cnt, const int64_t thread_id);
   int cancel();
   TO_STRING_KV(K_(param), K_(is_open));
diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp
index 4436fe780b..e2ddff8491 100644
--- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp
+++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp
@@ -50,6 +50,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
     store_column_count_(0),
     snapshot_version_(0),
     lob_column_cnt_(0),
+    fill_cg_thread_cnt_(0), // 0 is rejected by ObTabletDirectLoadMgr::calc_range() for column-store schemas, so it must be set before merging
     datum_utils_(nullptr),
     col_descs_(nullptr),
     cmp_funcs_(nullptr),
diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h
index db3db98024..6249c3d44a 100644
--- a/src/storage/direct_load/ob_direct_load_merge_ctx.h
+++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h
@@ -62,6 +62,7 @@ public:
   int64_t store_column_count_;
   int64_t snapshot_version_;
   int64_t lob_column_cnt_;
+  int64_t fill_cg_thread_cnt_; // thread count handed to calc_range() for column-store tables
   storage::ObDirectLoadTableDataDesc table_data_desc_;
   const blocksstable::ObStorageDatumUtils *datum_utils_;
   const common::ObIArray *col_descs_;
diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp
index 3835b31e8f..7d395b84c3 100644
--- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp
+++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp
@@ -110,7 +110,7 @@ int ObDirectLoadPartitionMergeTask::process()
       LOG_WARN("fail to inc finish count", KR(ret));
     } else if (is_ready) {
       if (merge_param_->is_column_store_) {
-        if (OB_FAIL(tablet_ctx->calc_range())) {
+        if (OB_FAIL(tablet_ctx->calc_range(merge_param_->fill_cg_thread_cnt_))) {
           LOG_WARN("fail to calc range", KR(ret));
         }
       } else if (OB_FAIL(tablet_ctx->close())) {