diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp index 9201fe7e2e..06ca8a0cf1 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp @@ -41,7 +41,13 @@ bool ObDirectLoadExternalMultiPartitionTableBuildParam::is_valid() const */ ObDirectLoadExternalMultiPartitionTableBuilder::ObDirectLoadExternalMultiPartitionTableBuilder() - : allocator_("TLD_EMPTBuilder"), row_count_(0), is_closed_(false), is_inited_(false) + : allocator_("TLD_EMPTBuilder"), + fragment_array_(), + total_row_count_(0), + fragment_row_count_(0), + max_data_block_size_(0), + is_closed_(false), + is_inited_(false) { } @@ -62,11 +68,8 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::init( } else { param_ = param; allocator_.set_tenant_id(MTL_ID()); - int64_t dir_id = -1; - if (OB_FAIL(param.file_mgr_->alloc_dir(dir_id))) { - LOG_WARN("fail to alloc dir", KR(ret)); - } else if (OB_FAIL(param.file_mgr_->alloc_file(dir_id, file_handle_))) { - LOG_WARN("fail to alloc fragment", KR(ret)); + if (OB_FAIL(alloc_tmp_file())) { + LOG_WARN("fail to alloc tmp file", KR(ret)); } else if (OB_FAIL(external_writer_.init(param_.table_data_desc_.external_data_block_size_, param_.table_data_desc_.compressor_type_, param_.extra_buf_, param_.extra_buf_size_))) { @@ -103,7 +106,62 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID } else if (OB_FAIL(external_writer_.write_item(row_))) { LOG_WARN("fail to write item", KR(ret)); } else { - ++row_count_; + ++fragment_row_count_; + ++total_row_count_; + } + if (OB_SUCC(ret) && (external_writer_.get_file_size() >= MAX_TMP_FILE_SIZE)) { + if (OB_FAIL(switch_fragment())) { + LOG_WARN("fail to switch fragment", KR(ret)); + } + } + } + return ret; +} + +int ObDirectLoadExternalMultiPartitionTableBuilder::alloc_tmp_file() +{ + int ret = OB_SUCCESS; + int64_t dir_id = -1; + if (OB_FAIL(param_.file_mgr_->alloc_dir(dir_id))) { + LOG_WARN("fail to alloc dir", KR(ret)); + } else if (OB_FAIL(param_.file_mgr_->alloc_file(dir_id, file_handle_))) { + LOG_WARN("fail to alloc file", KR(ret)); + } + return ret; +} + +int ObDirectLoadExternalMultiPartitionTableBuilder::generate_fragment() +{ + int ret = OB_SUCCESS; + ObDirectLoadExternalFragment fragment; + fragment.file_size_ = external_writer_.get_file_size(); + fragment.row_count_ = fragment_row_count_; + fragment.max_data_block_size_ = external_writer_.get_max_block_size(); + if (OB_FAIL(fragment.file_handle_.assign(file_handle_))) { + LOG_WARN("fail to assign file handle", KR(ret)); + } else if (OB_FAIL(fragment_array_.push_back(fragment))) { + LOG_WARN("fail to push back fragment", KR(ret)); + } else if (max_data_block_size_ < fragment.max_data_block_size_) { + max_data_block_size_ = fragment.max_data_block_size_; + } + return ret; +} + +int ObDirectLoadExternalMultiPartitionTableBuilder::switch_fragment() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(external_writer_.close())) { + LOG_WARN("fail to close external writer", KR(ret)); + } else if (OB_FAIL(generate_fragment())) { + LOG_WARN("fail to generate fragment", KR(ret)); + } else if (OB_FAIL(alloc_tmp_file())) { + LOG_WARN("fail to alloc tmp file", KR(ret)); + } else { + external_writer_.reuse(); + if (OB_FAIL(external_writer_.open(file_handle_))) { + LOG_WARN("fail to open file", KR(ret)); + } else { + fragment_row_count_ = 0; } } return ret; @@ -118,13 +176,18 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::close() } else if (OB_UNLIKELY(is_closed_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("direct load external multi partition table is closed", KR(ret)); - } else { - if (OB_FAIL(external_writer_.close())) { - LOG_WARN("fail to close external writer", KR(ret)); + } else if (OB_FAIL(external_writer_.close())) { + LOG_WARN("fail to close external writer", KR(ret)); + } else if ((fragment_row_count_ > 0) || fragment_array_.empty()) { + if (OB_FAIL(generate_fragment())) { + LOG_WARN("failed to generate fragment", KR(ret)); } else { - is_closed_ = true; + fragment_row_count_ = 0; } } + if (OB_SUCC(ret)) { + is_closed_ = true; + } return ret; } @@ -142,18 +205,11 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::get_tables( ObDirectLoadExternalTableCreateParam create_param; create_param.tablet_id_ = 0; //因为包含了所有的tablet_id,设置为一个无效值 create_param.data_block_size_ = param_.table_data_desc_.external_data_block_size_; - create_param.row_count_ = row_count_; - create_param.max_data_block_size_ = external_writer_.get_max_block_size(); - ObDirectLoadExternalFragment fragment; - fragment.file_size_ = external_writer_.get_file_size(); - fragment.row_count_ = row_count_; - fragment.max_data_block_size_ = external_writer_.get_max_block_size(); - if (OB_FAIL(fragment.file_handle_.assign(file_handle_))) { - LOG_WARN("fail to assign file handle", KR(ret)); - } else if (OB_FAIL(create_param.fragments_.push_back(fragment))) { - LOG_WARN("fail to push back", KR(ret)); - } - if (OB_SUCC(ret)) { + create_param.row_count_ = total_row_count_; + create_param.max_data_block_size_ = max_data_block_size_; + if (OB_FAIL(create_param.fragments_.assign(fragment_array_))) { + LOG_WARN("fail to assign fragment array", KR(ret), K(fragment_array_)); + } else { ObDirectLoadExternalTable *external_table = nullptr; if (OB_ISNULL(external_table = OB_NEWx(ObDirectLoadExternalTable, (&allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h index 0bed5e2d27..ddf5dd4834 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.h @@ -9,12 +9,12 @@ #include "storage/direct_load/ob_direct_load_external_multi_partition_row.h" #include "storage/direct_load/ob_direct_load_i_table.h" #include "storage/direct_load/ob_direct_load_table_data_desc.h" +#include "storage/direct_load/ob_direct_load_external_fragment.h" namespace oceanbase { namespace storage { - struct ObDirectLoadExternalMultiPartitionTableBuildParam { public: @@ -35,6 +35,7 @@ class ObDirectLoadExternalMultiPartitionTableBuilder : public ObIDirectLoadParti { typedef ObDirectLoadExternalMultiPartitionRow RowType; typedef ObDirectLoadExternalBlockWriter ExternalWriter; + static const int64_t MAX_TMP_FILE_SIZE = 1LL * 1024 * 1024 * 1024; // 1GiB public: ObDirectLoadExternalMultiPartitionTableBuilder(); virtual ~ObDirectLoadExternalMultiPartitionTableBuilder(); @@ -42,16 +43,23 @@ public: int append_row(const common::ObTabletID &tablet_id, const blocksstable::ObDatumRow &datum_row) override; int close() override; - int64_t get_row_count() const override { return row_count_; } + int64_t get_row_count() const override { return total_row_count_; } int get_tables(common::ObIArray &table_array, common::ObIAllocator &allocator) override; +private: + int alloc_tmp_file(); + int generate_fragment(); + int switch_fragment(); private: ObDirectLoadExternalMultiPartitionTableBuildParam param_; common::ObArenaAllocator allocator_; ObDirectLoadTmpFileHandle file_handle_; ExternalWriter external_writer_; RowType row_; - int64_t row_count_; + ObDirectLoadExternalFragmentArray fragment_array_; + int64_t total_row_count_; + int64_t fragment_row_count_; + int64_t max_data_block_size_; bool is_closed_; bool is_inited_; DISALLOW_COPY_AND_ASSIGN(ObDirectLoadExternalMultiPartitionTableBuilder); diff --git a/src/storage/direct_load/ob_direct_load_mem_loader.cpp b/src/storage/direct_load/ob_direct_load_mem_loader.cpp index 86a3278ea7..0895d8a3ea 100644 --- a/src/storage/direct_load/ob_direct_load_mem_loader.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_loader.cpp @@ -40,9 +40,10 @@ int ObDirectLoadMemLoader::add_table(ObIDirectLoadPartitionTable *table) if (OB_ISNULL(external_table = dynamic_cast(table))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected table", KR(ret), KPC(table)); - } else if (OB_UNLIKELY(external_table->get_fragments().count() != 1)) { + } else if (OB_UNLIKELY(external_table->get_fragments().count() <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("files handle should only have one handle", KR(ret)); + LOG_WARN("files handle should have at least one handle", + KR(ret), K(external_table->get_fragments().count())); } else if (OB_FAIL(fragments_.push_back(external_table->get_fragments()))) { LOG_WARN("fail to push back", KR(ret)); } @@ -58,7 +59,7 @@ int ObDirectLoadMemLoader::work() ChunkType *chunk = nullptr; RowType row; for (int64_t i = 0; OB_SUCC(ret) && i < fragments_.count(); i++) { - const ObDirectLoadExternalFragment &fragment = fragments_.at(i); + ObDirectLoadExternalFragment &fragment = fragments_.at(i); ExternalReader external_reader; if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_, fragment.max_data_block_size_, @@ -117,6 +118,9 @@ int ObDirectLoadMemLoader::work() } } } + if (OB_SUCC(ret)) { + fragment.reset(); + } } if (OB_SUCC(ret)) {