direct_load with column_group support aggregated_cg

This commit is contained in:
renju96
2024-02-29 13:45:16 +00:00
committed by ob-robot
parent a19b3513a5
commit 7d6aac9898
11 changed files with 400 additions and 57 deletions

View File

@ -78,6 +78,107 @@ TEST_F(TestDirectLoad, init_ddl_table_store)
}
TEST_F(TestDirectLoad, test_cg_aggregate)
{
ObTabletFullDirectLoadMgr tablet_dl_mgr;
ObTabletDirectLoadInsertParam build_param;
build_param.common_param_.ls_id_ = ls_id_;
build_param.common_param_.tablet_id_ = tablet_id_;
build_param.common_param_.direct_load_type_ = ObDirectLoadType::DIRECT_LOAD_DDL;
build_param.common_param_.read_snapshot_ = SNAPSHOT_VERSION;
build_param.runtime_only_param_.task_cnt_ = 1;
build_param.runtime_only_param_.task_id_ = 1;
build_param.runtime_only_param_.table_id_ = TEST_TABLE_ID;
build_param.runtime_only_param_.schema_version_ = 1;
SCN ddl_start_scn;
ASSERT_EQ(OB_SUCCESS, ddl_start_scn.convert_from_ts(ObTimeUtility::current_time()));
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.update(nullptr, build_param));
tablet_dl_mgr.start_scn_ = ddl_start_scn;
tablet_dl_mgr.data_format_version_ = DATA_VERSION_4_0_0_0;
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.init_ddl_table_store(ddl_start_scn, SNAPSHOT_VERSION, ddl_start_scn));
common::ObArenaAllocator allocator;
ObArray<ObDirectLoadSliceWriter *> sorted_slices;
for (int64_t i = 0; i < 3; ++i) {
ObDirectLoadSliceWriter *slice_writer = nullptr;
slice_writer = OB_NEWx(ObDirectLoadSliceWriter, (&allocator));
ASSERT_NE(nullptr, slice_writer);
ASSERT_EQ(OB_SUCCESS, sorted_slices.push_back(slice_writer));
}
// case 1:one thread can handle the number of slices divided according to EACH_MACRO_MIN_ROW_CNT
for (int64_t i = 0; i < sorted_slices.count(); ++i) {
ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT / 2 - 1));
}
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
STORAGE_LOG(INFO, "case1", K(start_idx), K(last_idx));
ASSERT_EQ(start_idx, 0);
ASSERT_EQ(last_idx, sorted_slices.count());
}
// case 2:all threads can handle the number of slices divided according to EACH_MACRO_MIN_ROW_CNT
for (int64_t i = 0; i < sorted_slices.count(); ++i) {
ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT / 2 + 1));
}
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
ASSERT_EQ(2, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
STORAGE_LOG(INFO, "case2", K(start_idx), K(last_idx));
}
// case 3:all threads cannot handle the number of slices divided according to EACH_MACRO_MIN_ROW_CNT
for (int64_t i = 0; i < sorted_slices.count(); ++i) {
ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT + 1));
}
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
ASSERT_EQ(2, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
STORAGE_LOG(INFO, "case3", K(start_idx), K(last_idx));
}
for (int64_t i = 0; i < 2; ++i) {
ObDirectLoadSliceWriter *slice_writer = nullptr;
ASSERT_EQ(OB_SUCCESS, sorted_slices.pop_back(slice_writer));
}
// case 4
for (int64_t i = 0; i < sorted_slices.count(); ++i) {
ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT + 1));
}
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
STORAGE_LOG(INFO, "case4", K(start_idx), K(last_idx));
ASSERT_EQ(start_idx, 0);
ASSERT_EQ(last_idx, sorted_slices.count());
}
// case 5
for (int64_t i = 0; i < sorted_slices.count(); ++i) {
ASSERT_EQ(OB_SUCCESS, sorted_slices.at(i)->mock_chunk_store(ObTabletDirectLoadMgr::EACH_MACRO_MIN_ROW_CNT - 1));
}
ASSERT_EQ(OB_SUCCESS, tablet_dl_mgr.calc_cg_range(sorted_slices, 2));
ASSERT_EQ(1, tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count());
for (int64_t i = 0; i < tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.count(); ++i) {
const int64_t start_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).start_idx_;
const int64_t last_idx = tablet_dl_mgr.get_sqc_build_ctx().sorted_slices_idx_.at(i).last_idx_;
STORAGE_LOG(INFO, "case5", K(start_idx), K(last_idx));
ASSERT_EQ(start_idx, 0);
ASSERT_EQ(last_idx, sorted_slices.count());
}
}
} // namespace oceanbase

View File

@ -282,6 +282,7 @@ int ObTableLoadMerger::build_merge_ctx()
merge_param.is_fast_heap_table_ = store_ctx_->is_fast_heap_table_;
merge_param.online_opt_stat_gather_ = param_.online_opt_stat_gather_;
merge_param.is_column_store_ = store_ctx_->ctx_->schema_.is_column_store_;
merge_param.fill_cg_thread_cnt_ = param_.session_count_;
merge_param.px_mode_ = param_.px_mode_;
merge_param.insert_table_ctx_ = store_ctx_->insert_table_ctx_;
merge_param.dml_row_handler_ = store_ctx_->error_row_handler_;

View File

@ -493,6 +493,7 @@ int ObTenantDirectLoadMgr::fill_lob_sstable_slice(
int ObTenantDirectLoadMgr::calc_range(
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t thread_cnt,
const bool is_full_direct_load)
{
int ret = OB_SUCCESS;
@ -531,8 +532,8 @@ int ObTenantDirectLoadMgr::calc_range(
} else if (OB_UNLIKELY(!is_column_store)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table withou cg", K(ret));
} else if (OB_FAIL(handle.get_obj()->calc_range(storage_schema, tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils()))) {
LOG_WARN("calc range failed", K(ret));
} else if (OB_FAIL(handle.get_obj()->calc_range(storage_schema, tablet_handle.get_obj()->get_rowkey_read_info().get_datum_utils(), thread_cnt))) {
LOG_WARN("calc range failed", K(ret), K(thread_cnt));
}
ObTabletObjLoadHelper::free(arena_allocator, storage_schema);
arena_allocator.reset();
@ -1066,10 +1067,11 @@ private:
ObTabletDirectLoadBuildCtx::ObTabletDirectLoadBuildCtx()
: allocator_(), slice_writer_allocator_(), build_param_(), slice_mgr_map_(), data_block_desc_(true/*is ddl*/), index_builder_(nullptr),
column_stat_array_(), sorted_slice_writers_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0)
column_stat_array_(), sorted_slice_writers_(), sorted_slices_idx_(), is_task_end_(false), task_finish_count_(0), fill_column_group_finish_count_(0)
{
column_stat_array_.set_attr(ObMemAttr(MTL_ID(), "TblDL_CSA"));
sorted_slice_writers_.set_attr(ObMemAttr(MTL_ID(), "TblDL_SSR"));
sorted_slices_idx_.set_attr(ObMemAttr(MTL_ID(), "TblDL_IDX"));
}
ObTabletDirectLoadBuildCtx::~ObTabletDirectLoadBuildCtx()
@ -1088,6 +1090,7 @@ ObTabletDirectLoadBuildCtx::~ObTabletDirectLoadBuildCtx()
}
column_stat_array_.reset();
sorted_slice_writers_.reset();
sorted_slices_idx_.reset();
if (!slice_mgr_map_.empty()) {
DestroySliceWriterMapFn destroy_map_fn(&slice_writer_allocator_);
@ -1603,7 +1606,7 @@ public:
int ret_code_;
};
int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, const ObStorageDatumUtils &datum_utils)
int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, const ObStorageDatumUtils &datum_utils, const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
ObArray<ObDirectLoadSliceWriter *> sorted_slices;
@ -1643,11 +1646,67 @@ int ObTabletDirectLoadMgr::calc_range(const ObStorageSchema *storage_schema, con
if (OB_FAIL(ObCODDLUtil::need_column_group_store(*storage_schema, is_column_store))) {
LOG_WARN("fail to check need column group", K(ret));
} else if (is_column_store) {
if (OB_FAIL(sqc_build_ctx_.sorted_slice_writers_.assign(sorted_slices))) {
if (thread_cnt <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invali thread cnt", K(ret), K(thread_cnt));
} else if (OB_FAIL(calc_cg_range(sorted_slices, thread_cnt))) {
LOG_WARN("fail to calc cg range", K(ret), K(sorted_slices), K(thread_cnt));
}
}
}
return ret;
}
int ObTabletDirectLoadMgr::calc_cg_range(ObArray<ObDirectLoadSliceWriter *> &sorted_slices, const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
if (thread_cnt <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invali thread cnt", K(ret), K(thread_cnt));
} else {
common::ObArray<ObTabletDirectLoadBuildCtx::AggregatedCGInfo> sorted_slices_idx;
int64_t slice_idx = 0;
while (OB_SUCC(ret) && slice_idx < sorted_slices.count()) {
int64_t tmp_row_cnt = 0;
ObTabletDirectLoadBuildCtx::AggregatedCGInfo cur_info;
cur_info.start_idx_ = slice_idx;
while (OB_SUCC(ret) && slice_idx < sorted_slices.count()) {
tmp_row_cnt += sorted_slices.at(slice_idx)->get_row_count();
++slice_idx;
cur_info.last_idx_ = slice_idx;
if (tmp_row_cnt >= EACH_MACRO_MIN_ROW_CNT) {
break;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(sorted_slices_idx.push_back(cur_info))) {
LOG_WARN("fail to push slice info", K(ret));
}
}
}
sqc_build_ctx_.sorted_slices_idx_.reset();
if (OB_FAIL(ret)) {
} else if (sorted_slices_idx.count() > thread_cnt) {
// thread_cnt cannot handle aggregated group, re_calc by thread_cnt
for (int64_t i = 0; OB_SUCC(ret) && i < thread_cnt; ++i) {
ObTabletDirectLoadBuildCtx::AggregatedCGInfo cur_info;
calc_cg_idx(thread_cnt, i, cur_info.start_idx_, cur_info.last_idx_);
if (OB_FAIL(sqc_build_ctx_.sorted_slices_idx_.push_back(cur_info))) {
LOG_WARN("fail to push info", K(ret), K(i));
}
}
} else if (OB_FAIL(sqc_build_ctx_.sorted_slices_idx_.assign(sorted_slices_idx))) {
LOG_WARN("fail to assign array", K(ret));
}
sqc_build_ctx_.sorted_slice_writers_.reset();
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sqc_build_ctx_.sorted_slice_writers_.assign(sorted_slices))) {
LOG_WARN("copy slice array failed", K(ret), K(sorted_slices.count()));
}
}
}
FLOG_INFO("calc_cg_range", K(ret), K(sorted_slices.count()), K(thread_cnt), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(sqc_build_ctx_.sorted_slices_idx_.count()));
return ret;
}
@ -1740,7 +1799,7 @@ int ObTabletDirectLoadMgr::close_sstable_slice(
LOG_WARN("slice writer fill column group failed", K(ret));
}
} else {
if (OB_FAIL(calc_range(storage_schema, tablet->get_rowkey_read_info().get_datum_utils()))) {
if (OB_FAIL(calc_range(storage_schema, tablet->get_rowkey_read_info().get_datum_utils(), 0))) {
LOG_WARN("calc range failed", K(ret));
} else if (OB_FAIL(notify_all())) {
LOG_WARN("notify all failed", K(ret));
@ -1787,7 +1846,7 @@ int ObTabletDirectLoadMgr::close_sstable_slice(
return ret;
}
void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &strat_idx, int64_t &end_idx)
void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &start_idx, int64_t &end_idx)
{
int ret = OB_SUCCESS;
const int64_t each_thread_task_cnt = sqc_build_ctx_.sorted_slice_writers_.count() / thread_cnt;
@ -1795,15 +1854,16 @@ void ObTabletDirectLoadMgr::calc_cg_idx(const int64_t thread_cnt, const int64_t
const int64_t pre_handle_cnt = need_plus_thread_cnt * (each_thread_task_cnt + 1);
if (need_plus_thread_cnt != 0) {
if (thread_id < need_plus_thread_cnt) {
strat_idx = (each_thread_task_cnt + 1) * thread_id;
end_idx = strat_idx + (each_thread_task_cnt + 1);
start_idx = (each_thread_task_cnt + 1) * thread_id;
end_idx = start_idx + (each_thread_task_cnt + 1);
} else {
strat_idx = pre_handle_cnt + (thread_id - need_plus_thread_cnt) * each_thread_task_cnt;
end_idx = strat_idx + each_thread_task_cnt;
start_idx = pre_handle_cnt + (thread_id - need_plus_thread_cnt) * each_thread_task_cnt;
end_idx = start_idx + each_thread_task_cnt;
// when slice_cnt < thread_cnt, idle thread start_idx = end_idx
}
} else {
strat_idx = each_thread_task_cnt * thread_id;
end_idx = strat_idx + each_thread_task_cnt;
start_idx = each_thread_task_cnt * thread_id;
end_idx = start_idx + each_thread_task_cnt;
}
}
@ -1816,23 +1876,21 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int
} else if (OB_UNLIKELY(thread_cnt <= 0 || thread_id < 0 || thread_id > thread_cnt - 1)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguement", K(ret), K(thread_cnt), K(thread_id));
} else if (sqc_build_ctx_.sorted_slice_writers_.count() == 0) {
} else if (sqc_build_ctx_.sorted_slice_writers_.count() == 0 || thread_id > sqc_build_ctx_.sorted_slices_idx_.count() - 1) {
//ignore
FLOG_INFO("[DIRECT_LOAD_FILL_CG] idle thread", K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_id), K(sqc_build_ctx_.sorted_slices_idx_.count()));
} else if (sqc_build_ctx_.sorted_slice_writers_.count() != sqc_build_ctx_.slice_mgr_map_.size()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("wrong slice writer num", K(ret), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(sqc_build_ctx_.slice_mgr_map_.size()), K(common::lbt()));
} else {
int64_t strat_idx = 0;
int64_t last_idx = 0;
calc_cg_idx(thread_cnt, thread_id, strat_idx, last_idx);
LOG_INFO("direct load start fill column group", K(tablet_id_), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_cnt), K(thread_id), K(strat_idx), K(last_idx));
if (strat_idx < 0 || strat_idx >= sqc_build_ctx_.sorted_slice_writers_.count() || last_idx > sqc_build_ctx_.sorted_slice_writers_.count()) {
//skip
} else {
const int64_t start_idx = sqc_build_ctx_.sorted_slices_idx_.at(thread_id).start_idx_;
const int64_t last_idx = sqc_build_ctx_.sorted_slices_idx_.at(thread_id).last_idx_;
ObArenaAllocator arena_allocator("DIRECT_RESCAN", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
ObTablet *tablet = nullptr;
ObStorageSchema *storage_schema = nullptr;
int64_t fill_cg_finish_count = -1;
int64_t row_cnt = 0;
if (OB_UNLIKELY(!tablet_handle_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle_));
@ -1841,21 +1899,34 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int
LOG_WARN("tablet is null", K(ret), K(ls_id_), K(tablet_id_));
} else if (OB_FAIL(tablet->load_storage_schema(arena_allocator, storage_schema))) {
LOG_WARN("load storage schema failed", K(ret), K(tablet_id_));
} else {
for (int64_t i = strat_idx; OB_SUCC(ret) && i < last_idx; ++i) {
ObDirectLoadSliceWriter *slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(i);
if (OB_ISNULL(slice_writer) || !slice_writer->need_column_store()) {
} else if (OB_UNLIKELY(nullptr == storage_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("wrong slice writer", KPC(slice_writer));
} else if (OB_FAIL(slice_writer->fill_column_group(storage_schema, get_start_scn()))) {
LOG_WARN("slice writer rescan failed", K(ret), KP(storage_schema), K(get_start_scn()));
LOG_WARN("invalid storage_schema", K(ret), KP(storage_schema));
} else {
fill_cg_finish_count = ATOMIC_AAF(&sqc_build_ctx_.fill_column_group_finish_count_, 1);
}
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
FLOG_INFO("[DIRECT_LOAD_FILL_CG] start fill cg",
"tablet_id", tablet_id_,
"cg_cnt", cg_schemas.count(),
"slice_cnt", sqc_build_ctx_.sorted_slice_writers_.count(),
K(thread_cnt), K(thread_id), K(start_idx), K(last_idx));
ObCOSliceWriter *cur_writer = nullptr;
if (OB_ISNULL(cur_writer = OB_NEWx(ObCOSliceWriter, &arena_allocator))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for co writer failed", K(ret));
} else if (OB_FAIL(fill_aggregated_column_group(start_idx, last_idx, storage_schema, cur_writer, fill_cg_finish_count, row_cnt))) {
LOG_WARN("fail to fill aggregated cg", K(ret), KPC(cur_writer));
}
// free writer anyhow
if (OB_NOT_NULL(cur_writer)) {
cur_writer->~ObCOSliceWriter();
arena_allocator.free(cur_writer);
cur_writer = nullptr;
}
ObTabletObjLoadHelper::free(arena_allocator, storage_schema); //arena cannot free
arena_allocator.reset();
// after finish all slice, free slice_writer
if (OB_SUCC(ret)) {
if (fill_cg_finish_count == sqc_build_ctx_.sorted_slice_writers_.count()) {
sqc_build_ctx_.sorted_slice_writers_.reset();
@ -1873,8 +1944,77 @@ int ObTabletDirectLoadMgr::fill_column_group(const int64_t thread_cnt, const int
}
}
if (OB_SUCC(ret)) {
LOG_INFO("direct load finish fill column group", K(tablet_id_), K(sqc_build_ctx_.sorted_slice_writers_.count()), K(thread_cnt), K(thread_id), K(strat_idx), K(last_idx),
K(sqc_build_ctx_.slice_mgr_map_.size()));
FLOG_INFO("[DIRECT_LOAD_FILL_CG] finish fill cg",
"tablet_id", tablet_id_,
"row_cnt", row_cnt,
"slice_cnt", sqc_build_ctx_.sorted_slice_writers_.count(),
K(thread_cnt), K(thread_id), K(start_idx), K(last_idx), K(sqc_build_ctx_.slice_mgr_map_.size()));
}
}
return ret;
}
int ObTabletDirectLoadMgr::fill_aggregated_column_group(
const int64_t start_idx,
const int64_t last_idx,
const ObStorageSchema *storage_schema,
ObCOSliceWriter *cur_writer,
int64_t &fill_cg_finish_count,
int64_t &fill_row_cnt)
{
int ret = OB_SUCCESS;
fill_cg_finish_count = -1;
fill_row_cnt = 0;
if (OB_ISNULL(cur_writer) || OB_ISNULL(storage_schema) || OB_UNLIKELY(start_idx < 0 || last_idx < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(cur_writer), KP(storage_schema), K(start_idx), K(last_idx));
} else {
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
cur_writer->reset();
common::ObArray<sql::ObCompactStore *> datum_stores;
if (start_idx == last_idx || start_idx >= sqc_build_ctx_.sorted_slice_writers_.count() || last_idx > sqc_build_ctx_.sorted_slice_writers_.count()) {
// skip
} else {
ObDirectLoadSliceWriter *first_slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(start_idx);
if (OB_ISNULL(first_slice_writer)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null slice writer", K(ret), KP(first_slice_writer));
} else if (OB_UNLIKELY(first_slice_writer->get_row_offset() < 0)) {
ret = OB_ERR_SYS;
LOG_WARN("invalid row offset", K(ret), K(first_slice_writer->get_row_offset()));
} else if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, this, first_slice_writer->get_start_seq(), first_slice_writer->get_row_offset(), get_start_scn()))) {
LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));
} else {
for (int64_t i = start_idx; OB_SUCC(ret) && i < last_idx; ++i) {
ObDirectLoadSliceWriter *slice_writer = sqc_build_ctx_.sorted_slice_writers_.at(i);
if (OB_ISNULL(slice_writer) || !slice_writer->need_column_store()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("wrong slice writer", K(ret), KPC(slice_writer));
} else if (OB_FAIL(slice_writer->fill_aggregated_column_group(cg_idx, cur_writer, datum_stores))) {
LOG_WARN("slice writer rescan failed", K(ret), K(cg_idx), KPC(cur_writer));
} else if (cg_idx == cg_schemas.count() - 1) {
// after fill last cg, inc finish cnt
fill_cg_finish_count = ATOMIC_AAF(&sqc_build_ctx_.fill_column_group_finish_count_, 1);
fill_row_cnt += slice_writer->get_row_count();
}
}
}
}
if (OB_SUCC(ret)) {
if (cur_writer->is_inited() && OB_FAIL(cur_writer->close())) {
LOG_WARN("close co ddl writer failed", K(ret));
} else {
for (int64_t i = 0; i < datum_stores.count(); ++i) {
if (OB_NOT_NULL(datum_stores.at(i))) {
datum_stores.at(i)->~ObCompactStore();
}
}
datum_stores.reset();
}
}
// next cg
}
}
return ret;

