From a1fa978f84d8bcd22ee246fa46655e8eb53ae593 Mon Sep 17 00:00:00 2001 From: yongshige <598633031@qq.com> Date: Tue, 8 Aug 2023 05:12:15 +0000 Subject: [PATCH] support strict replace --- .../ob_table_direct_load_rpc_executor.cpp | 26 +- .../ob_table_direct_load_rpc_executor.h | 1 + .../table_load/ob_table_load_client_task.cpp | 1 + .../table_load/ob_table_load_client_task.h | 2 + .../table_load/ob_table_load_coordinator.cpp | 19 +- .../ob_table_load_mem_compactor.cpp | 2 + ...ble_load_multiple_heap_table_compactor.cpp | 1 + .../table_load/ob_table_load_trans_store.cpp | 8 +- .../table_load/ob_table_load_trans_store.h | 1 + src/share/table/ob_table_load_define.cpp | 2 + src/share/table/ob_table_load_define.h | 61 +++++ src/share/table/ob_table_load_row.h | 42 ++- .../engine/cmd/ob_load_data_direct_impl.cpp | 49 +++- src/sql/engine/cmd/ob_load_data_direct_impl.h | 7 +- .../direct_load/ob_direct_load_compare.cpp | 257 +++++++++++++++--- .../direct_load/ob_direct_load_compare.h | 43 ++- ...rect_load_external_multi_partition_row.cpp | 4 + ...direct_load_external_multi_partition_row.h | 8 +- ...ct_load_external_multi_partition_table.cpp | 4 +- ...rect_load_external_multi_partition_table.h | 1 + .../ob_direct_load_external_row.cpp | 13 +- .../direct_load/ob_direct_load_external_row.h | 9 +- .../ob_direct_load_external_table_builder.cpp | 3 +- .../ob_direct_load_external_table_builder.h | 1 + ...ob_direct_load_fast_heap_table_builder.cpp | 2 + .../ob_direct_load_fast_heap_table_builder.h | 1 + .../direct_load/ob_direct_load_i_table.h | 2 + .../direct_load/ob_direct_load_mem_context.h | 1 + .../direct_load/ob_direct_load_mem_dump.cpp | 4 +- .../direct_load/ob_direct_load_mem_loader.cpp | 2 +- .../direct_load/ob_direct_load_mem_sample.cpp | 2 +- .../ob_direct_load_multiple_datum_row.cpp | 13 +- .../ob_direct_load_multiple_datum_row.h | 13 +- .../ob_direct_load_multiple_external_row.cpp | 13 +- .../ob_direct_load_multiple_external_row.h | 11 +- ...irect_load_multiple_heap_table_builder.cpp | 3 +- ..._direct_load_multiple_heap_table_builder.h | 1 + ...direct_load_multiple_heap_table_sorter.cpp | 2 +- ...b_direct_load_multiple_sstable_builder.cpp | 3 +- .../ob_direct_load_multiple_sstable_builder.h | 1 + .../ob_direct_load_sstable_builder.cpp | 4 +- .../ob_direct_load_sstable_builder.h | 1 + .../ob_direct_load_table_store.cpp | 7 +- .../direct_load/ob_direct_load_table_store.h | 4 +- .../test_direct_load_data_block_writer.cpp | 33 ++- 45 files changed, 559 insertions(+), 129 deletions(-) diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp index b1d813319..33892adb3 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.cpp @@ -447,7 +447,10 @@ int ObTableDirectLoadInsertExecutor::process() } else { ObTableLoadCoordinator coordinator(table_ctx); ObTableLoadTransId trans_id; - if (OB_FAIL(client_task->get_next_trans_id(trans_id))) { + int64_t batch_id = client_task->get_next_batch_id(); + if (OB_FAIL(set_batch_seq_no(batch_id, obj_rows))) { + LOG_WARN("fail to set batch seq no", KR(ret)); + } else if (OB_FAIL(client_task->get_next_trans_id(trans_id))) { LOG_WARN("fail to get next trans id", KR(ret)); } else if (OB_FAIL(coordinator.init())) { LOG_WARN("fail to init coordinator", KR(ret)); @@ -497,5 +500,26 @@ int ObTableDirectLoadInsertExecutor::decode_payload(const ObString &payload, return ret; } +int ObTableDirectLoadInsertExecutor::set_batch_seq_no(int64_t batch_id, + ObTableLoadObjRowArray &obj_row_array) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(obj_row_array.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(obj_row_array)); + } else if (OB_UNLIKELY(batch_id > ObTableLoadSequenceNo::MAX_BATCH_ID || + obj_row_array.count() > ObTableLoadSequenceNo::MAX_BATCH_SEQ_NO)) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(batch_id), K(obj_row_array.count())); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < obj_row_array.count(); ++i) { + ObTableLoadObjRow &row = obj_row_array.at(i); + row.seq_no_.batch_id_ = batch_id; + row.seq_no_.batch_seq_no_ = i; + } + } + return ret; +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h index 402730b1c..a98cc83cc 100644 --- a/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h +++ b/src/observer/table_load/client/ob_table_direct_load_rpc_executor.h @@ -158,6 +158,7 @@ protected: private: static int decode_payload(const common::ObString &payload, table::ObTableLoadObjRowArray &obj_row_array); + int set_batch_seq_no(int64_t batch_id, table::ObTableLoadObjRowArray &obj_row_array); }; } // namespace observer diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 12c0e8ace..7c5a42049 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -37,6 +37,7 @@ ObTableLoadClientTask::ObTableLoadClientTask() exec_ctx_(nullptr), task_scheduler_(nullptr), next_trans_idx_(0), + next_batch_id_(0), table_ctx_(nullptr), client_status_(ObTableLoadClientStatus::MAX_STATUS), error_code_(OB_SUCCESS), diff --git a/src/observer/table_load/ob_table_load_client_task.h b/src/observer/table_load/ob_table_load_client_task.h index 3f3f22467..10c8ee047 100644 --- a/src/observer/table_load/ob_table_load_client_task.h +++ b/src/observer/table_load/ob_table_load_client_task.h @@ -43,6 +43,7 @@ public: OB_INLINE ObTableLoadClientExecCtx *get_exec_ctx() { return exec_ctx_; } int add_trans_id(const table::ObTableLoadTransId &trans_id); int get_next_trans_id(table::ObTableLoadTransId &trans_id); + int64_t get_next_batch_id() { return ATOMIC_FAA(&next_batch_id_, 1); } OB_INLINE const common::ObIArray &get_trans_ids() const { return trans_ids_; @@ -82,6 +83,7 @@ private: ObITableLoadTaskScheduler *task_scheduler_; common::ObArray trans_ids_; int64_t next_trans_idx_; + int64_t next_batch_id_ CACHE_ALIGNED; mutable obsys::ObRWLock rw_lock_; ObTableLoadTableCtx *table_ctx_; table::ObTableLoadClientStatus client_status_; diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 07ec9104e..c5c922cfe 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -1260,26 +1260,15 @@ public: int set_objs(const ObTableLoadObjRowArray &obj_rows, const ObIArray &idx_array) { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && (i < obj_rows.count()); ++i) { const ObTableLoadObjRow &src_obj_row = obj_rows.at(i); ObTableLoadObjRow out_obj_row; - - if (OB_FAIL(out_obj_row.init(src_obj_row.count_, src_obj_row.allocator_handle_))) { - LOG_WARN("failed to init out_obj_row", KR(ret), K(src_obj_row.count_)); - } else { - for (int64_t j = 0; OB_SUCC(ret) && (j < src_obj_row.count_); ++j) { - out_obj_row.cells_[j] = src_obj_row.cells_[idx_array.at(j)]; - } - } - - if (OB_SUCC(ret)) { - if (OB_FAIL(obj_rows_.push_back(out_obj_row))) { - LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row)); - } + if (src_obj_row.project(idx_array, out_obj_row)) { + LOG_WARN("failed to projecte out_obj_row", KR(ret), K(src_obj_row.count_)); + } else if (OB_FAIL(obj_rows_.push_back(out_obj_row))) { + LOG_WARN("failed to add row to obj_rows_", KR(ret), K(out_obj_row)); } } - return ret; } int process() override diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index c56be6034..98df727ef 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -26,6 +26,7 @@ using namespace common::hash; using namespace storage; using namespace table; using namespace blocksstable; +using namespace sql; class ObTableLoadMemCompactor::SampleTaskProcessor : public ObITableLoadTaskProcessor { @@ -276,6 +277,7 @@ int ObTableLoadMemCompactor::inner_init() mem_ctx_.column_count_ = param_->column_count_; mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; + mem_ctx_.dup_action_ = param_->dup_action_; } if (OB_SUCC(ret)) { if (OB_FAIL(mem_ctx_.init())) { diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index 739653787..a1f49dbf9 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -321,6 +321,7 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init() mem_ctx_.column_count_ = param_->column_count_; mem_ctx_.dml_row_handler_ = store_ctx_->error_row_handler_; mem_ctx_.file_mgr_ = store_ctx_->tmp_file_mgr_; + mem_ctx_.dup_action_ = param_->dup_action_; if (OB_SUCC(ret)) { if (OB_FAIL(mem_ctx_.init())) { diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 501da0dd1..e4c8360b0 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -294,7 +294,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, } else { ret = OB_SUCCESS; } - } else if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, row.tablet_id_, session_ctx.datum_row_))) { + } else if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, row.tablet_id_, row.obj_row_.seq_no_, session_ctx.datum_row_))) { LOG_WARN("fail to write row", KR(ret), K(session_id), K(row.tablet_id_), K(i)); } } @@ -319,6 +319,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(session_id), K(row_array.empty())); } else { + ObTableLoadSequenceNo seq_no(0); // pdml导入的行目前不存在主键冲突,先都用一个默认的seq_no SessionContext &session_ctx = session_ctx_array_[session_id - 1]; for (int64_t i = 0; OB_SUCC(ret) && i < row_array.count(); ++i) { const ObNewRow &row = row_array.at(i); @@ -329,7 +330,7 @@ int ObTableLoadTransStoreWriter::write(int32_t session_id, } } if (OB_SUCC(ret)) { - if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, tablet_id, session_ctx.datum_row_))) { + if (OB_FAIL(write_row_to_table_store(session_ctx.table_store_, tablet_id, seq_no, session_ctx.datum_row_))) { LOG_WARN("fail to write row", KR(ret), K(session_id), K(tablet_id)); } } @@ -463,10 +464,11 @@ int ObTableLoadTransStoreWriter::handle_identity_column(const ObColumnSchemaV2 * int ObTableLoadTransStoreWriter::write_row_to_table_store(ObDirectLoadTableStore &table_store, const ObTabletID &tablet_id, + const ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; - if (OB_FAIL(table_store.append_row(tablet_id, datum_row))) { + if (OB_FAIL(table_store.append_row(tablet_id, seq_no, datum_row))) { LOG_WARN("fail to append row", KR(ret), K(datum_row)); } if (OB_FAIL(ret)) { diff --git a/src/observer/table_load/ob_table_load_trans_store.h b/src/observer/table_load/ob_table_load_trans_store.h index c52d5729d..1f3bf9f04 100644 --- a/src/observer/table_load/ob_table_load_trans_store.h +++ b/src/observer/table_load/ob_table_load_trans_store.h @@ -83,6 +83,7 @@ private: common::ObArenaAllocator &cast_allocator); int write_row_to_table_store(storage::ObDirectLoadTableStore &table_store, const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row); private: ObTableLoadTransStore *const trans_store_; diff --git a/src/share/table/ob_table_load_define.cpp b/src/share/table/ob_table_load_define.cpp index 31fd3ea60..e309ea461 100644 --- a/src/share/table/ob_table_load_define.cpp +++ b/src/share/table/ob_table_load_define.cpp @@ -40,5 +40,7 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadResultInfo, skipped_, warnings_); +OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadSequenceNo, sequence_no_); + } // namespace table } // namespace oceanbase diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index 3bc783b64..d63cae30e 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -428,5 +428,66 @@ public: uint64_t warnings_ CACHE_ALIGNED; }; +struct ObTableLoadSequenceNo +{ + OB_UNIS_VERSION(1); +public: + static const uint64_t MAX_DATA_ID = (1LL << 16) - 1; + static const uint64_t MAX_CHUNK_ID = (1LL << 32) - 1; + static const uint64_t MAX_BATCH_ID = (1LL << 48) - 1; + static const uint64_t MAX_DATA_SEQ_NO = (1LL << 48) - 1; + static const uint64_t MAX_CHUNK_SEQ_NO = (1LL << 32) - 1; + static const uint64_t MAX_BATCH_SEQ_NO = (1LL << 16) - 1; + union { + // multi file + struct { + uint64_t data_id_ : 16; + uint64_t data_seq_no_ : 48; + }; + // single file + struct { + uint64_t chunk_id_ : 32; + uint64_t chunk_seq_no_ : 32; + }; + // client + struct { + uint64_t batch_id_ : 48; + uint64_t batch_seq_no_ : 16; + }; + uint64_t sequence_no_; + }; + ObTableLoadSequenceNo() : sequence_no_(OB_INVALID_ID) {} + ObTableLoadSequenceNo(uint64_t sequence_no) { sequence_no_ = sequence_no; } + void reset() { sequence_no_ = OB_INVALID_ID; } + bool is_valid () const {return sequence_no_ != OB_INVALID_ID; } + bool operator == (const ObTableLoadSequenceNo &other) const { return sequence_no_ == other.sequence_no_; } + bool operator < (const ObTableLoadSequenceNo &other) const { return sequence_no_ < other.sequence_no_; } + bool operator > (const ObTableLoadSequenceNo &other) const { return sequence_no_ > other.sequence_no_; } + ObTableLoadSequenceNo &operator=(const ObTableLoadSequenceNo &other) { sequence_no_ = other.sequence_no_; return *this; } + ObTableLoadSequenceNo &operator++() + { + sequence_no_++; + return *this; + } + ObTableLoadSequenceNo operator++(int) + { + ObTableLoadSequenceNo tmp = *this; + sequence_no_++; + return tmp; + } + ObTableLoadSequenceNo &operator--() + { + sequence_no_--; + return *this; + } + ObTableLoadSequenceNo operator--(int) + { + ObTableLoadSequenceNo tmp = *this; + sequence_no_--; + return tmp; + } + TO_STRING_KV(K_(sequence_no), K_(data_id), K_(data_seq_no), K_(chunk_id), K_(chunk_seq_no), K_(batch_id), K_(batch_seq_no)); +}; + } // namespace table } // namespace oceanbase diff --git a/src/share/table/ob_table_load_row.h b/src/share/table/ob_table_load_row.h index 3420b9868..5e8d708a1 100644 --- a/src/share/table/ob_table_load_row.h +++ b/src/share/table/ob_table_load_row.h @@ -11,6 +11,7 @@ #include "lib/utility/ob_print_utils.h" #include "common/object/ob_object.h" #include "common/ob_tablet_id.h" +#include "ob_table_load_define.h" namespace oceanbase { @@ -25,8 +26,8 @@ public: virtual ~ObTableLoadRow() {} void reset(); int init(int64_t count, const ObTableLoadSharedAllocatorHandle &allocator_handle); - int deep_copy_and_assign(const T *row, int64_t count, - const ObTableLoadSharedAllocatorHandle &allocator_handle); + int project(const ObIArray &idx_projector, ObTableLoadRow &projected_row) const; + int deep_copy(const ObTableLoadRow &other, const ObTableLoadSharedAllocatorHandle &allocator_handle); // for deserialize() void set_allocator(const ObTableLoadSharedAllocatorHandle &allocator_handle) { @@ -36,6 +37,10 @@ public: { return allocator_handle_; } + ObTableLoadSequenceNo& get_sequence_no() + { + return seq_no_; + } TO_STRING_KV(K_(count)); private: @@ -44,6 +49,7 @@ private: public: ObTableLoadSharedAllocatorHandle allocator_handle_; + ObTableLoadSequenceNo seq_no_; T *cells_; int64_t count_; }; @@ -52,6 +58,7 @@ template void ObTableLoadRow::reset() { allocator_handle_.reset(); + seq_no_.reset(); cells_ = nullptr; count_ = 0; } @@ -101,7 +108,7 @@ int ObTableLoadRow::allocate_cells(T *&cells, int64_t count, } template -int ObTableLoadRow::deep_copy_and_assign(const T *row, int64_t count, +int ObTableLoadRow::deep_copy(const ObTableLoadRow &other, const ObTableLoadSharedAllocatorHandle &allocator_handle) { @@ -109,21 +116,36 @@ int ObTableLoadRow::deep_copy_and_assign(const T *row, int64_t count, T *cells = nullptr; reset(); - if (OB_FAIL(allocate_cells(cells, count, allocator_handle))) { - OB_LOG(WARN, "failed to allocate cells", KR(ret), K(count)); + if (OB_FAIL(allocate_cells(cells, other.count_, allocator_handle))) { + OB_LOG(WARN, "failed to allocate cells", KR(ret), K(other.count_)); } - for (int64_t i = 0; OB_SUCC(ret) && i < count; i ++) { - if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(row[i], + for (int64_t i = 0; OB_SUCC(ret) && i < other.count_; i ++) { + if (OB_FAIL(observer::ObTableLoadUtils::deep_copy(other.cells_[i], cells[i], *allocator_handle))) { OB_LOG(WARN, "fail to deep copy object", KR(ret)); } } if (OB_SUCC(ret)) { allocator_handle_ = allocator_handle; + seq_no_ = other.seq_no_; cells_ = cells; - count_ = count; + count_ = other.count_; } + return ret; +} +template +int ObTableLoadRow::project(const ObIArray &idx_projector, ObTableLoadRow &projected_row) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(projected_row.init(count_, allocator_handle_))) { + OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_)); + } else { + for (int64_t j = 0; j < count_; ++j) { + projected_row.cells_[j] = cells_[idx_projector.at(j)]; + } + projected_row.seq_no_ = seq_no_; + } return ret; } @@ -131,6 +153,7 @@ template int ObTableLoadRow::serialize(SERIAL_PARAMS) const { int ret = OB_SUCCESS; + OB_UNIS_ENCODE(seq_no_); OB_UNIS_ENCODE_ARRAY(cells_, count_); return ret; } @@ -140,6 +163,7 @@ int ObTableLoadRow::deserialize(DESERIAL_PARAMS) { int ret = OB_SUCCESS; int64_t count = 0; + OB_UNIS_DECODE(seq_no_); OB_UNIS_DECODE(count); if (OB_SUCC(ret) && (count > 0)) { T *cells = nullptr; @@ -162,6 +186,7 @@ template int64_t ObTableLoadRow::get_serialize_size() const { int64_t len = 0; + OB_UNIS_ADD_LEN(seq_no_); OB_UNIS_ADD_LEN_ARRAY(cells_, count_); return len; } @@ -178,7 +203,6 @@ public: obj_row_.set_allocator(allocator_handle); } TO_STRING_KV(K_(tablet_id), K_(obj_row)); - public: common::ObTabletID tablet_id_; ObTableLoadObjRow obj_row_; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 04822ab19..5f4316664 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -485,12 +485,13 @@ int ObLoadDataDirectImpl::DataDescIterator::add_data_desc(const DataDesc &data_d return ret; } -int ObLoadDataDirectImpl::DataDescIterator::get_next_data_desc(DataDesc &data_desc) +int ObLoadDataDirectImpl::DataDescIterator::get_next_data_desc(DataDesc &data_desc, int64_t &pos) { int ret = OB_SUCCESS; if (pos_ >= data_descs_.count()) { ret = OB_ITER_END; } else { + pos = pos_; data_desc = data_descs_.at(pos_++); } return ret; @@ -1340,9 +1341,13 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *hand //此时row中的每个obj的内容指向的是data parser中的内存 //因此得把它们深拷贝一遍 ObTableLoadObjRow tmp_obj_row; - if (OB_FAIL(tmp_obj_row.deep_copy_and_assign(row.cells_, row.count_, allocator_handle))) { + tmp_obj_row.seq_no_= handle->get_next_seq_no(); + tmp_obj_row.cells_ = row.cells_; + tmp_obj_row.count_ = row.count_; + ObTableLoadObjRow row; + if (OB_FAIL(row.deep_copy(tmp_obj_row, allocator_handle))) { LOG_WARN("failed to deep copy add assign to tmp_obj_row", KR(ret)); - } else if (OB_FAIL(obj_rows.push_back(tmp_obj_row))) { + } else if (OB_FAIL(obj_rows.push_back(row))) { LOG_WARN("failed to add tmp_obj_row to obj_rows", KR(ret)); } else { ++processed_line_count; @@ -1422,6 +1427,9 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process() int64_t line_count = 0; if (OB_FAIL(file_load_executor_->process_task_handle(handle_, line_count))) { LOG_WARN("fail to process task handle", KR(ret)); + } else if (OB_UNLIKELY(line_count > ObTableLoadSequenceNo::MAX_CHUNK_SEQ_NO)){ + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(line_count)); } return ret; } @@ -1431,7 +1439,8 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process() */ ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor() - : next_worker_idx_(0) + : next_worker_idx_(0), + next_chunk_id_(0) { } @@ -1452,6 +1461,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &ex ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(data_desc_iter)); } else { + int64_t data_id = 0; DataDescIterator copy_data_desc_iter; DataDesc data_desc; if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.thread_count_, @@ -1461,7 +1471,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &ex // data_desc_ else if (OB_FAIL(copy_data_desc_iter.copy(data_desc_iter))) { LOG_WARN("fail to copy data desc iter", KR(ret)); - } else if (OB_FAIL(copy_data_desc_iter.get_next_data_desc(data_desc))) { + } else if (OB_FAIL(copy_data_desc_iter.get_next_data_desc(data_desc, data_id))) { LOG_WARN("fail to get next data desc", KR(ret)); } // expr_buffer_ @@ -1493,8 +1503,12 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle { int ret = OB_SUCCESS; int64_t current_line_count = 0; + const int64_t chunk_id = next_chunk_id_ ++; expr_buffer_.reuse(); - if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, current_line_count))) { + if (OB_UNLIKELY(chunk_id > ObTableLoadSequenceNo::MAX_CHUNK_ID)) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(chunk_id)); + } else if (OB_FAIL(data_reader_.get_next_buffer(*expr_buffer_.file_buffer_, current_line_count))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("fail to get next buffer", KR(ret)); } @@ -1507,6 +1521,8 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle handle->data_desc_ = data_desc_; handle->start_line_no_ = total_line_count_ + 1; handle->result_.created_ts_ = ObTimeUtil::current_time(); + handle->sequence_no_.chunk_id_ = chunk_id; + handle->sequence_no_.chunk_seq_no_ = 0; handle->data_buffer_.swap(expr_buffer_); handle->data_buffer_.is_end_file_ = data_reader_.is_end_file(); } @@ -1601,6 +1617,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process() int ret = OB_SUCCESS; handle_->result_.start_process_ts_ = ObTimeUtil::current_time(); int64_t current_line_count = 0; + int64_t total_line_count = 0; if (OB_FAIL(data_reader_.init(execute_param_->data_access_param_, *execute_ctx_, handle_->data_desc_, true))) { LOG_WARN("fail to init data reader", KR(ret)); @@ -1617,6 +1634,12 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process() current_line_count = 0; if (OB_FAIL(file_load_executor_->process_task_handle(handle_, current_line_count))) { LOG_WARN("fail to process task handle", KR(ret)); + } else { + total_line_count += current_line_count; + if (OB_UNLIKELY(total_line_count > ObTableLoadSequenceNo::MAX_DATA_SEQ_NO)){ + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(total_line_count)); + } } } } @@ -1640,6 +1663,12 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process() ret = OB_NOT_SUPPORTED; LOG_WARN("direct-load does not support big row", KR(ret), "size", handle_->data_buffer_.get_data_length()); + } else { + total_line_count += current_line_count; + if (OB_UNLIKELY(total_line_count > ObTableLoadSequenceNo::MAX_DATA_SEQ_NO)){ + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(total_line_count)); + } } } } @@ -1741,10 +1770,14 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandl { int ret = OB_SUCCESS; DataDesc data_desc; - if (OB_FAIL(data_desc_iter_.get_next_data_desc(data_desc))) { + int64_t data_id = 0; + if (OB_FAIL(data_desc_iter_.get_next_data_desc(data_desc, data_id))) { if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("fail to get next data desc", KR(ret)); } + } else if (OB_UNLIKELY(data_id > ObTableLoadSequenceNo::MAX_DATA_ID)) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("size is overflow", KR(ret), K(data_id)); } else if (OB_FAIL(fetch_task_handle(handle))) { LOG_WARN("fail to fetch task handle", KR(ret)); } else { @@ -1752,6 +1785,8 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::get_next_task_handle(TaskHandl handle->data_desc_ = data_desc; handle->start_line_no_ = 0; handle->result_.created_ts_ = ObTimeUtil::current_time(); + handle->sequence_no_.data_id_ = data_id; + handle->sequence_no_.data_seq_no_ = 0; } return ret; } diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 7258c0962..a4bd54b98 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -155,7 +155,7 @@ private: int copy(const ObLoadFileIterator &file_iter); int copy(const DataDescIterator &desc_iter); int add_data_desc(const DataDesc &data_desc); - int get_next_data_desc(DataDesc &data_desc); + int get_next_data_desc(DataDesc &data_desc, int64_t &pos); TO_STRING_KV(K_(data_descs), K_(pos)); private: common::ObSEArray data_descs_; @@ -337,16 +337,20 @@ private: struct TaskHandle { + public: TaskHandle() : task_id_(common::OB_INVALID_ID), worker_idx_(-1), session_id_(0), start_line_no_(0) { } + table::ObTableLoadSequenceNo get_next_seq_no () { return sequence_no_ ++ ; } + public: int64_t task_id_; DataBuffer data_buffer_; int64_t worker_idx_; // parse thread idx int32_t session_id_; // table load session id DataDesc data_desc_; int64_t start_line_no_; // 从1开始 + table::ObTableLoadSequenceNo sequence_no_; TaskResult result_; TO_STRING_KV(K_(task_id), K_(data_buffer), K_(worker_idx), K_(session_id), K_(data_desc), K_(start_line_no), K_(result)); @@ -429,6 +433,7 @@ private: DataBuffer expr_buffer_; DataReader data_reader_; int64_t next_worker_idx_; + int64_t next_chunk_id_; DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor); }; diff --git a/src/storage/direct_load/ob_direct_load_compare.cpp b/src/storage/direct_load/ob_direct_load_compare.cpp index 94be8bffa..03639ccec 100644 --- a/src/storage/direct_load/ob_direct_load_compare.cpp +++ b/src/storage/direct_load/ob_direct_load_compare.cpp @@ -28,8 +28,23 @@ int ObDirectLoadDatumRowkeyCompare::init(const ObStorageDatumUtils &datum_utils) return ret; } -bool ObDirectLoadDatumRowkeyCompare::operator()(const ObDatumRowkey *lhs, - const ObDatumRowkey *rhs) +int ObDirectLoadDatumRowkeyCompare::compare(const ObDatumRowkey *lhs, const ObDatumRowkey *rhs, + int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(datum_utils_) || OB_ISNULL(lhs) || OB_ISNULL(rhs) || + OB_UNLIKELY(!lhs->is_valid() || !rhs->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(datum_utils_), KP(lhs), KP(rhs)); + } else { + if (OB_FAIL(lhs->compare(*rhs, *datum_utils_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(datum_utils_)); + } + } + return ret; +} + +bool ObDirectLoadDatumRowkeyCompare::operator()(const ObDatumRowkey *lhs, const ObDatumRowkey *rhs) { int ret = OB_SUCCESS; int cmp_ret = 0; @@ -73,6 +88,33 @@ int ObDirectLoadDatumRowCompare::init(const ObStorageDatumUtils &datum_utils, in return ret; } +int ObDirectLoadDatumRowCompare::compare(const blocksstable::ObDatumRow *lhs, + const blocksstable::ObDatumRow *rhs, int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this)); + } else if (OB_ISNULL(lhs) || OB_ISNULL(rhs) || + OB_UNLIKELY(!lhs->is_valid() || !rhs->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else if (OB_UNLIKELY(lhs->get_column_count() < rowkey_size_ || + rhs->get_column_count() < rowkey_size_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected row column cnt", KR(ret), K(lhs), K(rhs), K_(rowkey_size)); + } else { + if (OB_FAIL(lhs_rowkey_.assign(lhs->storage_datums_, rowkey_size_))) { + LOG_WARN("Failed to assign datum rowkey", KR(ret), K(lhs), K_(rowkey_size)); + } else if (OB_FAIL(rhs_rowkey_.assign(rhs->storage_datums_, rowkey_size_))) { + LOG_WARN("Failed to assign datum rowkey", KR(ret), K(rhs), K_(rowkey_size)); + } else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } + } + return ret; +} + bool ObDirectLoadDatumRowCompare::operator()(const ObDatumRow *lhs, const ObDatumRow *rhs) { int ret = OB_SUCCESS; @@ -125,6 +167,29 @@ int ObDirectLoadDatumArrayCompare::init(const ObStorageDatumUtils &datum_utils) return ret; } +int ObDirectLoadDatumArrayCompare::compare(const ObDirectLoadDatumArray *lhs, + const ObDirectLoadDatumArray *rhs, int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadDatumArrayCompare not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || !rhs->is_valid() || + lhs->count_ != rhs->count_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else if (lhs->count_ > 0) { + if (OB_FAIL(lhs_rowkey_.assign(lhs->datums_, lhs->count_))) { + LOG_WARN("fail to assign rowkey", KR(ret)); + } else if (OB_FAIL(rhs_rowkey_.assign(rhs->datums_, rhs->count_))) { + LOG_WARN("fail to assign rowkey", KR(ret)); + } else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } + } + return ret; +} + bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadDatumArray *lhs, const ObDirectLoadDatumArray *rhs) { @@ -154,6 +219,29 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadDatumArray *lhs return bret; } +int ObDirectLoadDatumArrayCompare::compare(const ObDirectLoadConstDatumArray *lhs, + const ObDirectLoadConstDatumArray *rhs, int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadDatumArrayCompare not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || !rhs->is_valid() || + lhs->count_ != rhs->count_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else if (lhs->count_ > 0) { + if (OB_FAIL(lhs_rowkey_.assign(lhs->datums_, lhs->count_))) { + LOG_WARN("fail to assign rowkey", KR(ret)); + } else if (OB_FAIL(rhs_rowkey_.assign(rhs->datums_, rhs->count_))) { + LOG_WARN("fail to assign rowkey", KR(ret)); + } else if (OB_FAIL(rowkey_compare_.compare(&lhs_rowkey_, &rhs_rowkey_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } + } + return ret; +} + bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray *lhs, const ObDirectLoadConstDatumArray *rhs) { @@ -187,7 +275,8 @@ bool ObDirectLoadDatumArrayCompare::operator()(const ObDirectLoadConstDatumArray * ObDirectLoadExternalRowCompare */ -int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils) +int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils, + sql::ObLoadDupActionType dup_action) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -197,6 +286,7 @@ int ObDirectLoadExternalRowCompare::init(const ObStorageDatumUtils &datum_utils) if (OB_FAIL(datum_array_compare_.init(datum_utils))) { LOG_WARN("fail to init datum array compare", KR(ret)); } else { + dup_action_ = dup_action; is_inited_ = true; } } @@ -207,7 +297,7 @@ bool ObDirectLoadExternalRowCompare::operator()(const ObDirectLoadExternalRow *l const ObDirectLoadExternalRow *rhs) { int ret = OB_SUCCESS; - bool bret = false; + int cmp_ret = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this)); @@ -215,22 +305,57 @@ bool ObDirectLoadExternalRowCompare::operator()(const ObDirectLoadExternalRow *l !rhs->is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); - } else { - bret = datum_array_compare_.operator()(&lhs->rowkey_datum_array_, &rhs->rowkey_datum_array_); + } else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) { + LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs)); } if (OB_FAIL(ret)) { result_code_ = ret; - } else if (OB_FAIL(datum_array_compare_.get_error_code())) { - result_code_ = datum_array_compare_.get_error_code(); } - return bret; + return cmp_ret < 0; +} + +int ObDirectLoadExternalRowCompare::compare(const ObDirectLoadExternalRow *lhs, + const ObDirectLoadExternalRow *rhs, int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadDatumRowCompare not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || + !rhs->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_, + &rhs->rowkey_datum_array_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } else { + if (cmp_ret == 0) { + if (lhs->seq_no_ == rhs->seq_no_) { + cmp_ret = 0; + } else if (lhs->seq_no_ > rhs->seq_no_) { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = -1; + } else { + cmp_ret = 1; + } + } else { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = 1; + } else { + cmp_ret = -1; + } + } + } + } + return ret; } /** * ObDirectLoadExternalMultiPartitionRowCompare */ -int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils) +int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils &datum_utils, + sql::ObLoadDupActionType dup_action) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -240,6 +365,7 @@ int ObDirectLoadExternalMultiPartitionRowCompare::init(const ObStorageDatumUtils if (OB_FAIL(datum_array_compare_.init(datum_utils))) { LOG_WARN("fail to init datum array compare", KR(ret)); } else { + dup_action_ = dup_action; is_inited_ = true; } } @@ -251,7 +377,7 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()( const ObDirectLoadExternalMultiPartitionRow *rhs) { int ret = OB_SUCCESS; - bool bret = false; + int cmp_ret = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this)); @@ -259,25 +385,40 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()( !rhs->is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); - } else { - if (lhs->tablet_id_ != rhs->tablet_id_) { - bret = lhs->tablet_id_ < rhs->tablet_id_; - } else { - bret = datum_array_compare_.operator()(&lhs->external_row_.rowkey_datum_array_, - &rhs->external_row_.rowkey_datum_array_); - } + } else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) { + LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs)); } if (OB_FAIL(ret)) { result_code_ = ret; - } else if (OB_FAIL(datum_array_compare_.get_error_code())) { - result_code_ = datum_array_compare_.get_error_code(); } - return bret; + return cmp_ret < 0; } bool ObDirectLoadExternalMultiPartitionRowCompare::operator()( const ObDirectLoadConstExternalMultiPartitionRow *lhs, const ObDirectLoadConstExternalMultiPartitionRow *rhs) +{ + int ret = OB_SUCCESS; + int cmp_ret = 0; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || + !rhs->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else if (OB_FAIL(compare(lhs, rhs, cmp_ret))) { + LOG_WARN("Fail to compare datum array", KR(ret), KP(lhs), KP(rhs)); + } + if (OB_FAIL(ret)) { + result_code_ = ret; + } + return cmp_ret < 0; +} + +int ObDirectLoadExternalMultiPartitionRowCompare::compare( + const ObDirectLoadExternalMultiPartitionRow *lhs, + const ObDirectLoadExternalMultiPartitionRow *rhs, int &cmp_ret) { int ret = OB_SUCCESS; bool bret = false; @@ -290,18 +431,70 @@ bool ObDirectLoadExternalMultiPartitionRowCompare::operator()( LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); } else { if (lhs->tablet_id_ != rhs->tablet_id_) { - bret = lhs->tablet_id_ < rhs->tablet_id_; - } else { - bret = datum_array_compare_.operator()(&lhs->rowkey_datum_array_, &rhs->rowkey_datum_array_); + cmp_ret = lhs->tablet_id_ < rhs->tablet_id_ ? -1 : 1; + } else if (OB_FAIL(datum_array_compare_.compare(&lhs->external_row_.rowkey_datum_array_, + &rhs->external_row_.rowkey_datum_array_, + cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } else if (cmp_ret == 0) { + if (lhs->external_row_.seq_no_ == rhs->external_row_.seq_no_) { + cmp_ret = 0; + } else if (lhs->external_row_.seq_no_ > rhs->external_row_.seq_no_) { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = -1; + } else { + cmp_ret = 1; + } + } else { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = 1; + } else { + cmp_ret = -1; + } + } } } - if (OB_FAIL(ret)) { - result_code_ = ret; - } else if (OB_FAIL(datum_array_compare_.get_error_code())) { - result_code_ = datum_array_compare_.get_error_code(); - } - return bret; + return ret; } -} // namespace storage -} // namespace oceanbase +int ObDirectLoadExternalMultiPartitionRowCompare::compare( + const ObDirectLoadConstExternalMultiPartitionRow *lhs, + const ObDirectLoadConstExternalMultiPartitionRow *rhs, int &cmp_ret) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObDirectLoadExternalMultiPartitionRowCompare not init", KR(ret), KP(this)); + } else if (OB_UNLIKELY(nullptr == lhs || nullptr == rhs || !lhs->is_valid() || + !rhs->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), KP(lhs), KP(rhs)); + } else { + if (lhs->tablet_id_ != rhs->tablet_id_) { + cmp_ret = lhs->tablet_id_ < rhs->tablet_id_ ? -1 : 1; + } else if (OB_FAIL(datum_array_compare_.compare(&lhs->rowkey_datum_array_, + &rhs->rowkey_datum_array_, cmp_ret))) { + LOG_WARN("fail to compare rowkey", KR(ret), KP(lhs), K(rhs), K(cmp_ret)); + } else if (cmp_ret == 0) { + if (lhs->seq_no_ == rhs->seq_no_) { + cmp_ret = 0; + } else if (lhs->seq_no_ > rhs->seq_no_) { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = -1; + } else { + cmp_ret = 1; + } + } else { + if (dup_action_ == sql::ObLoadDupActionType::LOAD_REPLACE) { + cmp_ret = 1; + } else { + cmp_ret = -1; + } + } + } + } + return ret; +} + +} // namespace storage +} // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_compare.h b/src/storage/direct_load/ob_direct_load_compare.h index f960fefa4..819c3c9c5 100644 --- a/src/storage/direct_load/ob_direct_load_compare.h +++ b/src/storage/direct_load/ob_direct_load_compare.h @@ -6,6 +6,7 @@ #include "lib/ob_define.h" #include "lib/ob_errno.h" +#include "sql/resolver/cmd/ob_load_data_stmt.h" #include "storage/blocksstable/ob_datum_rowkey.h" namespace oceanbase @@ -15,7 +16,7 @@ namespace blocksstable class ObStorageDatumUtils; class ObDatumRowkey; class ObDatumRow; -} // namespace blocksstable +} // namespace blocksstable namespace storage { class ObDirectLoadDatumArray; @@ -29,8 +30,11 @@ class ObDirectLoadDatumRowkeyCompare public: ObDirectLoadDatumRowkeyCompare() : datum_utils_(nullptr), result_code_(common::OB_SUCCESS) {} int init(const blocksstable::ObStorageDatumUtils &datum_utils); + int compare(const blocksstable::ObDatumRowkey *lhs, const blocksstable::ObDatumRowkey *rhs, + int &cmp_ret); bool operator()(const blocksstable::ObDatumRowkey *lhs, const blocksstable::ObDatumRowkey *rhs); int get_error_code() const { return result_code_; } + public: const blocksstable::ObStorageDatumUtils *datum_utils_; int result_code_; @@ -40,10 +44,15 @@ class ObDirectLoadDatumRowCompare { public: ObDirectLoadDatumRowCompare() - : rowkey_size_(0), result_code_(common::OB_SUCCESS), is_inited_(false) {} + : rowkey_size_(0), result_code_(common::OB_SUCCESS), is_inited_(false) + { + } int init(const blocksstable::ObStorageDatumUtils &datum_utils, int64_t rowkey_size); + int compare(const blocksstable::ObDatumRow *lhs, const blocksstable::ObDatumRow *rhs, + int &cmp_ret); bool operator()(const blocksstable::ObDatumRow *lhs, const blocksstable::ObDatumRow *rhs); int get_error_code() const { return result_code_; } + public: blocksstable::ObDatumRowkey lhs_rowkey_; blocksstable::ObDatumRowkey rhs_rowkey_; @@ -56,12 +65,15 @@ public: class ObDirectLoadDatumArrayCompare { public: - ObDirectLoadDatumArrayCompare() - : result_code_(common::OB_SUCCESS), is_inited_(false) {} + ObDirectLoadDatumArrayCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {} int init(const blocksstable::ObStorageDatumUtils &datum_utils); bool operator()(const ObDirectLoadDatumArray *lhs, const ObDirectLoadDatumArray *rhs); + int compare(const ObDirectLoadDatumArray *lhs, const ObDirectLoadDatumArray *rhs, int &cmp_ret); bool operator()(const ObDirectLoadConstDatumArray *lhs, const ObDirectLoadConstDatumArray *rhs); + int compare(const ObDirectLoadConstDatumArray *lhs, const ObDirectLoadConstDatumArray *rhs, + int &cmp_ret); int get_error_code() const { return result_code_; } + public: blocksstable::ObDatumRowkey lhs_rowkey_; blocksstable::ObDatumRowkey rhs_rowkey_; @@ -74,11 +86,15 @@ class ObDirectLoadExternalRowCompare { public: ObDirectLoadExternalRowCompare() : result_code_(common::OB_SUCCESS), is_inited_(false) {} - int init(const blocksstable::ObStorageDatumUtils &datum_utils); + int init(const blocksstable::ObStorageDatumUtils &datum_utils, + sql::ObLoadDupActionType dup_action); + int compare(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs, int &cmp_ret); bool operator()(const ObDirectLoadExternalRow *lhs, const ObDirectLoadExternalRow *rhs); int get_error_code() const { return result_code_; } + public: ObDirectLoadDatumArrayCompare datum_array_compare_; + sql::ObLoadDupActionType dup_action_; int result_code_; bool is_inited_; }; @@ -87,18 +103,27 @@ class ObDirectLoadExternalMultiPartitionRowCompare { public: ObDirectLoadExternalMultiPartitionRowCompare() - : result_code_(common::OB_SUCCESS), is_inited_(false) {} - int init(const blocksstable::ObStorageDatumUtils &datum_utils); + : result_code_(common::OB_SUCCESS), is_inited_(false) + { + } + int init(const blocksstable::ObStorageDatumUtils &datum_utils, + sql::ObLoadDupActionType dup_action); bool operator()(const ObDirectLoadExternalMultiPartitionRow *lhs, const ObDirectLoadExternalMultiPartitionRow *rhs); bool operator()(const ObDirectLoadConstExternalMultiPartitionRow *lhs, const ObDirectLoadConstExternalMultiPartitionRow *rhs); + int compare(const ObDirectLoadExternalMultiPartitionRow *lhs, + const ObDirectLoadExternalMultiPartitionRow *rhs, int &cmp_ret); + int compare(const ObDirectLoadConstExternalMultiPartitionRow *lhs, + const ObDirectLoadConstExternalMultiPartitionRow *rhs, int &cmp_ret); int get_error_code() const { return result_code_; } + public: ObDirectLoadDatumArrayCompare datum_array_compare_; + sql::ObLoadDupActionType dup_action_; int result_code_; bool is_inited_; }; -} // namespace storage -} // namespace oceanbase +} // namespace storage +} // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_row.cpp b/src/storage/direct_load/ob_direct_load_external_multi_partition_row.cpp index da0a14991..10d8b1240 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_row.cpp +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_row.cpp @@ -95,6 +95,7 @@ void ObDirectLoadConstExternalMultiPartitionRow::reset() { tablet_id_.reset(); rowkey_datum_array_.reset(); + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; } @@ -107,6 +108,7 @@ ObDirectLoadConstExternalMultiPartitionRow &ObDirectLoadConstExternalMultiPartit tablet_id_ = other.tablet_id_; rowkey_datum_array_ = other.rowkey_datum_array_; buf_size_ = other.buf_size_; + seq_no_ = other.seq_no_; buf_ = other.buf_; } return *this; @@ -118,6 +120,7 @@ ObDirectLoadConstExternalMultiPartitionRow &ObDirectLoadConstExternalMultiPartit tablet_id_ = other.tablet_id_; rowkey_datum_array_ = other.external_row_.rowkey_datum_array_; buf_size_ = other.external_row_.buf_size_; + seq_no_ = other.external_row_.seq_no_; buf_ = other.external_row_.buf_; return *this; } @@ -147,6 +150,7 @@ int ObDirectLoadConstExternalMultiPartitionRow::deep_copy( LOG_WARN("fail to deep copy datum array", KR(ret)); } else { buf_size_ = src.buf_size_; + seq_no_ = src.seq_no_; buf_ = buf + pos; MEMCPY(buf + pos, src.buf_, buf_size_); pos += buf_size_; diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_row.h b/src/storage/direct_load/ob_direct_load_external_multi_partition_row.h index faca9a198..803906299 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_row.h +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_row.h @@ -5,6 +5,7 @@ #pragma once #include "storage/direct_load/ob_direct_load_external_row.h" +#include "share/table/ob_table_load_define.h" namespace oceanbase { @@ -47,13 +48,14 @@ public: int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const; bool is_valid() const { - return tablet_id_.is_valid() && rowkey_datum_array_.is_valid() && buf_size_ > 0 && - nullptr != buf_; + return tablet_id_.is_valid() && rowkey_datum_array_.is_valid() && seq_no_.is_valid() && + buf_size_ > 0 && nullptr != buf_; } - TO_STRING_KV(K_(tablet_id), K_(rowkey_datum_array), K_(buf_size), KP_(buf)); + TO_STRING_KV(K_(tablet_id), K_(rowkey_datum_array), K_(seq_no), K_(buf_size), KP_(buf)); public: common::ObTabletID tablet_id_; ObDirectLoadConstDatumArray rowkey_datum_array_; + table::ObTableLoadSequenceNo seq_no_; int64_t buf_size_; const char *buf_; }; diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp index e39db0497..676e936e9 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp @@ -7,6 +7,7 @@ #include "storage/direct_load/ob_direct_load_external_multi_partition_table.h" #include "observer/table_load/ob_table_load_stat.h" #include "storage/direct_load/ob_direct_load_external_table.h" +#include "share/table/ob_table_load_define.h" namespace oceanbase { @@ -84,6 +85,7 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::init( } int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; @@ -101,7 +103,7 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_append_row_time_us); row_.tablet_id_ = tablet_id; if (OB_FAIL(row_.external_row_.from_datums(datum_row.storage_datums_, datum_row.count_, - param_.table_data_desc_.rowkey_column_num_))) { + param_.table_data_desc_.rowkey_column_num_, seq_no))) { LOG_WARN("fail to from datums", KR(ret)); } else if (OB_FAIL(external_writer_.write_item(row_))) { LOG_WARN("fail to write item", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h index ddf5dd483..53d13f19b 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h @@ -41,6 +41,7 @@ public: virtual ~ObDirectLoadExternalMultiPartitionTableBuilder(); int init(const ObDirectLoadExternalMultiPartitionTableBuildParam ¶m); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int close() override; int64_t get_row_count() const override { return total_row_count_; } diff --git a/src/storage/direct_load/ob_direct_load_external_row.cpp b/src/storage/direct_load/ob_direct_load_external_row.cpp index de3b954dc..4268f9c2e 100644 --- a/src/storage/direct_load/ob_direct_load_external_row.cpp +++ b/src/storage/direct_load/ob_direct_load_external_row.cpp @@ -14,6 +14,7 @@ namespace storage { using namespace common; using namespace blocksstable; +using namespace table; ObDirectLoadExternalRow::ObDirectLoadExternalRow() : allocator_("TLD_ext_row"), buf_size_(0), buf_(nullptr) @@ -24,6 +25,7 @@ ObDirectLoadExternalRow::ObDirectLoadExternalRow() void ObDirectLoadExternalRow::reset() { rowkey_datum_array_.reset(); + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reset(); @@ -32,6 +34,7 @@ void ObDirectLoadExternalRow::reset() void ObDirectLoadExternalRow::reuse() { rowkey_datum_array_.reuse(); + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reuse(); @@ -62,6 +65,7 @@ int ObDirectLoadExternalRow::deep_copy(const ObDirectLoadExternalRow &src, char LOG_WARN("fail to deep copy datum array", KR(ret)); } else { buf_size_ = src.buf_size_; + seq_no_ = src.seq_no_; buf_ = buf + pos; MEMCPY(buf + pos, src.buf_, buf_size_); pos += buf_size_; @@ -71,7 +75,7 @@ int ObDirectLoadExternalRow::deep_copy(const ObDirectLoadExternalRow &src, char } int ObDirectLoadExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count, - int64_t rowkey_column_count) + int64_t rowkey_column_count, const ObTableLoadSequenceNo &seq_no) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us); int ret = OB_SUCCESS; @@ -98,6 +102,7 @@ int ObDirectLoadExternalRow::from_datums(ObStorageDatum *datums, int64_t column_ } else { buf_ = buf; buf_size_ = buf_size; + seq_no_ = seq_no; } } } @@ -153,7 +158,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadExternalRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_ENCODE, rowkey_datum_array_, buf_size_); + LST_DO_CODE(OB_UNIS_ENCODE, rowkey_datum_array_, seq_no_, buf_size_); if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) { MEMCPY(buf + pos, buf_, buf_size_); pos += buf_size_; @@ -166,7 +171,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadExternalRow) OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_deserialize_time_us); int ret = OB_SUCCESS; reuse(); - LST_DO_CODE(OB_UNIS_DECODE, rowkey_datum_array_, buf_size_); + LST_DO_CODE(OB_UNIS_DECODE, rowkey_datum_array_, seq_no_, buf_size_); if (OB_SUCC(ret)) { buf_ = buf + pos; pos += buf_size_; @@ -178,7 +183,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadExternalRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int64_t len = 0; - LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_datum_array_, buf_size_); + LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_datum_array_, seq_no_, buf_size_); len += buf_size_; return len; } diff --git a/src/storage/direct_load/ob_direct_load_external_row.h b/src/storage/direct_load/ob_direct_load_external_row.h index 18502a5e5..04b84cdd3 100644 --- a/src/storage/direct_load/ob_direct_load_external_row.h +++ b/src/storage/direct_load/ob_direct_load_external_row.h @@ -5,6 +5,7 @@ #pragma once #include "lib/allocator/page_arena.h" +#include "share/table/ob_table_load_define.h" #include "storage/blocksstable/ob_datum_rowkey.h" #include "storage/direct_load/ob_direct_load_datum.h" @@ -24,18 +25,20 @@ public: int deep_copy(const ObDirectLoadExternalRow &src, char *buf, const int64_t len, int64_t &pos); // not deep copy int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count, - int64_t rowkey_column_count); + int64_t rowkey_column_count, const table::ObTableLoadSequenceNo &seq_no); int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const; int get_rowkey(blocksstable::ObDatumRowkey &rowkey) const; bool is_valid() const { - return rowkey_datum_array_.is_valid() && buf_size_ > 0 && nullptr != buf_; + return rowkey_datum_array_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_; } OB_INLINE int64_t get_raw_size() const { return buf_size_; } - TO_STRING_KV(K_(rowkey_datum_array), K_(buf_size), KP_(buf)); + TO_STRING_KV(K_(rowkey_datum_array), K_(seq_no), K_(buf_size), KP_(buf)); + public: common::ObArenaAllocator allocator_; ObDirectLoadDatumArray rowkey_datum_array_; + table::ObTableLoadSequenceNo seq_no_; int64_t buf_size_; const char *buf_; }; diff --git a/src/storage/direct_load/ob_direct_load_external_table_builder.cpp b/src/storage/direct_load/ob_direct_load_external_table_builder.cpp index 0b09b2745..4f7394eb9 100644 --- a/src/storage/direct_load/ob_direct_load_external_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_external_table_builder.cpp @@ -79,6 +79,7 @@ int ObDirectLoadExternalTableBuilder::init(const ObDirectLoadExternalTableBuildP } int ObDirectLoadExternalTableBuilder::append_row(const ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; @@ -96,7 +97,7 @@ int ObDirectLoadExternalTableBuilder::append_row(const ObTabletID &tablet_id, } else { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_append_row_time_us); if (OB_FAIL(external_row_.from_datums(datum_row.storage_datums_, datum_row.count_, - build_param_.table_data_desc_.rowkey_column_num_))) { + build_param_.table_data_desc_.rowkey_column_num_, seq_no))) { LOG_WARN("fail to from datums", KR(ret)); } else if (OB_FAIL(external_writer_.write_item(external_row_))) { LOG_WARN("fail to write item", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_external_table_builder.h b/src/storage/direct_load/ob_direct_load_external_table_builder.h index 0bc6fe435..d2893e604 100644 --- a/src/storage/direct_load/ob_direct_load_external_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_external_table_builder.h @@ -42,6 +42,7 @@ public: virtual ~ObDirectLoadExternalTableBuilder(); int init(const ObDirectLoadExternalTableBuildParam &build_param); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int close() override; int64_t get_row_count() const override { return row_count_; } diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp index 45fca0caf..735b94c35 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp @@ -191,9 +191,11 @@ int ObDirectLoadFastHeapTableBuilder::switch_sstable_slice() } int ObDirectLoadFastHeapTableBuilder::append_row(const ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { UNUSED(tablet_id); + UNUSED(seq_no); int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h index fbac5948c..4ffda4f0b 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.h @@ -56,6 +56,7 @@ public: virtual ~ObDirectLoadFastHeapTableBuilder(); int init(const ObDirectLoadFastHeapTableBuildParam ¶m); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int close() override; int64_t get_row_count() const override { return row_count_; } diff --git a/src/storage/direct_load/ob_direct_load_i_table.h b/src/storage/direct_load/ob_direct_load_i_table.h index fba3017ba..47e13c4bf 100644 --- a/src/storage/direct_load/ob_direct_load_i_table.h +++ b/src/storage/direct_load/ob_direct_load_i_table.h @@ -5,6 +5,7 @@ #pragma once #include "storage/blocksstable/ob_datum_row.h" +#include "share/table/ob_table_load_define.h" namespace oceanbase { @@ -28,6 +29,7 @@ public: ObIDirectLoadPartitionTableBuilder() = default; virtual ~ObIDirectLoadPartitionTableBuilder() = default; virtual int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) = 0; virtual int close() = 0; virtual int64_t get_row_count() const = 0; diff --git a/src/storage/direct_load/ob_direct_load_mem_context.h b/src/storage/direct_load/ob_direct_load_mem_context.h index b04c2fea9..147f7da6a 100644 --- a/src/storage/direct_load/ob_direct_load_mem_context.h +++ b/src/storage/direct_load/ob_direct_load_mem_context.h @@ -78,6 +78,7 @@ public: int32_t column_count_; ObDirectLoadDMLRowHandler *dml_row_handler_; ObDirectLoadTmpFileManager *file_mgr_; + sql::ObLoadDupActionType dup_action_; ObDirectLoadEasyQueue mem_chunk_queue_; int64_t fly_mem_chunk_count_; diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index 4943a11ba..ead5a0c40 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -215,7 +215,7 @@ int ObDirectLoadMemDump::dump_tables() if (OB_ISNULL(extra_buf_ = static_cast(allocator_.alloc(extra_buf_size_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate memory", KR(ret)); - } else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) { + } else if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) { LOG_WARN("fail to init compare", KR(ret)); } for (int64_t i = 0; OB_SUCC(ret) && i < context_ptr_->mem_chunk_array_.count(); i++) { @@ -263,7 +263,7 @@ int ObDirectLoadMemDump::dump_tables() if (OB_SUCC(ret)) { if (OB_FAIL(external_row->to_datums(datum_row.storage_datums_, datum_row.count_))) { LOG_WARN("fail to transfer dataum row", KR(ret)); - } else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, datum_row))) { + } else if (OB_FAIL(table_builder->append_row(external_row->tablet_id_, external_row->seq_no_, datum_row))) { if (OB_LIKELY(OB_ERR_PRIMARY_KEY_DUPLICATE == ret)) { if (OB_FAIL(mem_ctx_->dml_row_handler_->handle_update_row(datum_row))) { LOG_WARN("fail to handle update row", KR(ret), K(datum_row)); diff --git a/src/storage/direct_load/ob_direct_load_mem_loader.cpp b/src/storage/direct_load/ob_direct_load_mem_loader.cpp index 0895d8a3e..b84ac89b2 100644 --- a/src/storage/direct_load/ob_direct_load_mem_loader.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_loader.cpp @@ -142,7 +142,7 @@ int ObDirectLoadMemLoader::close_chunk(ChunkType *&chunk) { int ret = OB_SUCCESS; CompareType compare; - if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) { + if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) { LOG_WARN("fail to init compare", KR(ret)); } else if (OB_FAIL(chunk->sort(compare))) { LOG_WARN("fail to sort chunk", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_mem_sample.cpp b/src/storage/direct_load/ob_direct_load_mem_sample.cpp index ce45f378b..6c874d698 100644 --- a/src/storage/direct_load/ob_direct_load_mem_sample.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_sample.cpp @@ -38,7 +38,7 @@ int ObDirectLoadMemSample::gen_ranges(ObIArray &chunks, ObIArraydatum_utils_)))) { + if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_), mem_ctx_->dup_action_))) { LOG_WARN("fail to init compare", KR(ret)); } else { std::sort(sample_rows.begin(), sample_rows.end(), compare); diff --git a/src/storage/direct_load/ob_direct_load_multiple_datum_row.cpp b/src/storage/direct_load/ob_direct_load_multiple_datum_row.cpp index 1d023170c..c98cee73f 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_datum_row.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_datum_row.cpp @@ -14,6 +14,7 @@ namespace storage { using namespace common; using namespace blocksstable; +using namespace table; ObDirectLoadMultipleDatumRow::ObDirectLoadMultipleDatumRow() : allocator_("TLD_MultiRow"), buf_size_(0), buf_(nullptr) @@ -28,6 +29,7 @@ ObDirectLoadMultipleDatumRow::~ObDirectLoadMultipleDatumRow() void ObDirectLoadMultipleDatumRow::reset() { rowkey_.reset(); + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reset(); @@ -36,6 +38,7 @@ void ObDirectLoadMultipleDatumRow::reset() void ObDirectLoadMultipleDatumRow::reuse() { rowkey_.reuse(); + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reuse(); @@ -66,6 +69,7 @@ int ObDirectLoadMultipleDatumRow::deep_copy(const ObDirectLoadMultipleDatumRow & LOG_WARN("fail to deep copy rowkey", KR(ret)); } else { buf_size_ = src.buf_size_; + seq_no_ = src.seq_no_; buf_ = buf + pos; MEMCPY(buf + pos, src.buf_, buf_size_); pos += buf_size_; @@ -75,7 +79,7 @@ int ObDirectLoadMultipleDatumRow::deep_copy(const ObDirectLoadMultipleDatumRow & } int ObDirectLoadMultipleDatumRow::from_datums(const ObTabletID &tablet_id, ObStorageDatum *datums, - int64_t column_count, int64_t rowkey_column_count) + int64_t column_count, int64_t rowkey_column_count, const ObTableLoadSequenceNo &seq_no) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us); int ret = OB_SUCCESS; @@ -104,6 +108,7 @@ int ObDirectLoadMultipleDatumRow::from_datums(const ObTabletID &tablet_id, ObSto } else { buf_ = buf; buf_size_ = buf_size; + seq_no_ = seq_no; } } } @@ -147,7 +152,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadMultipleDatumRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_ENCODE, rowkey_, buf_size_); + LST_DO_CODE(OB_UNIS_ENCODE, rowkey_, seq_no_, buf_size_); if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) { MEMCPY(buf + pos, buf_, buf_size_); pos += buf_size_; @@ -160,7 +165,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadMultipleDatumRow) OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_deserialize_time_us); int ret = OB_SUCCESS; reuse(); - LST_DO_CODE(OB_UNIS_DECODE, rowkey_, buf_size_); + LST_DO_CODE(OB_UNIS_DECODE, rowkey_, seq_no_, buf_size_); if (OB_SUCC(ret)) { buf_ = buf + pos; pos += buf_size_; @@ -172,7 +177,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadMultipleDatumRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int64_t len = 0; - LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_, buf_size_); + LST_DO_CODE(OB_UNIS_ADD_LEN, rowkey_, seq_no_, buf_size_); len += buf_size_; return len; } diff --git a/src/storage/direct_load/ob_direct_load_multiple_datum_row.h b/src/storage/direct_load/ob_direct_load_multiple_datum_row.h index e41203c10..6809680fe 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_datum_row.h +++ b/src/storage/direct_load/ob_direct_load_multiple_datum_row.h @@ -4,6 +4,7 @@ #pragma once +#include "share/table/ob_table_load_define.h" #include "storage/direct_load/ob_direct_load_multiple_datum_rowkey.h" namespace oceanbase @@ -24,14 +25,20 @@ public: int64_t &pos); // not deep copy int from_datums(const common::ObTabletID &tablet_id, blocksstable::ObStorageDatum *datums, - int64_t column_count, int64_t rowkey_column_count); + int64_t column_count, int64_t rowkey_column_count, + const table::ObTableLoadSequenceNo &seq_no); int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const; - OB_INLINE bool is_valid() const { return rowkey_.is_valid() && buf_size_ > 0 && nullptr != buf_; } + OB_INLINE bool is_valid() const + { + return rowkey_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_; + } OB_INLINE int64_t get_raw_size() const { return buf_size_; } - TO_STRING_KV(K_(rowkey), K_(buf_size), KP_(buf)); + TO_STRING_KV(K_(rowkey), K_(seq_no), K_(buf_size), KP_(buf)); + public: common::ObArenaAllocator allocator_; ObDirectLoadMultipleDatumRowkey rowkey_; + table::ObTableLoadSequenceNo seq_no_; int64_t buf_size_; const char *buf_; }; diff --git a/src/storage/direct_load/ob_direct_load_multiple_external_row.cpp b/src/storage/direct_load/ob_direct_load_multiple_external_row.cpp index 703190b43..f49c9c97c 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_external_row.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_external_row.cpp @@ -14,6 +14,7 @@ namespace storage { using namespace common; using namespace blocksstable; +using namespace table; ObDirectLoadMultipleExternalRow::ObDirectLoadMultipleExternalRow() : allocator_("TLD_ME_Row"), buf_size_(0), buf_(nullptr) @@ -23,6 +24,7 @@ ObDirectLoadMultipleExternalRow::ObDirectLoadMultipleExternalRow() void ObDirectLoadMultipleExternalRow::reset() { + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reset(); @@ -30,6 +32,7 @@ void ObDirectLoadMultipleExternalRow::reset() void ObDirectLoadMultipleExternalRow::reuse() { + seq_no_.reset(); buf_size_ = 0; buf_ = nullptr; allocator_.reuse(); @@ -54,6 +57,7 @@ int ObDirectLoadMultipleExternalRow::deep_copy(const ObDirectLoadMultipleExterna } else { reuse(); buf_size_ = src.buf_size_; + seq_no_ = src.seq_no_; buf_ = buf + pos; MEMCPY(buf + pos, src.buf_, buf_size_); pos += buf_size_; @@ -61,7 +65,7 @@ int ObDirectLoadMultipleExternalRow::deep_copy(const ObDirectLoadMultipleExterna return ret; } -int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count) +int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t column_count, const ObTableLoadSequenceNo &seq_no) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, transfer_external_row_time_us); int ret = OB_SUCCESS; @@ -84,6 +88,7 @@ int ObDirectLoadMultipleExternalRow::from_datums(ObStorageDatum *datums, int64_t LOG_WARN("fail to serialize datum array", KR(ret)); } else { buf_ = buf; + seq_no_ = seq_no; buf_size_ = buf_size; } } @@ -121,7 +126,7 @@ OB_DEF_SERIALIZE_SIMPLE(ObDirectLoadMultipleExternalRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_ENCODE, tablet_id_.id(), buf_size_); + LST_DO_CODE(OB_UNIS_ENCODE, tablet_id_.id(), seq_no_, buf_size_); if (OB_SUCC(ret) && OB_NOT_NULL(buf_)) { MEMCPY(buf + pos, buf_, buf_size_); pos += buf_size_; @@ -135,7 +140,7 @@ OB_DEF_DESERIALIZE_SIMPLE(ObDirectLoadMultipleExternalRow) int ret = OB_SUCCESS; reset(); uint64_t id = 0; - LST_DO_CODE(OB_UNIS_DECODE, id, buf_size_); + LST_DO_CODE(OB_UNIS_DECODE, id, seq_no_, buf_size_); if (OB_SUCC(ret)) { tablet_id_ = id; buf_ = buf + pos; @@ -148,7 +153,7 @@ OB_DEF_SERIALIZE_SIZE_SIMPLE(ObDirectLoadMultipleExternalRow) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, external_row_serialize_time_us); int64_t len = 0; - LST_DO_CODE(OB_UNIS_ADD_LEN, tablet_id_.id(), buf_size_); + LST_DO_CODE(OB_UNIS_ADD_LEN, tablet_id_.id(), seq_no_, buf_size_); len += buf_size_; return len; } diff --git a/src/storage/direct_load/ob_direct_load_multiple_external_row.h b/src/storage/direct_load/ob_direct_load_multiple_external_row.h index e4d4a5119..2a6813fc2 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_external_row.h +++ b/src/storage/direct_load/ob_direct_load_multiple_external_row.h @@ -5,6 +5,7 @@ #pragma once #include "lib/allocator/page_arena.h" +#include "share/table/ob_table_load_define.h" #include "storage/blocksstable/ob_datum_rowkey.h" #include "storage/direct_load/ob_direct_load_datum.h" @@ -12,7 +13,6 @@ namespace oceanbase { namespace storage { - class ObDirectLoadMultipleExternalRow { OB_UNIS_VERSION(1); @@ -23,16 +23,19 @@ public: int64_t get_deep_copy_size() const; int deep_copy(const ObDirectLoadMultipleExternalRow &src, char *buf, const int64_t len, int64_t &pos); - int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count); + int from_datums(blocksstable::ObStorageDatum *datums, int64_t column_count, + const table::ObTableLoadSequenceNo &seq_no); int to_datums(blocksstable::ObStorageDatum *datums, int64_t column_count) const; OB_INLINE bool is_valid() const { - return tablet_id_.is_valid() && buf_size_ > 0 && nullptr != buf_; + return tablet_id_.is_valid() && seq_no_.is_valid() && buf_size_ > 0 && nullptr != buf_; } - TO_STRING_KV(K_(tablet_id), K_(buf_size), KP_(buf)); + TO_STRING_KV(K_(tablet_id), K_(seq_no), K_(buf_size), KP_(buf)); + public: common::ObArenaAllocator allocator_; common::ObTabletID tablet_id_; + table::ObTableLoadSequenceNo seq_no_; int64_t buf_size_; const char *buf_; }; diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.cpp index 4963489de..36fc85ae6 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.cpp @@ -81,6 +81,7 @@ int ObDirectLoadMultipleHeapTableBuilder::init(const ObDirectLoadMultipleHeapTab } int ObDirectLoadMultipleHeapTableBuilder::append_row(const ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; @@ -96,7 +97,7 @@ int ObDirectLoadMultipleHeapTableBuilder::append_row(const ObTabletID &tablet_id LOG_WARN("invalid args", KR(ret), K(param_), K(datum_row)); } else { row_.tablet_id_ = tablet_id; - if (OB_FAIL(row_.from_datums(datum_row.storage_datums_, datum_row.count_))) { + if (OB_FAIL(row_.from_datums(datum_row.storage_datums_, datum_row.count_, seq_no))) { LOG_WARN("fail to from datum row", KR(ret)); } else if (OB_FAIL(append_row(row_))) { LOG_WARN("fail to append row", KR(ret), K(row_)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.h b/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.h index 25473a2ab..69cb15b9a 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.h +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_builder.h @@ -46,6 +46,7 @@ public: virtual ~ObDirectLoadMultipleHeapTableBuilder(); int init(const ObDirectLoadMultipleHeapTableBuildParam ¶m); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int append_row(const RowType &row); int close() override; diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp index a5c2778b5..631f025bc 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp @@ -102,7 +102,7 @@ int ObDirectLoadMultipleHeapTableSorter::close_chunk(ObDirectLoadMultipleHeapTab for (int64_t j = 0; OB_SUCC(ret) && j < bag.count(); j ++) { if (OB_FAIL(bag.at(j)->to_datums(datum_row.storage_datums_, datum_row.count_))) { LOG_WARN("fail to transfer dataum row", KR(ret)); - } else if (OB_FAIL(table_builder.append_row(keys.at(i), datum_row))) { + } else if (OB_FAIL(table_builder.append_row(keys.at(i), bag.at(j)->seq_no_, datum_row))) { LOG_WARN("fail to append row", KR(ret)); } } diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp index fe7c99ae1..77a2941ab 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp @@ -142,6 +142,7 @@ int ObDirectLoadMultipleSSTableBuilder::init(const ObDirectLoadMultipleSSTableBu } int ObDirectLoadMultipleSSTableBuilder::append_row(const ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; @@ -157,7 +158,7 @@ int ObDirectLoadMultipleSSTableBuilder::append_row(const ObTabletID &tablet_id, LOG_WARN("invalid args", KR(ret), K(param_), K(datum_row)); } else { if (OB_FAIL(row_.from_datums(tablet_id, datum_row.storage_datums_, datum_row.count_, - param_.table_data_desc_.rowkey_column_num_))) { + param_.table_data_desc_.rowkey_column_num_, seq_no))) { LOG_WARN("fail to from datum row", KR(ret)); } else if (OB_FAIL(append_row(row_))) { LOG_WARN("fail to append row", KR(ret), K(row_)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h index 3d675a4f8..296dacebd 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.h @@ -41,6 +41,7 @@ public: virtual ~ObDirectLoadMultipleSSTableBuilder(); int init(const ObDirectLoadMultipleSSTableBuildParam ¶m); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int append_row(const RowType &row); int close() override; diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp index e2fa8f9a5..d997c2e2d 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp @@ -77,7 +77,7 @@ int ObDirectLoadSSTableBuilder::init(const ObDirectLoadSSTableBuildParam ¶m) return ret; } -int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const ObDatumRow &datum_row) +int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -97,7 +97,7 @@ int ObDirectLoadSSTableBuilder::append_row(const ObTabletID &tablet_id, const Ob if (OB_FAIL(check_rowkey_order(key))) { LOG_WARN("fail to check rowkey order", KR(ret), K(datum_row)); } else if (OB_FAIL(external_row.from_datums(datum_row.storage_datums_, datum_row.count_, - param_.table_data_desc_.rowkey_column_num_))) { + param_.table_data_desc_.rowkey_column_num_, seq_no))) { LOG_WARN("fail to from datum row", KR(ret)); } else if (OB_FAIL(data_block_writer_.append_row(external_row))) { LOG_WARN("fail to append row to data block writer", KR(ret), K(external_row)); diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.h b/src/storage/direct_load/ob_direct_load_sstable_builder.h index ea6b16fad..ead23a76a 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.h +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.h @@ -161,6 +161,7 @@ public: virtual ~ObDirectLoadSSTableBuilder() = default; int init(const ObDirectLoadSSTableBuildParam ¶m); int append_row(const common::ObTabletID &tablet_id, + const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row) override; int append_row(const ObDirectLoadExternalRow &external_row); int close() override; diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index 230cab14c..393553bac 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -148,6 +148,7 @@ int ObDirectLoadTableStoreBucket::init(const ObDirectLoadTableStoreParam ¶m, } int ObDirectLoadTableStoreBucket::append_row(const ObTabletID &tablet_id, + const ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, table_store_bucket_append_row); @@ -160,7 +161,7 @@ int ObDirectLoadTableStoreBucket::append_row(const ObTabletID &tablet_id, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(tablet_id), K(datum_row), KPC(param_)); } else { - if (OB_FAIL(table_builder_->append_row(tablet_id, datum_row))) { + if (OB_FAIL(table_builder_->append_row(tablet_id, seq_no, datum_row))) { LOG_WARN("fail to append row", KR(ret)); } } @@ -291,7 +292,7 @@ int ObDirectLoadTableStore::get_bucket(const ObTabletID &tablet_id, return ret; } -int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObDatumRow &datum_row) +int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObTableLoadSequenceNo &seq_no, const ObDatumRow &datum_row) { OB_TABLE_LOAD_STATISTICS_TIME_COST(DEBUG, table_store_append_row); OB_TABLE_LOAD_STATISTICS_COUNTER(table_store_row_count); @@ -306,7 +307,7 @@ int ObDirectLoadTableStore::append_row(const ObTabletID &tablet_id, const ObDatu ObDirectLoadTableStoreBucket *bucket = nullptr; if (OB_FAIL(get_bucket(tablet_id, bucket))) { LOG_WARN("fail to get bucket", KR(ret), K(tablet_id)); - } else if (OB_FAIL(bucket->append_row(tablet_id, datum_row))) { + } else if (OB_FAIL(bucket->append_row(tablet_id, seq_no, datum_row))) { LOG_WARN("fail to append row to bucket", KR(ret), K(tablet_id), K(datum_row)); } } diff --git a/src/storage/direct_load/ob_direct_load_table_store.h b/src/storage/direct_load/ob_direct_load_table_store.h index 478dc065e..1e228d5ad 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.h +++ b/src/storage/direct_load/ob_direct_load_table_store.h @@ -54,7 +54,7 @@ public: ObDirectLoadTableStoreBucket(); ~ObDirectLoadTableStoreBucket(); int init(const ObDirectLoadTableStoreParam ¶m, const common::ObTabletID &tablet_id); - int append_row(const common::ObTabletID &tablet_id, const blocksstable::ObDatumRow &datum_row); + int append_row(const common::ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row); int close(); int get_tables(common::ObIArray &table_array, common::ObIAllocator &allocator); @@ -74,7 +74,7 @@ public: ObDirectLoadTableStore() : allocator_("TLD_TSBucket"), is_inited_(false) {} ~ObDirectLoadTableStore(); int init(const ObDirectLoadTableStoreParam ¶m); - int append_row(const common::ObTabletID &tablet_id, const blocksstable::ObDatumRow &datum_row); + int append_row(const common::ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row); int close(); void clean_up(); int get_tables(common::ObIArray &table_array, diff --git a/unittest/storage/direct_load/test_direct_load_data_block_writer.cpp b/unittest/storage/direct_load/test_direct_load_data_block_writer.cpp index 2c1bd4645..3203d0b80 100644 --- a/unittest/storage/direct_load/test_direct_load_data_block_writer.cpp +++ b/unittest/storage/direct_load/test_direct_load_data_block_writer.cpp @@ -12,6 +12,7 @@ #include "../unittest/storage/blocksstable/ob_row_generate.h" #include "observer/table_load/ob_table_load_partition_location.h" #include "share/ob_simple_mem_limit_getter.h" +#include "share/table/ob_table_load_define.h" #include "storage/blocksstable/ob_tmp_file.h" #include "storage/direct_load/ob_direct_load_sstable_scanner.h" #include "storage/direct_load/ob_direct_load_sstable_compactor.h" @@ -24,6 +25,7 @@ using namespace blocksstable; using namespace storage; using namespace share::schema; using namespace share; +using namespace table; static ObSimpleMemLimitGetter getter; @@ -264,6 +266,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan) ret = file_mgr->init(table_schema_.get_tenant_id()); ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -278,7 +281,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan) ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num)); ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array.push_back(row); - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } ret = sstable_builder.close(); @@ -404,6 +407,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -419,7 +423,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range) ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array.push_back(row); if (i < 5000) { - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } } @@ -516,6 +520,7 @@ TEST_F(TestDataBlockWriter, test_scan_less_range) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -531,7 +536,7 @@ TEST_F(TestDataBlockWriter, test_scan_less_range) ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array.push_back(row); if (i >= 5000) { - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } } @@ -632,6 +637,7 @@ TEST_F(TestDataBlockWriter, test_scan_range) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -647,7 +653,7 @@ TEST_F(TestDataBlockWriter, test_scan_range) ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array.push_back(row); if (i < 5000) { - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } } @@ -768,6 +774,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_large_low) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -789,7 +796,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_large_low) row->storage_datums_[24].set_string(ObString(value1_size, ptr1)); } array.push_back(row); - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } ret = sstable_builder.close(); @@ -912,6 +919,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range_large_low) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -934,7 +942,7 @@ TEST_F(TestDataBlockWriter, test_write_and_scan_range_large_low) } array.push_back(row); if (i < 5000) { - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } } @@ -1010,6 +1018,7 @@ TEST_F(TestDataBlockWriter, test_scan_range_large_low) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -1032,7 +1041,7 @@ TEST_F(TestDataBlockWriter, test_scan_range_large_low) } array.push_back(row); if (i < 5000) { - ret = sstable_builder.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } } @@ -1112,6 +1121,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -1130,7 +1140,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact) ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num)); ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array1.push_back(row); - ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } for (int64_t i = 0; i < test_row_num; ++i) { @@ -1138,7 +1148,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact) ASSERT_EQ(OB_SUCCESS, row->init(allocator_, column_num)); ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(*row)); array2.push_back(row); - ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } @@ -1310,6 +1320,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row) ObDirectLoadSSTableBuildParam param; ObArray col_descs; ObStorageDatumUtils datum_utils; + ObTableLoadSequenceNo seq_no(0); ASSERT_EQ(OB_SUCCESS, table_schema_.get_column_ids(col_descs)); ret = datum_utils.init(col_descs, rowkey_column_count, lib::is_oracle_mode(), allocator_); param.tablet_id_ = table_schema_.get_tablet_id(); @@ -1335,7 +1346,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row) row->storage_datums_[24].set_string(ObString(value1_size, ptr1)); } array1.push_back(row); - ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder1.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); } for (int64_t i = 0; i < test_row_num; ++i) { @@ -1350,7 +1361,7 @@ TEST_F(TestDataBlockWriter, test_write_and_compact_large_row) row->storage_datums_[24].set_string(ObString(value1_size, ptr1)); } array2.push_back(row); - ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), *row); + ret = sstable_builder2.append_row(table_schema_.get_tablet_id(), seq_no, *row); ASSERT_EQ(OB_SUCCESS, ret); }