Use small temp files in direct load

This commit is contained in:
leftgeek
2023-07-11 06:42:35 +00:00
committed by ob-robot
parent 4aee898f21
commit 56b8609165
3 changed files with 97 additions and 29 deletions

View File

@ -41,7 +41,13 @@ bool ObDirectLoadExternalMultiPartitionTableBuildParam::is_valid() const
*/ */
ObDirectLoadExternalMultiPartitionTableBuilder::ObDirectLoadExternalMultiPartitionTableBuilder() ObDirectLoadExternalMultiPartitionTableBuilder::ObDirectLoadExternalMultiPartitionTableBuilder()
: allocator_("TLD_EMPTBuilder"), row_count_(0), is_closed_(false), is_inited_(false) : allocator_("TLD_EMPTBuilder"),
fragment_array_(),
total_row_count_(0),
fragment_row_count_(0),
max_data_block_size_(0),
is_closed_(false),
is_inited_(false)
{ {
} }
@ -62,11 +68,8 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::init(
} else { } else {
param_ = param; param_ = param;
allocator_.set_tenant_id(MTL_ID()); allocator_.set_tenant_id(MTL_ID());
int64_t dir_id = -1; if (OB_FAIL(alloc_tmp_file())) {
if (OB_FAIL(param.file_mgr_->alloc_dir(dir_id))) { LOG_WARN("fail to alloc tmp file", KR(ret));
LOG_WARN("fail to alloc dir", KR(ret));
} else if (OB_FAIL(param.file_mgr_->alloc_file(dir_id, file_handle_))) {
LOG_WARN("fail to alloc fragment", KR(ret));
} else if (OB_FAIL(external_writer_.init(param_.table_data_desc_.external_data_block_size_, } else if (OB_FAIL(external_writer_.init(param_.table_data_desc_.external_data_block_size_,
param_.table_data_desc_.compressor_type_, param_.table_data_desc_.compressor_type_,
param_.extra_buf_, param_.extra_buf_size_))) { param_.extra_buf_, param_.extra_buf_size_))) {
@ -103,7 +106,62 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::append_row(const ObTabletID
} else if (OB_FAIL(external_writer_.write_item(row_))) { } else if (OB_FAIL(external_writer_.write_item(row_))) {
LOG_WARN("fail to write item", KR(ret)); LOG_WARN("fail to write item", KR(ret));
} else { } else {
++row_count_; ++fragment_row_count_;
++total_row_count_;
}
if (OB_SUCC(ret) && (external_writer_.get_file_size() >= MAX_TMP_FILE_SIZE)) {
if (OB_FAIL(switch_fragment())) {
LOG_WARN("fail to switch fragment", KR(ret));
}
}
}
return ret;
}
int ObDirectLoadExternalMultiPartitionTableBuilder::alloc_tmp_file()
{
int ret = OB_SUCCESS;
int64_t dir_id = -1;
if (OB_FAIL(param_.file_mgr_->alloc_dir(dir_id))) {
LOG_WARN("fail to alloc dir", KR(ret));
} else if (OB_FAIL(param_.file_mgr_->alloc_file(dir_id, file_handle_))) {
LOG_WARN("fail to alloc file", KR(ret));
}
return ret;
}
int ObDirectLoadExternalMultiPartitionTableBuilder::generate_fragment()
{
int ret = OB_SUCCESS;
ObDirectLoadExternalFragment fragment;
fragment.file_size_ = external_writer_.get_file_size();
fragment.row_count_ = fragment_row_count_;
fragment.max_data_block_size_ = external_writer_.get_max_block_size();
if (OB_FAIL(fragment.file_handle_.assign(file_handle_))) {
LOG_WARN("fail to assign file handle", KR(ret));
} else if (OB_FAIL(fragment_array_.push_back(fragment))) {
LOG_WARN("fail to push back fragment", KR(ret));
} else if (max_data_block_size_ < fragment.max_data_block_size_) {
max_data_block_size_ = fragment.max_data_block_size_;
}
return ret;
}
int ObDirectLoadExternalMultiPartitionTableBuilder::switch_fragment()
{
int ret = OB_SUCCESS;
if (OB_FAIL(external_writer_.close())) {
LOG_WARN("fail to close external writer", KR(ret));
} else if (OB_FAIL(generate_fragment())) {
LOG_WARN("fail to generate fragment", KR(ret));
} else if (OB_FAIL(alloc_tmp_file())) {
LOG_WARN("fail to alloc tmp file", KR(ret));
} else {
external_writer_.reuse();
if (OB_FAIL(external_writer_.open(file_handle_))) {
LOG_WARN("fail to open file", KR(ret));
} else {
fragment_row_count_ = 0;
} }
} }
return ret; return ret;
@ -118,13 +176,18 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::close()
} else if (OB_UNLIKELY(is_closed_)) { } else if (OB_UNLIKELY(is_closed_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("direct load external multi partition table is closed", KR(ret)); LOG_WARN("direct load external multi partition table is closed", KR(ret));
} else { } else if (OB_FAIL(external_writer_.close())) {
if (OB_FAIL(external_writer_.close())) {
LOG_WARN("fail to close external writer", KR(ret)); LOG_WARN("fail to close external writer", KR(ret));
} else if ((fragment_row_count_ > 0) || fragment_array_.empty()) {
if (OB_FAIL(generate_fragment())) {
LOG_WARN("failed to generate fragment", KR(ret));
} else { } else {
is_closed_ = true; fragment_row_count_ = 0;
} }
} }
if (OB_SUCC(ret)) {
is_closed_ = true;
}
return ret; return ret;
} }
@ -142,18 +205,11 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::get_tables(
ObDirectLoadExternalTableCreateParam create_param; ObDirectLoadExternalTableCreateParam create_param;
create_param.tablet_id_ = 0; //因为包含了所有的tablet_id,设置为一个无效值 create_param.tablet_id_ = 0; //因为包含了所有的tablet_id,设置为一个无效值
create_param.data_block_size_ = param_.table_data_desc_.external_data_block_size_; create_param.data_block_size_ = param_.table_data_desc_.external_data_block_size_;
create_param.row_count_ = row_count_; create_param.row_count_ = total_row_count_;
create_param.max_data_block_size_ = external_writer_.get_max_block_size(); create_param.max_data_block_size_ = max_data_block_size_;
ObDirectLoadExternalFragment fragment; if (OB_FAIL(create_param.fragments_.assign(fragment_array_))) {
fragment.file_size_ = external_writer_.get_file_size(); LOG_WARN("fail to assign fragment array", KR(ret), K(fragment_array_));
fragment.row_count_ = row_count_; } else {
fragment.max_data_block_size_ = external_writer_.get_max_block_size();
if (OB_FAIL(fragment.file_handle_.assign(file_handle_))) {
LOG_WARN("fail to assign file handle", KR(ret));
} else if (OB_FAIL(create_param.fragments_.push_back(fragment))) {
LOG_WARN("fail to push back", KR(ret));
}
if (OB_SUCC(ret)) {
ObDirectLoadExternalTable *external_table = nullptr; ObDirectLoadExternalTable *external_table = nullptr;
if (OB_ISNULL(external_table = OB_NEWx(ObDirectLoadExternalTable, (&allocator)))) { if (OB_ISNULL(external_table = OB_NEWx(ObDirectLoadExternalTable, (&allocator)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -9,12 +9,12 @@
#include "storage/direct_load/ob_direct_load_external_multi_partition_row.h" #include "storage/direct_load/ob_direct_load_external_multi_partition_row.h"
#include "storage/direct_load/ob_direct_load_i_table.h" #include "storage/direct_load/ob_direct_load_i_table.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h" #include "storage/direct_load/ob_direct_load_table_data_desc.h"
#include "storage/direct_load/ob_direct_load_external_fragment.h"
namespace oceanbase namespace oceanbase
{ {
namespace storage namespace storage
{ {
struct ObDirectLoadExternalMultiPartitionTableBuildParam struct ObDirectLoadExternalMultiPartitionTableBuildParam
{ {
public: public:
@ -35,6 +35,7 @@ class ObDirectLoadExternalMultiPartitionTableBuilder : public ObIDirectLoadParti
{ {
typedef ObDirectLoadExternalMultiPartitionRow RowType; typedef ObDirectLoadExternalMultiPartitionRow RowType;
typedef ObDirectLoadExternalBlockWriter<RowType> ExternalWriter; typedef ObDirectLoadExternalBlockWriter<RowType> ExternalWriter;
static const int64_t MAX_TMP_FILE_SIZE = 1LL * 1024 * 1024 * 1024; // 1GiB
public: public:
ObDirectLoadExternalMultiPartitionTableBuilder(); ObDirectLoadExternalMultiPartitionTableBuilder();
virtual ~ObDirectLoadExternalMultiPartitionTableBuilder(); virtual ~ObDirectLoadExternalMultiPartitionTableBuilder();
@ -42,16 +43,23 @@ public:
int append_row(const common::ObTabletID &tablet_id, int append_row(const common::ObTabletID &tablet_id,
const blocksstable::ObDatumRow &datum_row) override; const blocksstable::ObDatumRow &datum_row) override;
int close() override; int close() override;
int64_t get_row_count() const override { return row_count_; } int64_t get_row_count() const override { return total_row_count_; }
int get_tables(common::ObIArray<ObIDirectLoadPartitionTable *> &table_array, int get_tables(common::ObIArray<ObIDirectLoadPartitionTable *> &table_array,
common::ObIAllocator &allocator) override; common::ObIAllocator &allocator) override;
private:
int alloc_tmp_file();
int generate_fragment();
int switch_fragment();
private: private:
ObDirectLoadExternalMultiPartitionTableBuildParam param_; ObDirectLoadExternalMultiPartitionTableBuildParam param_;
common::ObArenaAllocator allocator_; common::ObArenaAllocator allocator_;
ObDirectLoadTmpFileHandle file_handle_; ObDirectLoadTmpFileHandle file_handle_;
ExternalWriter external_writer_; ExternalWriter external_writer_;
RowType row_; RowType row_;
int64_t row_count_; ObDirectLoadExternalFragmentArray fragment_array_;
int64_t total_row_count_;
int64_t fragment_row_count_;
int64_t max_data_block_size_;
bool is_closed_; bool is_closed_;
bool is_inited_; bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(ObDirectLoadExternalMultiPartitionTableBuilder); DISALLOW_COPY_AND_ASSIGN(ObDirectLoadExternalMultiPartitionTableBuilder);

View File

@ -40,9 +40,10 @@ int ObDirectLoadMemLoader::add_table(ObIDirectLoadPartitionTable *table)
if (OB_ISNULL(external_table = dynamic_cast<ObDirectLoadExternalTable *>(table))) { if (OB_ISNULL(external_table = dynamic_cast<ObDirectLoadExternalTable *>(table))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected table", KR(ret), KPC(table)); LOG_WARN("unexpected table", KR(ret), KPC(table));
} else if (OB_UNLIKELY(external_table->get_fragments().count() != 1)) { } else if (OB_UNLIKELY(external_table->get_fragments().count() <= 0)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("files handle should only have one handle", KR(ret)); LOG_WARN("files handle should have at least one handle",
KR(ret), K(external_table->get_fragments().count()));
} else if (OB_FAIL(fragments_.push_back(external_table->get_fragments()))) { } else if (OB_FAIL(fragments_.push_back(external_table->get_fragments()))) {
LOG_WARN("fail to push back", KR(ret)); LOG_WARN("fail to push back", KR(ret));
} }
@ -58,7 +59,7 @@ int ObDirectLoadMemLoader::work()
ChunkType *chunk = nullptr; ChunkType *chunk = nullptr;
RowType row; RowType row;
for (int64_t i = 0; OB_SUCC(ret) && i < fragments_.count(); i++) { for (int64_t i = 0; OB_SUCC(ret) && i < fragments_.count(); i++) {
const ObDirectLoadExternalFragment &fragment = fragments_.at(i); ObDirectLoadExternalFragment &fragment = fragments_.at(i);
ExternalReader external_reader; ExternalReader external_reader;
if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_, if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_,
fragment.max_data_block_size_, fragment.max_data_block_size_,
@ -117,6 +118,9 @@ int ObDirectLoadMemLoader::work()
} }
} }
} }
if (OB_SUCC(ret)) {
fragment.reset();
}
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {