remove tablet_handle_ in tablet_direct_load_mgr
This commit is contained in:
@ -420,7 +420,7 @@ ObTabletDDLParam::~ObTabletDDLParam()
|
||||
|
||||
}
|
||||
|
||||
int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &tablet_handle,
|
||||
int ObChunkSliceStore::init(const int64_t rowkey_column_count, const ObStorageSchema *storage_schema,
|
||||
ObArenaAllocator &allocator, const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id,
|
||||
const int64_t parallelism)
|
||||
{
|
||||
@ -428,10 +428,13 @@ int ObChunkSliceStore::init(const int64_t rowkey_column_count, ObTabletHandle &t
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_ISNULL(storage_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("null schema", K(ret), K(*this));
|
||||
} else if (OB_UNLIKELY(rowkey_column_count <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalida argument", K(ret), K(rowkey_column_count));
|
||||
} else if (OB_FAIL(prepare_datum_stores(MTL_ID(), tablet_handle, allocator, col_array, dir_id, parallelism))) {
|
||||
} else if (OB_FAIL(prepare_datum_stores(MTL_ID(), storage_schema, allocator, col_array, dir_id, parallelism))) {
|
||||
LOG_WARN("fail to prepare datum stores");
|
||||
} else {
|
||||
arena_allocator_ = &allocator;
|
||||
@ -469,86 +472,77 @@ int64_t ObChunkSliceStore::calc_chunk_limit(const ObStorageColumnGroupSchema &cg
|
||||
return ((cg_schema.column_cnt_ / basic_column_cnt) + 1) * basic_chunk_memory_limit;
|
||||
}
|
||||
|
||||
int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, ObTabletHandle &tablet_handle, ObIAllocator &allocator,
|
||||
int ObChunkSliceStore::prepare_datum_stores(const uint64_t tenant_id, const ObStorageSchema *storage_schema, ObIAllocator &allocator,
|
||||
const ObIArray<ObColumnSchemaItem> &col_array, const int64_t dir_id, const int64_t parallelism)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t chunk_mem_limit = 64 * 1024L; // 64K
|
||||
ObCompactStore *datum_store = nullptr;
|
||||
void *buf = nullptr;
|
||||
if (OB_UNLIKELY(tenant_id <= 0)) {
|
||||
if (OB_UNLIKELY(tenant_id <= 0 || nullptr == storage_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id));
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), KP(storage_schema));
|
||||
} else {
|
||||
|
||||
ObStorageSchema *storage_schema = nullptr;
|
||||
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->load_storage_schema(allocator, storage_schema))) {
|
||||
LOG_WARN("load storage schema failed", K(ret), K(tablet_handle));
|
||||
} else {
|
||||
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) {
|
||||
const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i);
|
||||
ObCompressorType compressor_type = cur_cg_schema.compressor_type_;
|
||||
compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type;
|
||||
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(compressor_type,
|
||||
parallelism,
|
||||
compressor_type))) {
|
||||
LOG_WARN("fail to get temp store compress type", K(ret));
|
||||
}
|
||||
if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) {
|
||||
target_store_idx_ = i;
|
||||
}
|
||||
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
} else {
|
||||
datum_store = new (buf) ObCompactStore();
|
||||
ObArray<ObColumnSchemaItem> cur_column_items;
|
||||
cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item"));
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) {
|
||||
int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null
|
||||
if (column_idx >= col_array.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema));
|
||||
} else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) {
|
||||
LOG_WARN("fail to push_back col_item", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID,
|
||||
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
|
||||
compressor_type))) {
|
||||
LOG_WARN("failed to init chunk datum store", K(ret));
|
||||
} else {
|
||||
datum_store->set_dir_id(dir_id);
|
||||
datum_store->get_inner_allocator().set_tenant_id(tenant_id);
|
||||
LOG_INFO("set dir id", K(dir_id));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(datum_stores_.push_back(datum_store))) {
|
||||
LOG_WARN("fail to push back datum_store", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_NOT_NULL(datum_store)) {
|
||||
datum_store->~ObCompactStore();
|
||||
allocator.free(datum_store);
|
||||
datum_store = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
const ObIArray<ObStorageColumnGroupSchema> &cg_schemas = storage_schema->get_column_groups();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < cg_schemas.count(); ++i) {
|
||||
const ObStorageColumnGroupSchema &cur_cg_schema = cg_schemas.at(i);
|
||||
ObCompressorType compressor_type = cur_cg_schema.compressor_type_;
|
||||
compressor_type = NONE_COMPRESSOR == compressor_type ? (CS_ENCODING_ROW_STORE == cur_cg_schema.row_store_type_ ? ZSTD_1_3_8_COMPRESSOR : NONE_COMPRESSOR) : compressor_type;
|
||||
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(compressor_type,
|
||||
parallelism,
|
||||
compressor_type))) {
|
||||
LOG_WARN("fail to get temp store compress type", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(cg_schemas_.assign(cg_schemas))) {
|
||||
LOG_WARN("fail to copy cg schemas", K(ret));
|
||||
if (cur_cg_schema.is_rowkey_column_group() || cur_cg_schema.is_all_column_group()) {
|
||||
target_store_idx_ = i;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(buf = allocator.alloc(sizeof(ObCompactStore)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
} else {
|
||||
datum_store = new (buf) ObCompactStore();
|
||||
ObArray<ObColumnSchemaItem> cur_column_items;
|
||||
cur_column_items.set_attr(ObMemAttr(tenant_id, "tmp_cg_item"));
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < cur_cg_schema.column_cnt_; ++j) {
|
||||
int64_t column_idx = cur_cg_schema.column_idxs_ ? cur_cg_schema.column_idxs_[j] : j; // all_cg column_idxs_ = null
|
||||
if (column_idx >= col_array.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid column idex", K(ret), K(column_idx), K(col_array.count()), K(i), K(cur_cg_schema));
|
||||
} else if (OB_FAIL(cur_column_items.push_back(col_array.at(column_idx)))) {
|
||||
LOG_WARN("fail to push_back col_item", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store->init(chunk_mem_limit, cur_column_items, tenant_id, ObCtxIds::DEFAULT_CTX_ID,
|
||||
"DL_SLICE_STORE", true/*enable_dump*/, 0, false/*disable truncate*/,
|
||||
compressor_type))) {
|
||||
LOG_WARN("failed to init chunk datum store", K(ret));
|
||||
} else {
|
||||
datum_store->set_dir_id(dir_id);
|
||||
datum_store->get_inner_allocator().set_tenant_id(tenant_id);
|
||||
LOG_INFO("set dir id", K(dir_id));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(datum_stores_.push_back(datum_store))) {
|
||||
LOG_WARN("fail to push back datum_store", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_NOT_NULL(datum_store)) {
|
||||
datum_store->~ObCompactStore();
|
||||
allocator.free(datum_store);
|
||||
datum_store = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ObTabletObjLoadHelper::free(allocator, storage_schema);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(cg_schemas_.assign(cg_schemas))) {
|
||||
LOG_WARN("fail to copy cg schemas", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_INFO("init ObChunkSliceStore", K(*this));
|
||||
return ret;
|
||||
@ -766,7 +760,7 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
|
||||
const bool is_column_store,
|
||||
const int64_t dir_id,
|
||||
const int64_t parallelism,
|
||||
ObTabletHandle &tablet_handle,
|
||||
const ObStorageSchema *storage_schema,
|
||||
const SCN &start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -778,15 +772,15 @@ int ObDirectLoadSliceWriter::prepare_slice_store_if_need(
|
||||
} else if (is_full_direct_load(tablet_direct_load_mgr_->get_direct_load_type()) && is_column_store) {
|
||||
need_column_store_ = true;
|
||||
ObChunkSliceStore *chunk_slice_store = nullptr;
|
||||
if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
if (OB_ISNULL(storage_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tablet handle", K(ret), K(tablet_handle));
|
||||
LOG_WARN("null schema", K(ret), K(*this));
|
||||
} else if (OB_ISNULL(chunk_slice_store = OB_NEWx(ObChunkSliceStore, &allocator_))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("allocate memory for chunk slice store failed", K(ret));
|
||||
} else if (OB_FAIL(chunk_slice_store->init(schema_rowkey_column_num + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(),
|
||||
tablet_handle, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id, parallelism))) {
|
||||
LOG_WARN("init chunk slice store failed", K(ret));
|
||||
storage_schema, allocator_, tablet_direct_load_mgr_->get_column_info(), dir_id, parallelism))) {
|
||||
LOG_WARN("init chunk slice store failed", K(ret), KPC(storage_schema));
|
||||
} else {
|
||||
slice_store_ = chunk_slice_store;
|
||||
}
|
||||
@ -979,7 +973,6 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
|
||||
info.trans_id_, info.seq_no_, timeout_ts, lob_inrow_threshold, info.src_tenant_id_, row_iter))) {
|
||||
LOG_WARN("fail to prepare iters", K(ret), KP(row_iter), K(datum));
|
||||
} else {
|
||||
ObTabletHandle unused_tablet_handle; //lob no need to get storageschema with handle
|
||||
while (OB_SUCC(ret)) {
|
||||
const blocksstable::ObDatumRow *cur_row = nullptr;
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
@ -1000,7 +993,7 @@ int ObDirectLoadSliceWriter::fill_lob_into_macro_block(
|
||||
} else if (OB_FAIL(check_null(false/*is_index_table*/, ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT, *cur_row))) {
|
||||
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(ObLobMetaUtil::LOB_META_SCHEMA_ROWKEY_COL_CNT,
|
||||
false/*is_column_store*/, 1L/*unsued*/, 1L/*unused*/, unused_tablet_handle, start_scn))) {
|
||||
false/*is_column_store*/, 1L/*unsued*/, 1L/*unused*/, nullptr /*storage_schema*/, start_scn))) {
|
||||
LOG_WARN("prepare macro block writer failed", K(ret));
|
||||
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
|
||||
LOG_WARN("macro block writer append row failed", K(ret), KPC(cur_row));
|
||||
@ -1029,7 +1022,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
const SCN &start_scn,
|
||||
const uint64_t table_id,
|
||||
const ObTabletID &tablet_id,
|
||||
ObTabletHandle &tablet_handle,
|
||||
const ObStorageSchema *storage_schema,
|
||||
ObIStoreRowIterator *row_iter,
|
||||
const ObTableSchemaItem &schema_item,
|
||||
const ObDirectLoadType &direct_load_type,
|
||||
@ -1045,6 +1038,9 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDirectLoadSliceWriter not init", KR(ret), KP(this));
|
||||
} else if (OB_ISNULL(storage_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("null schema", K(ret), K(*this));
|
||||
} else {
|
||||
ObArenaAllocator arena("SliceW_sst", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
const ObDataStoreDesc &data_desc = tablet_direct_load_mgr_->get_sqc_build_ctx().data_block_desc_.get_desc();
|
||||
@ -1086,7 +1082,7 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(check_null(schema_item.is_index_table_, schema_item.rowkey_column_num_, *cur_row))) {
|
||||
LOG_WARN("fail to check null value in row", KR(ret), KPC(cur_row));
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, parallelism, tablet_handle, start_scn))) {
|
||||
} else if (OB_FAIL(prepare_slice_store_if_need(schema_item.rowkey_column_num_, schema_item.is_column_store_, dir_id, parallelism, storage_schema, start_scn))) {
|
||||
LOG_WARN("prepare macro block writer failed", K(ret));
|
||||
} else if (OB_FAIL(slice_store_->append_row(*cur_row))) {
|
||||
if (is_full_direct_load_task && OB_ERR_PRIMARY_KEY_DUPLICATE == ret && schema_item.is_unique_index_) {
|
||||
|
||||
Reference in New Issue
Block a user