From d04aec2793a4fcc9d619aa1a0d92decd635d1e7b Mon Sep 17 00:00:00 2001 From: coolfishchen Date: Tue, 18 Jun 2024 02:06:44 +0000 Subject: [PATCH] Direct load open compress --- .../ob_table_load_control_rpc_executor.cpp | 1 + .../ob_table_load_control_rpc_struct.cpp | 12 ++- .../ob_table_load_control_rpc_struct.h | 4 +- .../table_load/ob_table_load_client_task.cpp | 22 +++++ .../table_load/ob_table_load_client_task.h | 4 + .../table_load/ob_table_load_coordinator.cpp | 1 + .../ob_table_load_parallel_merge_ctx.cpp | 1 - .../table_load/ob_table_load_schema.cpp | 14 +++ .../table_load/ob_table_load_schema.h | 2 + .../table_load/ob_table_load_store_ctx.cpp | 4 +- .../table_load/ob_table_load_struct.h | 11 ++- .../ob_table_load_table_compactor.cpp | 4 +- .../engine/cmd/ob_load_data_direct_impl.cpp | 12 ++- .../engine/cmd/ob_table_direct_insert_ctx.cpp | 27 ++++++ .../engine/cmd/ob_table_direct_insert_ctx.h | 3 + .../ob_direct_load_data_block_decoder.h | 25 +++--- .../ob_direct_load_data_block_encoder.h | 88 ++++++++++--------- .../ob_direct_load_data_block_reader.h | 67 +++++++------- .../ob_direct_load_data_block_writer.h | 67 +++++++------- .../direct_load/ob_direct_load_merge_ctx.cpp | 13 +-- ...d_multiple_heap_table_index_block_reader.h | 5 +- ...d_multiple_heap_table_index_block_writer.h | 3 +- .../ob_direct_load_multiple_sstable.cpp | 3 + .../ob_direct_load_multiple_sstable.h | 5 +- ...b_direct_load_multiple_sstable_builder.cpp | 1 + .../ob_direct_load_multiple_sstable_builder.h | 3 +- ...direct_load_multiple_sstable_compactor.cpp | 2 + ...b_direct_load_multiple_sstable_compactor.h | 3 +- ...b_direct_load_sstable_index_block_reader.h | 6 +- ...b_direct_load_sstable_index_block_writer.h | 3 +- .../ob_direct_load_table_store.cpp | 12 +-- 31 files changed, 276 insertions(+), 152 deletions(-) diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index b1d9a64186..46030c2d1b 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -67,6 +67,7 @@ int ObDirectLoadControlPreBeginExecutor::process() param.exe_mode_ = arg_.exe_mode_; param.method_ = arg_.method_; param.insert_mode_ = arg_.insert_mode_; + param.compressor_type_ = arg_.compressor_type_; if (OB_FAIL(create_table_ctx(param, arg_.ddl_param_, table_ctx))) { LOG_WARN("fail to create table ctx", KR(ret)); } diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp index f91a7871f9..a5ee3a75ea 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp @@ -66,7 +66,8 @@ ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg() write_session_count_(0), exe_mode_(ObTableLoadExeMode::MAX_TYPE), method_(ObDirectLoadMethod::INVALID_METHOD), - insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE) + insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE), + compressor_type_(ObCompressorType::INVALID_COMPRESSOR) { free_session_ctx_.sessid_ = ObSQLSessionInfo::INVALID_SESSID; } @@ -107,7 +108,8 @@ OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg) write_session_count_, exe_mode_, method_, - insert_mode_); + insert_mode_, + compressor_type_); return ret; } @@ -136,7 +138,8 @@ OB_DEF_DESERIALIZE(ObDirectLoadControlPreBeginArg) write_session_count_, exe_mode_, method_, - insert_mode_); + insert_mode_, + compressor_type_); return ret; } @@ -167,7 +170,8 @@ OB_DEF_SERIALIZE_SIZE(ObDirectLoadControlPreBeginArg) write_session_count_, exe_mode_, method_, - insert_mode_); + insert_mode_, + compressor_type_); return len; } diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h index e41ad50640..3b481c248e 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h @@ -169,7 +169,8 @@ public: K_(write_session_count), K_(exe_mode), "method", storage::ObDirectLoadMethod::get_type_string(method_), - "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_)); + "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_), + K_(compressor_type)); public: uint64_t table_id_; @@ -189,6 +190,7 @@ public: ObTableLoadExeMode exe_mode_; storage::ObDirectLoadMethod::Type method_; storage::ObDirectLoadInsertMode::Type insert_mode_; + ObCompressorType compressor_type_; }; class ObDirectLoadControlConfirmBeginArg final diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 899865df07..4fbb5f2ce8 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -578,6 +578,24 @@ void ObTableLoadClientTask::get_status(ObTableLoadClientStatus &client_status, error_code = error_code_; } +int ObTableLoadClientTask::get_compressor_type(const uint64_t tenant_id, + const uint64_t table_id, + const int64_t parallel, + ObCompressorType &compressor_type) +{ + int ret = OB_SUCCESS; + ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR; + if (OB_FAIL( + ObTableLoadSchema::get_table_compressor_type(tenant_id, table_id, table_compressor_type))) { + LOG_WARN("fail to get table compressor type", KR(ret)); + } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_compressor_type, parallel, + compressor_type))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } + return ret; +} + + int ObTableLoadClientTask::init_instance() { int ret = OB_SUCCESS; @@ -592,6 +610,7 @@ int ObTableLoadClientTask::init_instance() omt::ObTenant *tenant = nullptr; ObSchemaGetterGuard schema_guard; ObArray column_ids; + ObCompressorType compressor_type = INVALID_COMPRESSOR; if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) { LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id())); } else if (OB_FAIL(ObTableLoadSchema::get_schema_guard(tenant_id, schema_guard))) { @@ -606,6 +625,8 @@ int ObTableLoadClientTask::init_instance() table_id, column_ids))) { LOG_WARN("fail to get user column ids", KR(ret)); + } else if (OB_FAIL(get_compressor_type(param_.get_tenant_id(), param_.get_table_id(), session_count_, compressor_type))) { + LOG_WARN("fail to get compressor type", KR(ret)); } else { ObTableLoadParam load_param; load_param.tenant_id_ = tenant_id; @@ -621,6 +642,7 @@ int ObTableLoadClientTask::init_instance() load_param.dup_action_ = param_.get_dup_action(); load_param.method_ = method; load_param.insert_mode_ = insert_mode; + load_param.compressor_type_ = compressor_type; const ObTableLoadTableCtx *tmp_ctx = nullptr; if (OB_FAIL(instance_.init(load_param, column_ids, exec_ctx_))) { LOG_WARN("fail to init instance", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index 8a0110b55d..8bc6f919a8 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -124,6 +124,10 @@ private: int init_instance(); int commit_instance(); void destroy_instance(); + int get_compressor_type(const uint64_t tenant_id, + const uint64_t table_id, + const int64_t parallel, + ObCompressorType &compressor_type); private: class ClientTaskExectueProcessor; diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 054ec67e33..e88b4f21d6 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -469,6 +469,7 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ arg.exe_mode_ = ctx_->param_.exe_mode_; arg.method_ = param_.method_; arg.insert_mode_ = param_.insert_mode_; + arg.compressor_type_ = param_.compressor_type_; for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) { const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i); const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info = diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp index 43383c4f16..90e3b68efc 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp @@ -697,7 +697,6 @@ int ObTableLoadParallelMergeCtx::start(ObTableLoadParallelMergeCb *cb) LOG_WARN("invalid args", KR(ret), KP(cb)); } else { cb_ = cb; - abort_unless(1 >= tablet_ctx_map_.size()); // mutiple tablet sstable only for (TabletCtxIterator iter = tablet_ctx_map_.begin(); OB_SUCC(ret) && iter != tablet_ctx_map_.end(); ++iter) { ObTableLoadParallelMergeTabletCtx *tablet_ctx = iter->second; diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index 6a276b141f..5c4b7f50f6 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -362,6 +362,20 @@ int ObTableLoadSchema::check_has_unused_column(const ObTableSchema *table_schema return ret; } +int ObTableLoadSchema::get_table_compressor_type(uint64_t tenant_id, uint64_t table_id, + ObCompressorType &compressor_type) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const share::schema::ObTableSchema *table_schema = nullptr; + if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else { + compressor_type = table_schema->get_compressor_type(); + } + return ret; +} + ObTableLoadSchema::ObTableLoadSchema() : allocator_("TLD_Schema"), is_partitioned_table_(false), diff --git a/src/observer/table_load/ob_table_load_schema.h b/src/observer/table_load/ob_table_load_schema.h index 6bdd1b56cb..1c75a1e1d9 100644 --- a/src/observer/table_load/ob_table_load_schema.h +++ b/src/observer/table_load/ob_table_load_schema.h @@ -64,6 +64,8 @@ public: uint64_t &lob_meta_table_id); static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret); static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret); + static int get_table_compressor_type(uint64_t tenant_id, uint64_t table_id, + ObCompressorType &compressor_type); public: ObTableLoadSchema(); ~ObTableLoadSchema(); diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index ded6967547..043c6666f3 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -109,7 +109,7 @@ int ObTableLoadStoreCtx::init( table_data_desc_.sstable_data_block_size_ = ObDirectLoadSSTableDataBlock::DEFAULT_DATA_BLOCK_SIZE; table_data_desc_.extra_buf_size_ = ObDirectLoadTableDataDesc::DEFAULT_EXTRA_BUF_SIZE; - table_data_desc_.compressor_type_ = ObCompressorType::NONE_COMPRESSOR; + table_data_desc_.compressor_type_ = ctx_->param_.compressor_type_; table_data_desc_.is_heap_table_ = ctx_->schema_.is_heap_table_; table_data_desc_.session_count_ = ctx_->param_.session_count_; table_data_desc_.exe_mode_ = ctx_->param_.exe_mode_; @@ -587,7 +587,7 @@ int ObTableLoadStoreCtx::init_session_ctx_array() for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->param_.write_session_count_; ++i) { SessionContext *session_ctx = session_ctx_array_ + i; session_ctx->autoinc_param_ = autoinc_param; - if (is_multiple_mode_) { + if (!is_fast_heap_table_) { session_ctx->extra_buf_size_ = table_data_desc_.extra_buf_size_; if (OB_ISNULL(session_ctx->extra_buf_ = static_cast(allocator_.alloc(session_ctx->extra_buf_size_)))) { diff --git a/src/observer/table_load/ob_table_load_struct.h b/src/observer/table_load/ob_table_load_struct.h index 629ee44762..57ec63edfa 100644 --- a/src/observer/table_load/ob_table_load_struct.h +++ b/src/observer/table_load/ob_table_load_struct.h @@ -149,7 +149,8 @@ public: write_session_count_(0), exe_mode_(ObTableLoadExeMode::MAX_TYPE), method_(storage::ObDirectLoadMethod::INVALID_METHOD), - insert_mode_(storage::ObDirectLoadInsertMode::INVALID_INSERT_MODE) + insert_mode_(storage::ObDirectLoadInsertMode::INVALID_INSERT_MODE), + compressor_type_(ObCompressorType::INVALID_COMPRESSOR) { } @@ -183,7 +184,8 @@ public: : true) && (storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_ ? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_ - : true); + : true) && + ObCompressorType::INVALID_COMPRESSOR != compressor_type_; } TO_STRING_KV(K_(tenant_id), @@ -202,7 +204,9 @@ public: K_(write_session_count), K_(exe_mode), "method", storage::ObDirectLoadMethod::get_type_string(method_), - "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_)); + "insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_), + K_(compressor_type)); + public: uint64_t tenant_id_; uint64_t table_id_; @@ -221,6 +225,7 @@ public: ObTableLoadExeMode exe_mode_; storage::ObDirectLoadMethod::Type method_; storage::ObDirectLoadInsertMode::Type insert_mode_; + ObCompressorType compressor_type_; }; struct ObTableLoadDDLParam diff --git a/src/observer/table_load/ob_table_load_table_compactor.cpp b/src/observer/table_load/ob_table_load_table_compactor.cpp index ce765fade0..62d208c9fa 100644 --- a/src/observer/table_load/ob_table_load_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_table_compactor.cpp @@ -21,6 +21,7 @@ #include "observer/table_load/ob_table_load_mem_compactor.h" #include "storage/direct_load/ob_direct_load_external_table.h" #include "observer/table_load/ob_table_load_multiple_heap_table_compactor.h" +#include "observer/table_load/ob_table_load_parallel_merge_table_compactor.h" namespace oceanbase { @@ -158,7 +159,8 @@ int ObTableLoadTableCompactCtx::new_compactor() compactor_ = OB_NEW(ObTableLoadMemCompactor, attr); } } else { - compactor_ = OB_NEW(ObTableLoadGeneralTableCompactor, attr); + // 有主键表不排序 + compactor_ = OB_NEW(ObTableLoadParallelMergeTableCompactor, attr); } if (OB_ISNULL(compactor_)) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index c41938bffb..22e9880335 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -2000,6 +2000,7 @@ int ObLoadDataDirectImpl::init_execute_context() int ret = OB_SUCCESS; execute_ctx_.exec_ctx_.exec_ctx_ = ctx_; execute_ctx_.allocator_ = &ctx_->get_allocator(); + ObCompressorType table_compressor_type = ObCompressorType::NONE_COMPRESSOR; ObTableLoadParam load_param; load_param.tenant_id_ = execute_param_.tenant_id_; load_param.table_id_ = execute_param_.table_id_; @@ -2014,9 +2015,14 @@ int ObLoadDataDirectImpl::init_execute_context() load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_; load_param.method_ = execute_param_.method_; load_param.insert_mode_ = execute_param_.insert_mode_; - if (OB_FAIL(direct_loader_.init(load_param, - execute_param_.column_ids_, - &execute_ctx_.exec_ctx_))) { + if (OB_FAIL(ObTableLoadSchema::get_table_compressor_type( + execute_param_.tenant_id_, execute_param_.table_id_, table_compressor_type))) { + LOG_WARN("fail to get table compressor type", KR(ret)); + } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type( + table_compressor_type, execute_param_.parallel_, load_param.compressor_type_))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } else if (OB_FAIL(direct_loader_.init(load_param, execute_param_.column_ids_, + &execute_ctx_.exec_ctx_))) { LOG_WARN("fail to init direct loader", KR(ret)); } else if (OB_FAIL(init_logger())) { LOG_WARN("fail to init logger", KR(ret)); diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp index 65c3e11627..8c36979c8a 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.cpp @@ -77,6 +77,7 @@ int ObTableDirectInsertCtx::init( load_exec_ctx_->exec_ctx_ = exec_ctx; ObArray column_ids; omt::ObTenant *tenant = nullptr; + ObCompressorType compressor_type = ObCompressorType::NONE_COMPRESSOR; ObDirectLoadMethod::Type method = (is_incremental ? ObDirectLoadMethod::INCREMENTAL : ObDirectLoadMethod::FULL); ObDirectLoadInsertMode::Type insert_mode = ObDirectLoadInsertMode::INVALID_INSERT_MODE; if (session_info->get_ddl_info().is_mview_complete_refresh()) { @@ -93,6 +94,8 @@ int ObTableDirectInsertCtx::init( method, insert_mode))) { LOG_WARN("fail to check support direct load", KR(ret)); + } else if (OB_FAIL(get_compressor_type(MTL_ID(), table_id, parallel, compressor_type))) { + LOG_WARN("fail to get compressor type", KR(ret)); } else if (OB_FAIL(ObTableLoadSchema::get_column_ids(*schema_guard, tenant_id, table_id, @@ -116,6 +119,7 @@ int ObTableDirectInsertCtx::init( param.method_ = method; param.insert_mode_ = insert_mode; + param.compressor_type_ = compressor_type; if (OB_FAIL(table_load_instance_->init(param, column_ids, load_exec_ctx_))) { LOG_WARN("failed to init direct loader", KR(ret)); } else { @@ -171,5 +175,28 @@ void ObTableDirectInsertCtx::destroy() is_online_gather_statistics_ = false; } +int ObTableDirectInsertCtx::get_compressor_type(const uint64_t tenant_id, + const uint64_t table_id, + const int64_t parallel, + ObCompressorType &compressor_type) +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + const ObTableSchema *table_schema = nullptr; + if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, + schema_guard))) { + LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("table not exist", KR(ret), K(tenant_id), K(table_id)); + } else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(), + parallel, compressor_type))) { + LOG_WARN("fail to get tmp store compressor type", KR(ret)); + } + return ret; +} + } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h index a3d2055c49..2d1a81c901 100644 --- a/src/sql/engine/cmd/ob_table_direct_insert_ctx.h +++ b/src/sql/engine/cmd/ob_table_direct_insert_ctx.h @@ -66,6 +66,9 @@ public: return ddl_task_id_; } +private: + int get_compressor_type(const uint64_t tenant_id, const uint64_t table_id, const int64_t parallel, + ObCompressorType &compressor_type); private: observer::ObTableLoadSqlExecCtx *load_exec_ctx_; observer::ObTableLoadInstance *table_load_instance_; diff --git a/src/storage/direct_load/ob_direct_load_data_block_decoder.h b/src/storage/direct_load/ob_direct_load_data_block_decoder.h index 432e014cac..fc97a346a8 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_decoder.h +++ b/src/storage/direct_load/ob_direct_load_data_block_decoder.h @@ -40,13 +40,16 @@ public: template int read_item(int64_t pos, T &item); OB_INLINE const Header &get_header() const { return header_; } + OB_INLINE int64_t get_header_size() const { return header_size_; } OB_INLINE int64_t get_end_pos() const { return buf_size_; } - TO_STRING_KV(K_(header), K_(compressor_type), KP_(compressor), KP_(buf), K_(buf_size), K_(pos), - KP_(decompress_buf), KP_(decompress_buf_size)); + TO_STRING_KV(K_(header), K_(header_size), K_(compressor_type), KP_(compressor), + K_(data_block_size), KP_(buf), K_(buf_size), K_(pos), KP_(decompress_buf), + KP_(decompress_buf_size)); protected: int realloc_decompress_buf(const int64_t size); protected: Header header_; + int64_t header_size_; common::ObCompressorType compressor_type_; common::ObCompressor *compressor_; int64_t data_block_size_; @@ -61,7 +64,8 @@ protected: template ObDirectLoadDataBlockDecoder
::ObDirectLoadDataBlockDecoder() - : compressor_type_(ObCompressorType::INVALID_COMPRESSOR), + : header_size_(0), + compressor_type_(ObCompressorType::INVALID_COMPRESSOR), compressor_(nullptr), data_block_size_(0), buf_(nullptr), @@ -92,6 +96,7 @@ template void ObDirectLoadDataBlockDecoder
::reset() { header_.reset(); + header_size_ = 0; compressor_type_ = common::ObCompressorType::INVALID_COMPRESSOR; compressor_ = nullptr; data_block_size_ = 0; @@ -121,11 +126,12 @@ int ObDirectLoadDataBlockDecoder
::init(int64_t data_block_size, } else { if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type) { if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, - compressor_))) { + compressor_))) { STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type)); } } if (OB_SUCC(ret)) { + header_size_ = header_.get_serialize_size(); compressor_type_ = compressor_type; data_block_size_ = data_block_size; is_inited_ = true; @@ -165,9 +171,9 @@ int ObDirectLoadDataBlockDecoder
::prepare_data_block(char *buf, int64_t } else if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid args", KR(ret), KP(buf), K(buf_size)); - } else if (buf_size <= header_.get_serialize_size()) { + } else if (buf_size <= header_size_) { ret = common::OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "unexpected buf size", KR(ret), K(buf_size)); + STORAGE_LOG(WARN, "unexpected buf size", KR(ret), K(buf_size), K(header_size_)); } else { pos_ = 0; // deserialize header @@ -203,7 +209,7 @@ int ObDirectLoadDataBlockDecoder
::prepare_data_block(char *buf, int64_t } } if (OB_FAIL(ret)) { - //pass + // pass } else if (OB_UNLIKELY(common::ObCompressorType::NONE_COMPRESSOR == compressor_type_)) { ret = common::OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected compressor type", KR(ret)); @@ -227,10 +233,9 @@ template int ObDirectLoadDataBlockDecoder
::set_pos(int64_t pos) { int ret = common::OB_SUCCESS; - const int64_t header_size = header_.get_serialize_size(); - if (OB_UNLIKELY(pos < header_size || pos > buf_size_)) { + if (OB_UNLIKELY(pos < header_size_ || pos > buf_size_)) { ret = common::OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid args", KR(ret), K(pos), K(header_size), K(buf_size_)); + STORAGE_LOG(WARN, "invalid args", KR(ret), K(pos), K(header_size_), K(buf_size_)); } else { pos_ = pos; } diff --git a/src/storage/direct_load/ob_direct_load_data_block_encoder.h b/src/storage/direct_load/ob_direct_load_data_block_encoder.h index 752c8dea9b..ec00f6126f 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_encoder.h +++ b/src/storage/direct_load/ob_direct_load_data_block_encoder.h @@ -24,7 +24,7 @@ namespace oceanbase namespace storage { -template +template class ObDirectLoadDataBlockEncoder { static const int64_t APPLY_COMPRESSION_THRESHOLD = 90; // compression ratio to apply compression @@ -40,8 +40,9 @@ public: int64_t get_pos() const { return pos_; } Header &get_header() { return header_; } int build_data_block(char *&buf, int64_t &buf_size); - TO_STRING_KV(K_(header), K_(header_size), K_(compressor_type), KP_(compressor), KP_(buf), - K_(buf_size), K_(pos), KP_(compress_buf), K_(compress_buf_size)); + TO_STRING_KV(K_(header), K_(header_size), K_(compressor_type), KP_(compressor), + K_(data_block_size), KP_(buf), K_(buf_size), K_(pos), KP_(compress_buf), + K_(compress_buf_size)); protected: int realloc_bufs(const int64_t size); protected: @@ -59,8 +60,8 @@ protected: DISALLOW_COPY_AND_ASSIGN(ObDirectLoadDataBlockEncoder); }; -template -ObDirectLoadDataBlockEncoder
::ObDirectLoadDataBlockEncoder() +template +ObDirectLoadDataBlockEncoder::ObDirectLoadDataBlockEncoder() : header_size_(0), compressor_type_(common::ObCompressorType::INVALID_COMPRESSOR), compressor_(nullptr), @@ -74,21 +75,21 @@ ObDirectLoadDataBlockEncoder
::ObDirectLoadDataBlockEncoder() { } -template -ObDirectLoadDataBlockEncoder
::~ObDirectLoadDataBlockEncoder() +template +ObDirectLoadDataBlockEncoder::~ObDirectLoadDataBlockEncoder() { reset(); } -template -void ObDirectLoadDataBlockEncoder
::reuse() +template +void ObDirectLoadDataBlockEncoder::reuse() { header_.reset(); pos_ = header_size_; } -template -void ObDirectLoadDataBlockEncoder
::reset() +template +void ObDirectLoadDataBlockEncoder::reset() { header_.reset(); header_size_ = 0; @@ -109,22 +110,21 @@ void ObDirectLoadDataBlockEncoder
::reset() is_inited_ = false; } - -template -int ObDirectLoadDataBlockEncoder
::realloc_bufs(const int64_t size) +template +int ObDirectLoadDataBlockEncoder::realloc_bufs(const int64_t size) { int ret = OB_SUCCESS; - int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE); - if (buf_size_ != align_size) { - char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder")); + const int64_t buf_size = align ? ALIGN_UP(size, DIO_ALIGN_SIZE) : size; + if (buf_size_ != buf_size) { + char *tmp_buf = (char *)ob_malloc(buf_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder")); if (tmp_buf == nullptr) { ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "fail to alloc buf", K(align_size), KR(ret)); + STORAGE_LOG(WARN, "fail to alloc buf", K(buf_size), KR(ret)); } if (OB_SUCC(ret) && buf_ != nullptr && pos_ > 0) { - if (pos_ > align_size) { + if (pos_ > buf_size) { ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "pos is bigger than buf align_size", K(pos_), K(align_size), KR(ret)); + STORAGE_LOG(WARN, "pos is bigger than buf buf size", K(pos_), K(buf_size), KR(ret)); } else { MEMCPY(tmp_buf, buf_, pos_); } @@ -132,24 +132,23 @@ int ObDirectLoadDataBlockEncoder
::realloc_bufs(const int64_t size) if (OB_SUCC(ret)) { if (buf_ != nullptr) { ob_free(buf_); + buf_ = nullptr; } buf_ = tmp_buf; - buf_size_ = align_size; + buf_size_ = buf_size; + // pos_不变 } } - if (compressor_ != nullptr) { int64_t max_overflow_size = 0; int64_t compress_buf_size = 0; - if (OB_SUCC(ret)) { - if (OB_FAIL(compressor_->get_max_overflow_size(size, max_overflow_size))) { - STORAGE_LOG(WARN, "fail to get max_overflow_size", KR(ret), K(size), K(max_overflow_size)); - } else { - compress_buf_size = ALIGN_UP(size + max_overflow_size, DIO_ALIGN_SIZE); - } + if (OB_FAIL(compressor_->get_max_overflow_size(size, max_overflow_size))) { + STORAGE_LOG(WARN, "fail to get max_overflow_size", KR(ret), K(size), K(max_overflow_size)); + } else { + const int64_t compress_size = size + max_overflow_size; + compress_buf_size = align ? ALIGN_UP(compress_size, DIO_ALIGN_SIZE) : compress_size; } - if (OB_SUCC(ret) && compress_buf_size_ != compress_buf_size) { if (compress_buf_ != nullptr) { ob_free(compress_buf_); @@ -167,9 +166,9 @@ int ObDirectLoadDataBlockEncoder
::realloc_bufs(const int64_t size) return ret; } -template -int ObDirectLoadDataBlockEncoder
::init(int64_t data_block_size, - common::ObCompressorType compressor_type) +template +int ObDirectLoadDataBlockEncoder::init(int64_t data_block_size, + common::ObCompressorType compressor_type) { int ret = common::OB_SUCCESS; if (IS_INIT) { @@ -181,33 +180,37 @@ int ObDirectLoadDataBlockEncoder
::init(int64_t data_block_size, STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type)); } else { if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type && - OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, - compressor_))) { - STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type)); + OB_FAIL( + common::ObCompressorPool::get_instance().get_compressor(compressor_type, compressor_))) { + STORAGE_LOG(WARN, "fail to get compressor", KR(ret), K(compressor_type)); + } else if (OB_FAIL(realloc_bufs(data_block_size))) { + STORAGE_LOG(WARN, "fail to alloc bufs", KR(ret), K(data_block_size)); } else { header_size_ = header_.get_serialize_size(); compressor_type_ = compressor_type; data_block_size_ = data_block_size; + pos_ = header_size_; is_inited_ = true; } } return ret; } -template +template template -int ObDirectLoadDataBlockEncoder
::write_item(const T &item) +int ObDirectLoadDataBlockEncoder::write_item(const T &item) { int ret = common::OB_SUCCESS; const int64_t item_size = item.get_serialize_size(); - // 没有分配内存,和内存太大,都需要重新分配内存 - if (item_size + pos_ < data_block_size_ || buf_size_ < data_block_size_) { + // 内存太大恢复到默认数据块大小 + if (item_size + pos_ < data_block_size_) { if (OB_FAIL(realloc_bufs(data_block_size_))) { STORAGE_LOG(WARN, "fail to realloc bufs", KR(ret)); } } + // 单行数据超过默认数据块大小, 且buf未扩容, 重新分配buf if (OB_SUCC(ret)) { if (item_size > data_block_size_ - header_size_ && item_size > buf_size_ - header_size_) { if (OB_FAIL(realloc_bufs(item_size + header_size_))) { @@ -226,8 +229,8 @@ int ObDirectLoadDataBlockEncoder
::write_item(const T &item) return ret; } -template -int ObDirectLoadDataBlockEncoder
::build_data_block(char *&buf, int64_t &buf_size) +template +int ObDirectLoadDataBlockEncoder::build_data_block(char *&buf, int64_t &buf_size) { int ret = common::OB_SUCCESS; if (IS_NOT_INIT) { @@ -258,7 +261,8 @@ int ObDirectLoadDataBlockEncoder
::build_data_block(char *&buf, int64_t & int64_t pos = 0; header_.data_size_ = pos_; header_.occupy_size_ = buf_size; - header_.checksum_ = ob_crc64_sse42(0, buf + header_size_, header_.occupy_size_ - header_size_); + header_.checksum_ = + ob_crc64_sse42(0, buf + header_size_, header_.occupy_size_ - header_size_); if (OB_FAIL(header_.serialize(buf, header_size_, pos))) { STORAGE_LOG(WARN, "fail to serialize header", KR(ret)); } else if (header_size_ != pos) { diff --git a/src/storage/direct_load/ob_direct_load_data_block_reader.h b/src/storage/direct_load/ob_direct_load_data_block_reader.h index 086fb0db0f..6b739db71a 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_reader.h +++ b/src/storage/direct_load/ob_direct_load_data_block_reader.h @@ -22,7 +22,7 @@ namespace oceanbase namespace storage { -template +template class ObDirectLoadDataBlockReader : public ObDirectLoadExternalIterator { public: @@ -58,8 +58,8 @@ protected: DISALLOW_COPY_AND_ASSIGN(ObDirectLoadDataBlockReader); }; -template -ObDirectLoadDataBlockReader::ObDirectLoadDataBlockReader() +template +ObDirectLoadDataBlockReader::ObDirectLoadDataBlockReader() : data_block_size_(0), buf_(nullptr), buf_capacity_(0), @@ -74,14 +74,14 @@ ObDirectLoadDataBlockReader::ObDirectLoadDataBlockReader() { } -template -ObDirectLoadDataBlockReader::~ObDirectLoadDataBlockReader() +template +ObDirectLoadDataBlockReader::~ObDirectLoadDataBlockReader() { reset(); } -template -void ObDirectLoadDataBlockReader::reuse() +template +void ObDirectLoadDataBlockReader::reuse() { buf_size_ = 0; buf_pos_ = 0; @@ -94,8 +94,8 @@ void ObDirectLoadDataBlockReader::reuse() is_opened_ = false; } -template -void ObDirectLoadDataBlockReader::reset() +template +void ObDirectLoadDataBlockReader::reset() { data_block_size_ = 0; if (buf_ != nullptr) { @@ -116,9 +116,9 @@ void ObDirectLoadDataBlockReader::reset() is_inited_ = false; } -template -int ObDirectLoadDataBlockReader::init(int64_t data_block_size, - common::ObCompressorType compressor_type) +template +int ObDirectLoadDataBlockReader::init(int64_t data_block_size, + common::ObCompressorType compressor_type) { int ret = common::OB_SUCCESS; if (IS_INIT) { @@ -140,9 +140,9 @@ int ObDirectLoadDataBlockReader::init(int64_t data_block_size, return ret; } -template -int ObDirectLoadDataBlockReader::open(const ObDirectLoadTmpFileHandle &file_handle, - int64_t offset, int64_t size) +template +int ObDirectLoadDataBlockReader::open( + const ObDirectLoadTmpFileHandle &file_handle, int64_t offset, int64_t size) { int ret = common::OB_SUCCESS; if (IS_NOT_INIT) { @@ -169,8 +169,8 @@ int ObDirectLoadDataBlockReader::open(const ObDirectLoadTmpFileHandle return ret; } -template -int ObDirectLoadDataBlockReader::read_next_buffer() +template +int ObDirectLoadDataBlockReader::read_next_buffer() { int ret = common::OB_SUCCESS; if (0 == read_size_) { @@ -196,16 +196,16 @@ int ObDirectLoadDataBlockReader::read_next_buffer() return ret; } -template -int ObDirectLoadDataBlockReader::realloc_buf(int64_t size) +template +int ObDirectLoadDataBlockReader::realloc_buf(int64_t size) { int ret = OB_SUCCESS; - int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE); - if (buf_capacity_ != align_size && buf_size_ - buf_pos_ <= align_size) { - char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBReader")); + const int64_t buf_size = align ? ALIGN_UP(size, DIO_ALIGN_SIZE) : size; + if (buf_capacity_ != buf_size && buf_size_ - buf_pos_ <= buf_size) { + char *tmp_buf = (char *)ob_malloc(buf_size, ObMemAttr(MTL_ID(), "TLD_DBReader")); if (tmp_buf == nullptr) { ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "fail to alloc mem", K(align_size), KR(ret)); + STORAGE_LOG(WARN, "fail to alloc mem", K(buf_size), KR(ret)); } else { if (buf_ != nullptr) { MEMCPY(tmp_buf, buf_ + buf_pos_, buf_size_ - buf_pos_); @@ -217,26 +217,26 @@ int ObDirectLoadDataBlockReader::realloc_buf(int64_t size) buf_ = tmp_buf; buf_size_ = buf_size_ - buf_pos_; buf_pos_ = 0; - buf_capacity_ = align_size; + buf_capacity_ = buf_size; } } return ret; } -template -int ObDirectLoadDataBlockReader::switch_next_block() +template +int ObDirectLoadDataBlockReader::switch_next_block() { int ret = common::OB_SUCCESS; int64_t data_size = 0; if (OB_FAIL(realloc_buf(data_block_size_))) { STORAGE_LOG(WARN, "fail to realloc buf", K(data_block_size_), KR(ret)); - } else if (buf_size_ - buf_pos_ < DIO_ALIGN_SIZE && OB_FAIL(read_next_buffer())) { + } else if (buf_size_ - buf_pos_ <= data_block_reader_.get_header_size() && + OB_FAIL(read_next_buffer())) { if (OB_UNLIKELY(common::OB_ITER_END != ret)) { STORAGE_LOG(WARN, "fail to read next buffer", KR(ret)); } } else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_, buf_size_ - buf_pos_, data_size))) { - if (OB_UNLIKELY(common::OB_BUF_NOT_ENOUGH != ret)) { STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_)); } else { @@ -248,17 +248,18 @@ int ObDirectLoadDataBlockReader::switch_next_block() } if (OB_FAIL(ret)) { - //pass + // pass } else if (OB_FAIL(read_next_buffer())) { STORAGE_LOG(WARN, "fail to read next buffer", KR(ret)); } else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_, buf_size_ - buf_pos_, data_size))) { - STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_), K(data_size)); + STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_), + K(data_size)); } } } if (OB_SUCC(ret)) { - const int64_t data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE); + const int64_t data_block_size = align ? ALIGN_UP(data_size, DIO_ALIGN_SIZE) : data_size; buf_pos_ += data_block_size; ++block_count_; if (OB_FAIL(prepare_read_block())) { @@ -268,8 +269,8 @@ int ObDirectLoadDataBlockReader::switch_next_block() return ret; } -template -int ObDirectLoadDataBlockReader::get_next_item(const T *&item) +template +int ObDirectLoadDataBlockReader::get_next_item(const T *&item) { int ret = common::OB_SUCCESS; item = nullptr; diff --git a/src/storage/direct_load/ob_direct_load_data_block_writer.h b/src/storage/direct_load/ob_direct_load_data_block_writer.h index 61a128a6fc..d3da5ad01b 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_writer.h +++ b/src/storage/direct_load/ob_direct_load_data_block_writer.h @@ -28,7 +28,8 @@ public: virtual int write(char *buf, int64_t buf_size, int64_t offset) = 0; }; -template +// align: 是否对齐写. 当前的索引文件必须对齐写, 数据文件可以不对齐写. +template class ObDirectLoadDataBlockWriter : public ObDirectLoadExternalWriter { public: @@ -52,7 +53,7 @@ protected: int64_t data_block_size_; char *extra_buf_; int64_t extra_buf_size_; - ObDirectLoadDataBlockEncoder
data_block_writer_; + ObDirectLoadDataBlockEncoder data_block_writer_; int64_t io_timeout_ms_; ObDirectLoadTmpFileIOHandle file_io_handle_; int64_t offset_; @@ -64,8 +65,8 @@ protected: DISALLOW_COPY_AND_ASSIGN(ObDirectLoadDataBlockWriter); }; -template -ObDirectLoadDataBlockWriter::ObDirectLoadDataBlockWriter() +template +ObDirectLoadDataBlockWriter::ObDirectLoadDataBlockWriter() : data_block_size_(0), extra_buf_(nullptr), extra_buf_size_(0), @@ -78,14 +79,14 @@ ObDirectLoadDataBlockWriter::ObDirectLoadDataBlockWriter() { } -template -ObDirectLoadDataBlockWriter::~ObDirectLoadDataBlockWriter() +template +ObDirectLoadDataBlockWriter::~ObDirectLoadDataBlockWriter() { reset(); } -template -void ObDirectLoadDataBlockWriter::reuse() +template +void ObDirectLoadDataBlockWriter::reuse() { data_block_writer_.reuse(); file_io_handle_.reset(); @@ -95,8 +96,8 @@ void ObDirectLoadDataBlockWriter::reuse() is_opened_ = false; } -template -void ObDirectLoadDataBlockWriter::reset() +template +void ObDirectLoadDataBlockWriter::reset() { data_block_size_ = 0; extra_buf_ = nullptr; @@ -112,11 +113,13 @@ void ObDirectLoadDataBlockWriter::reset() is_inited_ = false; } -template -int ObDirectLoadDataBlockWriter::init(int64_t data_block_size, - common::ObCompressorType compressor_type, - char *extra_buf, int64_t extra_buf_size, - ObIDirectLoadDataBlockFlushCallback *callback) +template +int ObDirectLoadDataBlockWriter::init( + int64_t data_block_size, + common::ObCompressorType compressor_type, + char *extra_buf, + int64_t extra_buf_size, + ObIDirectLoadDataBlockFlushCallback *callback) { int ret = common::OB_SUCCESS; if (IS_INIT) { @@ -141,8 +144,9 @@ int ObDirectLoadDataBlockWriter::init(int64_t data_block_size, return ret; } -template -int ObDirectLoadDataBlockWriter::open(const ObDirectLoadTmpFileHandle &file_handle) +template +int ObDirectLoadDataBlockWriter::open( + const ObDirectLoadTmpFileHandle &file_handle) { int ret = common::OB_SUCCESS; if (IS_NOT_INIT) { @@ -165,8 +169,8 @@ int ObDirectLoadDataBlockWriter::open(const ObDirectLoadTmpFileHandle return ret; } -template -int ObDirectLoadDataBlockWriter::write_item(const T &item) +template +int ObDirectLoadDataBlockWriter::write_item(const T &item) { int ret = common::OB_SUCCESS; if (IS_NOT_INIT) { @@ -195,8 +199,8 @@ int ObDirectLoadDataBlockWriter::write_item(const T &item) return ret; } -template -int ObDirectLoadDataBlockWriter::flush_buffer() +template +int ObDirectLoadDataBlockWriter::flush_buffer() { OB_TABLE_LOAD_STATISTICS_TIME_COST(INFO, external_flush_buffer_time_us); int ret = common::OB_SUCCESS; @@ -205,27 +209,27 @@ int ObDirectLoadDataBlockWriter::flush_buffer() } else { char *buf = nullptr; int64_t buf_size = 0; - int64_t align_buf_size = 0; + int64_t occupy_size = 0; if (OB_FAIL(data_block_writer_.build_data_block(buf, buf_size))) { STORAGE_LOG(WARN, "fail to build data block", KR(ret)); - } else if (FALSE_IT(align_buf_size = ALIGN_UP(buf_size, DIO_ALIGN_SIZE))) { - } else if (OB_FAIL(file_io_handle_.aio_write(buf, align_buf_size))) { - STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret)); - } else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, align_buf_size, offset_))) { + } else if (FALSE_IT(occupy_size = align ? ALIGN_UP(buf_size, DIO_ALIGN_SIZE) : buf_size)) { + } else if (OB_FAIL(file_io_handle_.write(buf, occupy_size))) { + STORAGE_LOG(WARN, "fail to do write tmp file", KR(ret)); + } else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, occupy_size, offset_))) { STORAGE_LOG(WARN, "fail to callback write", KR(ret)); } else { - OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, align_buf_size); + OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, occupy_size); data_block_writer_.reuse(); - offset_ += align_buf_size; + offset_ += occupy_size; ++block_count_; - max_block_size_ = MAX(max_block_size_, align_buf_size); + max_block_size_ = MAX(max_block_size_, occupy_size); } } return ret; } -template -int ObDirectLoadDataBlockWriter::close() +template +int ObDirectLoadDataBlockWriter::close() { int ret = common::OB_SUCCESS; if (IS_NOT_INIT) { @@ -240,6 +244,7 @@ int ObDirectLoadDataBlockWriter::close() } else if (OB_FAIL(file_io_handle_.wait())) { STORAGE_LOG(WARN, "fail to wait io finish", KR(ret)); } else { + max_block_size_ = ALIGN_UP(max_block_size_, DIO_ALIGN_SIZE); // 这个值目前没什么用了, 这里是为了过参数检查 is_opened_ = false; } } diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index 40143bfeee..2aefcedc7a 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -286,16 +286,17 @@ int ObDirectLoadTabletMergeCtx::build_merge_task( LOG_WARN("fail to build empty data merge task", KR(ret)); } } else if (!param_.is_heap_table_) { - if (!is_multiple_mode) { - if (OB_FAIL(build_pk_table_merge_task(table_array, col_descs, max_parallel_degree))) { - LOG_WARN("fail to build pk table merge task", KR(ret)); - } - } else { + // 有主键表排序和不排序都写成multiple sstable + // if (!is_multiple_mode) { + // if (OB_FAIL(build_pk_table_merge_task(table_array, col_descs, max_parallel_degree))) { + // LOG_WARN("fail to build pk table merge task", KR(ret)); + // } + // } else { if (OB_FAIL( build_pk_table_multiple_merge_task(table_array, col_descs, max_parallel_degree))) { LOG_WARN("fail to build pk table multiple merge task", KR(ret)); } - } + // } } else { if (!is_multiple_mode) { if (OB_FAIL(build_heap_table_merge_task(table_array, col_descs, max_parallel_degree))) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.h b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.h index ab6bc0afcf..a4fa8240c8 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.h +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.h @@ -21,11 +21,12 @@ namespace storage class ObDirectLoadMultipleHeapTableIndexBlockReader : public ObDirectLoadDataBlockReader + ObDirectLoadMultipleHeapTableIndexBlock::Entry, + true/*align*/> { typedef ObDirectLoadMultipleHeapTableIndexBlock::Header Header; typedef ObDirectLoadMultipleHeapTableIndexBlock::Entry Entry; - typedef ObDirectLoadDataBlockReader ParentType; + typedef ObDirectLoadDataBlockReader ParentType; public: ObDirectLoadMultipleHeapTableIndexBlockReader(); virtual ~ObDirectLoadMultipleHeapTableIndexBlockReader(); diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_writer.h b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_writer.h index 21fc26a3c5..8143668acc 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_writer.h +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_writer.h @@ -21,7 +21,8 @@ namespace storage class ObDirectLoadMultipleHeapTableIndexBlockWriter : public ObDirectLoadDataBlockWriter + ObDirectLoadMultipleHeapTableIndexBlock::Entry, + true/*align*/> { public: ObDirectLoadMultipleHeapTableIndexBlockWriter(); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp index d144b2f225..1a8d4258a0 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp @@ -137,6 +137,7 @@ ObDirectLoadMultipleSSTable::~ObDirectLoadMultipleSSTable() void ObDirectLoadMultipleSSTable::reset() { + tablet_id_.reset(); meta_.reset(); start_key_.reset(); end_key_.reset(); @@ -154,6 +155,7 @@ int ObDirectLoadMultipleSSTable::init(const ObDirectLoadMultipleSSTableCreatePar ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param)); } else { + tablet_id_ = param.tablet_id_; meta_.rowkey_column_num_ = param.rowkey_column_num_; meta_.column_count_ = param.column_count_; meta_.index_block_size_ = param.index_block_size_; @@ -188,6 +190,7 @@ int ObDirectLoadMultipleSSTable::copy(const ObDirectLoadMultipleSSTable &other) LOG_WARN("invalid args", KR(ret), K(other)); } else { reset(); + tablet_id_ = other.tablet_id_; meta_ = other.meta_; if (OB_FAIL(start_key_.deep_copy(other.start_key_, allocator_))) { LOG_WARN("fail to deep copy rowkey", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable.h b/src/storage/direct_load/ob_direct_load_multiple_sstable.h index 34c1912a2c..9ec64babe5 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable.h @@ -51,10 +51,11 @@ public: ObDirectLoadMultipleSSTableCreateParam(); ~ObDirectLoadMultipleSSTableCreateParam(); bool is_valid() const; - TO_STRING_KV(K_(rowkey_column_num), K_(column_count), K_(index_block_size), K_(data_block_size), + TO_STRING_KV(K_(tablet_id), K_(rowkey_column_num), K_(column_count), K_(index_block_size), K_(data_block_size), K_(index_block_count), K_(data_block_count), K_(row_count), K_(max_data_block_size), K_(start_key), K_(end_key), K_(fragments)); public: + common::ObTabletID tablet_id_; int64_t rowkey_column_num_; int64_t column_count_; int64_t index_block_size_; @@ -130,7 +131,7 @@ public: TO_STRING_KV(K_(meta), K_(start_key), K_(end_key), K_(fragments)); private: common::ObArenaAllocator allocator_; - common::ObTabletID tablet_id_; // invalid + common::ObTabletID tablet_id_; // invalid in multiple mode ObDirectLoadMultipleSSTableMeta meta_; ObDirectLoadMultipleDatumRowkey start_key_; ObDirectLoadMultipleDatumRowkey end_key_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp index a75f1a4cfa..a390023a57 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp @@ -270,6 +270,7 @@ int ObDirectLoadMultipleSSTableBuilder::get_tables( fragment.data_file_size_ = data_block_writer_.get_file_size(); fragment.row_count_ = row_count_; fragment.max_data_block_size_ = data_block_writer_.get_max_block_size(); + create_param.tablet_id_ = param_.tablet_id_; create_param.rowkey_column_num_ = param_.table_data_desc_.rowkey_column_num_; create_param.column_count_ = param_.table_data_desc_.column_count_; create_param.index_block_size_ = param_.table_data_desc_.sstable_index_block_size_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h index 8412aedeac..5f626aea1e 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h @@ -30,8 +30,9 @@ public: ObDirectLoadMultipleSSTableBuildParam(); ~ObDirectLoadMultipleSSTableBuildParam(); bool is_valid() const; - TO_STRING_KV(K_(table_data_desc), KP_(datum_utils), KP_(file_mgr)); + TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils), KP_(file_mgr)); public: + common::ObTabletID tablet_id_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; ObDirectLoadTmpFileManager *file_mgr_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp index 995d4b77ac..723dec948f 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp @@ -127,6 +127,7 @@ int ObDirectLoadMultipleSSTableCompactor::check_table_compactable( } else { const ObDirectLoadMultipleSSTableMeta &table_meta = sstable->get_meta(); if (OB_UNLIKELY( + sstable->get_tablet_id() != param_.tablet_id_ || table_meta.rowkey_column_num_ != param_.table_data_desc_.rowkey_column_num_ || table_meta.column_count_ != param_.table_data_desc_.column_count_ || table_meta.index_block_size_ != param_.table_data_desc_.sstable_index_block_size_ || @@ -168,6 +169,7 @@ int ObDirectLoadMultipleSSTableCompactor::get_table(ObIDirectLoadPartitionTable } else { ObDirectLoadMultipleSSTable *sstable = nullptr; ObDirectLoadMultipleSSTableCreateParam create_param; + create_param.tablet_id_ = param_.tablet_id_; create_param.rowkey_column_num_ = param_.table_data_desc_.rowkey_column_num_; create_param.column_count_ = param_.table_data_desc_.column_count_; create_param.index_block_size_ = param_.table_data_desc_.sstable_index_block_size_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.h index fd73ef7207..940064f4b0 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.h @@ -26,8 +26,9 @@ public: ObDirectLoadMultipleSSTableCompactParam(); ~ObDirectLoadMultipleSSTableCompactParam(); bool is_valid() const; - TO_STRING_KV(K_(table_data_desc), KP_(datum_utils)); + TO_STRING_KV(K_(tablet_id), K_(table_data_desc), KP_(datum_utils)); public: + common::ObTabletID tablet_id_; ObDirectLoadTableDataDesc table_data_desc_; const blocksstable::ObStorageDatumUtils *datum_utils_; }; diff --git a/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.h b/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.h index 4fd9ab81a6..1568ac2e14 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.h +++ b/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.h @@ -21,10 +21,12 @@ namespace storage class ObDirectLoadSSTableIndexBlockReader : public ObDirectLoadDataBlockReader + ObDirectLoadSSTableIndexBlock::Entry, + true/*align*/> { typedef ObDirectLoadDataBlockReader + ObDirectLoadSSTableIndexBlock::Entry, + true> ParentType; public: ObDirectLoadSSTableIndexBlockReader(); diff --git a/src/storage/direct_load/ob_direct_load_sstable_index_block_writer.h b/src/storage/direct_load/ob_direct_load_sstable_index_block_writer.h index bdfd8296ca..516ed21af6 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_index_block_writer.h +++ b/src/storage/direct_load/ob_direct_load_sstable_index_block_writer.h @@ -21,7 +21,8 @@ namespace storage class ObDirectLoadSSTableIndexBlockWriter : public ObDirectLoadDataBlockWriter + ObDirectLoadSSTableIndexBlock::Entry, + true/*align*/> { public: ObDirectLoadSSTableIndexBlockWriter(); diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index dc0ab0b9d6..2bcde21ee3 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -15,7 +15,7 @@ #include "observer/table_load/ob_table_load_stat.h" #include "storage/direct_load/ob_direct_load_external_multi_partition_table.h" #include "storage/direct_load/ob_direct_load_fast_heap_table_builder.h" -#include "storage/direct_load/ob_direct_load_sstable_builder.h" +#include "storage/direct_load/ob_direct_load_multiple_sstable_builder.h" #include "storage/direct_load/ob_direct_load_table_builder_allocator.h" namespace oceanbase @@ -118,16 +118,18 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam ¶m, } else { abort_unless(!param.table_data_desc_.is_heap_table_); // new sstable - ObDirectLoadSSTableBuildParam sstable_build_param; + ObDirectLoadMultipleSSTableBuildParam sstable_build_param; sstable_build_param.tablet_id_ = tablet_id; sstable_build_param.table_data_desc_ = param.table_data_desc_; sstable_build_param.datum_utils_ = param.datum_utils_; sstable_build_param.file_mgr_ = param.file_mgr_; - ObDirectLoadSSTableBuilder *sstable_builder = nullptr; + sstable_build_param.extra_buf_ = param.extra_buf_; + sstable_build_param.extra_buf_size_ = param.extra_buf_size_; + ObDirectLoadMultipleSSTableBuilder *sstable_builder = nullptr; if (OB_ISNULL(sstable_builder = - table_builder_allocator_->alloc())) { + table_builder_allocator_->alloc())) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc ObDirectLoadSSTableBuilder", KR(ret)); + LOG_WARN("fail to alloc ObDirectLoadMultipleSSTableBuilder", KR(ret)); } else if (OB_FAIL(sstable_builder->init(sstable_build_param))) { LOG_WARN("fail to init sstable builder", KR(ret)); }