View File

@ -178,6 +178,7 @@ public:
int calc_range(
const share::ObLSID &ls_id,
const common::ObTabletID &tablet_id,
const int64_t thread_cnt,
const bool is_full_direct_load);
int fill_column_group(
const share::ObLSID &ls_id,
@ -263,7 +264,18 @@ public:
return common::murmurhash(&slice_id, sizeof(slice_id), 0L);
}
void reset_slice_ctx_on_demand();
TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt));
TO_STRING_KV(K_(build_param), K_(is_task_end), K_(task_finish_count), K_(task_total_cnt), K_(sorted_slices_idx));
struct AggregatedCGInfo final {
public:
AggregatedCGInfo()
: start_idx_(0),
last_idx_(0) {}
~AggregatedCGInfo() {}
TO_STRING_KV(K_(start_idx), K_(last_idx));
public:
int64_t start_idx_;
int64_t last_idx_;
};
public:
typedef common::hash::ObHashMap<
int64_t,
@ -276,6 +288,7 @@ public:
blocksstable::ObSSTableIndexBuilder *index_builder_;
common::ObArray<ObOptColumnStat*> column_stat_array_; // online column stat result.
common::ObArray<ObDirectLoadSliceWriter *> sorted_slice_writers_;
common::ObArray<AggregatedCGInfo> sorted_slices_idx_; //for cg_aggregation
bool is_task_end_; // to avoid write commit log/freeze in memory index sstable again.
int64_t task_finish_count_; // reach the parallel slice cnt, means the tablet data finished.
int64_t task_total_cnt_; // parallelism of the PX.
@ -353,7 +366,8 @@ public:
virtual int wait_notify(const ObDirectLoadSliceWriter *slice_writer, const share::SCN &start_scn);
int fill_column_group(const int64_t thread_cnt, const int64_t thread_id);
virtual int notify_all();
virtual int calc_range(const ObStorageSchema *storage_schema, const blocksstable::ObStorageDatumUtils &datum_utils);
virtual int calc_range(const ObStorageSchema *storage_schema, const blocksstable::ObStorageDatumUtils &datum_utils, const int64_t thread_cnt);
int calc_cg_range(ObArray<ObDirectLoadSliceWriter *> &sorted_slices, const int64_t thread_cnt);
const ObIArray<ObColumnSchemaItem> &get_column_info() const { return column_items_; };
VIRTUAL_TO_STRING_KV(K_(is_inited), K_(is_schema_item_ready), K_(ls_id), K_(tablet_id), K_(table_key), K_(data_format_version), K_(ref_cnt),
@ -362,7 +376,13 @@ public:
private:
int prepare_schema_item_on_demand(const uint64_t table_id);
void calc_cg_idx(const int64_t thread_cnt, const int64_t thread_id, int64_t &strat_idx, int64_t &end_idx);
int fill_aggregated_column_group(
const int64_t start_idx,
const int64_t last_idx,
const ObStorageSchema *storage_schema,
ObCOSliceWriter *cur_writer,
int64_t &fill_cg_finish_count,
int64_t &fill_row_cnt);
// private:
/* +++++ online column stat collect +++++ */
// virtual int init_sql_statistics_if_needed();
@ -370,6 +390,7 @@ private:
/* +++++ -------------------------- +++++ */
public:
static const int64_t TRY_LOCK_TIMEOUT = 1 * 1000000; // 1s
static const int64_t EACH_MACRO_MIN_ROW_CNT = 1000000; // 100w
protected:
bool is_inited_;
bool is_schema_item_ready_;

View File

@ -648,6 +648,31 @@ ObDirectLoadSliceWriter::~ObDirectLoadSliceWriter()
need_column_store_ = false;
}
//for test
int ObDirectLoadSliceWriter::mock_chunk_store(const int64_t row_cnt)
{
int ret = OB_SUCCESS;
if (row_cnt < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid row cnt", K(ret), K(row_cnt));
} else {
ObChunkSliceStore *chunk_slice_store = nullptr;
if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory for chunk slice store failed", K(ret));
} else {
chunk_slice_store->row_cnt_ = row_cnt;
slice_store_ = chunk_slice_store;
}
if (OB_FAIL(ret) && nullptr != chunk_slice_store) {
chunk_slice_store->~ObChunkSliceStore();
allocator_.free(chunk_slice_store);
}
}
return ret;
}
int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
const int64_t schema_rowkey_column_num,
const bool is_column_store,
@ -1057,6 +1082,54 @@ int ObDirectLoadSliceWriter::check_null(
return ret;
}
int ObDirectLoadSliceWriter::fill_aggregated_column_group(
const int64_t cg_idx,
ObCOSliceWriter *cur_writer,
ObIArray<sql::ObCompactStore *> &datum_stores)
{
int ret = OB_SUCCESS;
datum_stores.reset();
ObChunkSliceStore *chunk_slice_store = static_cast<ObChunkSliceStore *>(slice_store_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (nullptr == chunk_slice_store || is_empty()) {
// do nothing
LOG_INFO("chunk slice store is null or empty", K(ret),
KPC(chunk_slice_store), KPC(tablet_direct_load_mgr_));
} else if (ATOMIC_LOAD(&is_canceled_)) {
ret = OB_CANCELED;
LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
} else if (cg_idx < 0 || cg_idx > chunk_slice_store->datum_stores_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid cg idx", K(ret), K(cg_idx), K(chunk_slice_store->datum_stores_));
} else {
sql::ObCompactStore *cur_datum_store = chunk_slice_store->datum_stores_.at(cg_idx);
const ObChunkDatumStore::StoredRow *stored_row = nullptr;
bool has_next = false;
while (OB_SUCC(ret) && OB_SUCC(cur_datum_store->has_next(has_next)) && has_next) {
if (OB_FAIL(cur_datum_store->get_next_row(stored_row))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
break;
} else {
LOG_WARN("get next row failed", K(ret));
}
} else {
if (OB_FAIL(cur_writer->append_row(stored_row))) {
LOG_WARN("append row failed", K(ret), KPC(stored_row));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(datum_stores.push_back(cur_datum_store))) {
LOG_WARN("fail to push datum store", K(ret));
}
}
}
}
return ret;
}
int ObDirectLoadSliceWriter::close()
{
int ret = OB_SUCCESS;
@ -1091,8 +1164,6 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
LOG_WARN("fil cg task canceled", K(ret), K(is_canceled_));
} else {
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
ObArray<ObCOSliceWriter *> co_ddl_writers;
co_ddl_writers.set_attr(ObMemAttr(MTL_ID(), "DL_co_writers"));
FLOG_INFO("[DDL_FILL_CG] fill column group start",
"tablet_id", tablet_direct_load_mgr_->get_tablet_id(),
"row_count", chunk_slice_store->get_row_count(),
@ -1106,7 +1177,6 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
} else {
// 2. rescan and write
for (int64_t cg_idx = 0; OB_SUCC(ret) && cg_idx < cg_schemas.count(); ++cg_idx) {
const ObStorageColumnGroupSchema &cg_schema = cg_schemas.at(cg_idx);
cur_writer->reset();
if (OB_FAIL(cur_writer->init(storage_schema, cg_idx, tablet_direct_load_mgr_, start_seq_, row_offset_, start_scn))) {
LOG_WARN("init co ddl writer failed", K(ret), KPC(cur_writer), K(cg_idx), KPC(this));

View File

@ -458,6 +458,7 @@ public:
int64_t &inserted_cg_row_cnt_;
};
class ObCOSliceWriter;
class ObDirectLoadSliceWriter final
{
public:
@ -494,9 +495,14 @@ public:
const ObStorageSchema *storage_schema,
const share::SCN &start_scn,
ObInsertMonitor *monitor_node = NULL);
int fill_aggregated_column_group(
const int64_t cg_idx,
ObCOSliceWriter *cur_writer,
ObIArray<sql::ObCompactStore *> &datum_stores);
void set_row_offset(const int64_t row_offset) { row_offset_ = row_offset; }
int64_t get_row_count() const { return nullptr == slice_store_ ? 0 : slice_store_->get_row_count(); }
int64_t get_row_offset() const { return row_offset_; }
blocksstable::ObMacroDataSeq &get_start_seq() { return start_seq_; }
bool is_empty() const { return 0 == get_row_count(); }
bool need_column_store() const { return need_column_store_; }
ObTabletSliceStore *get_slice_store() const { return slice_store_; }
@ -552,6 +558,7 @@ private:
const int64_t lob_inrow_threshold,
const uint64_t src_tenant_id,
ObLobMetaRowIterator *&row_iter);
int mock_chunk_store(const int64_t row_cnt);
private:
bool is_inited_;
bool need_column_store_;
@ -585,6 +592,7 @@ public:
const sql::ObChunkDatumStore::StoredRow *stored_row,
blocksstable::ObDatumRow &cg_row);
int close();
bool is_inited() { return is_inited_; }
TO_STRING_KV(K(is_inited_), K(cg_idx_), KPC(cg_schema_), K(macro_block_writer_), K(data_desc_), K(cg_row_));
private:
bool is_inited_;

View File

@ -437,7 +437,7 @@ int ObDirectLoadInsertTabletContext::close_lob_sstable_slice(const int64_t slice
return ret;
}
int ObDirectLoadInsertTabletContext::calc_range()
int ObDirectLoadInsertTabletContext::calc_range(const int64_t thread_cnt)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -445,7 +445,7 @@ int ObDirectLoadInsertTabletContext::calc_range()
LOG_WARN("ObDirectLoadInsertTableContext not init", KR(ret), KP(this));
} else {
ObTenantDirectLoadMgr *sstable_insert_mgr = MTL(ObTenantDirectLoadMgr *);
if (OB_FAIL(sstable_insert_mgr->calc_range(param_.ls_id_, param_.tablet_id_, true))) {
if (OB_FAIL(sstable_insert_mgr->calc_range(param_.ls_id_, param_.tablet_id_, thread_cnt, true))) {
LOG_WARN("fail to calc range", KR(ret), K(param_.tablet_id_));
} else {
LOG_INFO("success to calc range", K(param_.tablet_id_));

View File

@ -110,7 +110,7 @@ public:
blocksstable::ObDatumRow &datum_row);
int get_lob_write_ctx(ObDirectLoadInsertTabletWriteCtx &write_ctx);
int calc_range();
int calc_range(const int64_t thread_cnt);
int fill_column_group(const int64_t thread_cnt, const int64_t thread_id);
int cancel();
TO_STRING_KV(K_(param), K_(is_open));

View File

@ -50,6 +50,7 @@ ObDirectLoadMergeParam::ObDirectLoadMergeParam()
store_column_count_(0),
snapshot_version_(0),
lob_column_cnt_(0),
fill_cg_thread_cnt_(0),
datum_utils_(nullptr),
col_descs_(nullptr),
cmp_funcs_(nullptr),

View File

@ -62,6 +62,7 @@ public:
int64_t store_column_count_;
int64_t snapshot_version_;
int64_t lob_column_cnt_;
int64_t fill_cg_thread_cnt_;
storage::ObDirectLoadTableDataDesc table_data_desc_;
const blocksstable::ObStorageDatumUtils *datum_utils_;
const common::ObIArray<share::schema::ObColDesc> *col_descs_;

View File

@ -110,7 +110,7 @@ int ObDirectLoadPartitionMergeTask::process()
LOG_WARN("fail to inc finish count", KR(ret));
} else if (is_ready) {
if (merge_param_->is_column_store_) {
if (OB_FAIL(tablet_ctx->calc_range())) {
if (OB_FAIL(tablet_ctx->calc_range(merge_param_->fill_cg_thread_cnt_))) {
LOG_WARN("fail to calc range", KR(ret));
}
} else if (OB_FAIL(tablet_ctx->close())) {