From 9a25831540072ea9d2d94f9fb49310cbc1ff37c0 Mon Sep 17 00:00:00 2001 From: Monk-Liu <1152761042@qq.com> Date: Thu, 18 Jan 2024 06:50:09 +0000 Subject: [PATCH] [opt]: vec_sort support truncate tmp_file. --- .../basic/chunk_store/ob_compact_store.cpp | 30 +++---------------- .../basic/chunk_store/ob_compact_store.h | 8 ++--- src/sql/engine/basic/ob_temp_block_store.cpp | 29 ++++++++++++++---- src/sql/engine/basic/ob_temp_block_store.h | 23 +++++++++----- src/sql/engine/basic/ob_temp_row_store.cpp | 14 ++++++--- src/sql/engine/basic/ob_temp_row_store.h | 8 +++-- src/sql/engine/sort/ob_sort_vec_op.cpp | 5 +++- src/sql/engine/sort/ob_sort_vec_op_impl.ipp | 4 ++- 8 files changed, 69 insertions(+), 52 deletions(-) diff --git a/src/sql/engine/basic/chunk_store/ob_compact_store.cpp b/src/sql/engine/basic/chunk_store/ob_compact_store.cpp index 10be339d33..22a2c9d69c 100644 --- a/src/sql/engine/basic/chunk_store/ob_compact_store.cpp +++ b/src/sql/engine/basic/chunk_store/ob_compact_store.cpp @@ -54,10 +54,6 @@ void ObCompactStore::rescan() { cur_blk_id_ = 0; start_iter_ = false; - // shouldn't truncate for ObChunkSliceStore. 
- if (enable_truncate_) { - last_truncate_offset_ = 0; - } if (OB_NOT_NULL(reader_)) { reader_->reuse(); } @@ -82,14 +78,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr) start_iter_ = true; reader_->reuse(); reader_->set_block(tmp_blk); - int64_t file_offset = block_reader_.get_cur_file_offset(); - if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) { - if (OB_FAIL(truncate_file(file_offset))) { - LOG_WARN("fail to truncate file", K(ret)); - } else { - last_truncate_offset_ = file_offset; - } - } } } if (OB_FAIL(ret)) { @@ -105,14 +93,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr) } else { reader_->reuse(); reader_->set_block(tmp_blk); - int64_t file_offset = block_reader_.get_cur_file_offset(); - if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) { - if (OB_FAIL(truncate_file(file_offset))) { - LOG_WARN("fail to truncate file", K(ret)); - } else { - last_truncate_offset_ = file_offset; - } - } if (OB_SUCC(ret)) { if (OB_FAIL(reader_->get_row(sr))) { if (ret != OB_ITER_END) { @@ -380,13 +360,12 @@ int ObCompactStore::init(const int64_t mem_limit, { int ret = OB_SUCCESS; compact_level_ = compact_level; - enable_truncate_ = enable_trunc; inited_ = true; if (OB_ISNULL(exprs) || (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL)) { } else { OZ(row_meta_.init(*exprs, row_extra_size)); } - OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type)); + OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc)); OZ(block_reader_.init(this)); OZ(init_writer_reader()); LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type), @@ -407,15 +386,16 @@ int ObCompactStore::init(const int64_t mem_limit, { int ret = OB_SUCCESS; compact_level_ = compact_level; - 
enable_truncate_ = enable_trunc; inited_ = true; if (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL) { } else { OZ(row_meta_.init(col_array, row_extra_size)); } - OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type)); + OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc)); OZ(block_reader_.init(this)); OZ(init_writer_reader()); + LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type), + K(col_array), K(ret)); return ret; } @@ -436,8 +416,6 @@ void ObCompactStore::reset() row_cnt_ = 0; start_iter_ = false; block_reader_.reset(); - last_truncate_offset_ = 0; - enable_truncate_ = false; cur_blk_id_ = 0; } diff --git a/src/sql/engine/basic/chunk_store/ob_compact_store.h b/src/sql/engine/basic/chunk_store/ob_compact_store.h index e2a869cb1d..53cc8b91f5 100644 --- a/src/sql/engine/basic/chunk_store/ob_compact_store.h +++ b/src/sql/engine/basic/chunk_store/ob_compact_store.h @@ -31,13 +31,12 @@ namespace sql class ObCompactStore final : public ObTempBlockStore { OB_UNIS_VERSION_V(1); - static const int64_t TRUNCATE_SIZE = 2L * 1024 * 1024; public: explicit ObCompactStore(common::ObIAllocator *alloc = NULL) : ObTempBlockStore(alloc), compact_level_(share::SORT_DEFAULT_LEVEL), writer_(nullptr), reader_(nullptr), batch_ctx_(nullptr), row_meta_(*allocator_), row_cnt_(0), block_reader_(), start_iter_(false), - cur_blk_id_(0), last_truncate_offset_(0), enable_truncate_(true) + cur_blk_id_(0) { }; virtual ~ObCompactStore() {reset();}; @@ -88,8 +87,7 @@ public: ObTempBlockStore::BlockReader* get_block_reader() { return &block_reader_; } ObBlockIWriter* get_writer() { return writer_; } int64_t get_row_cnt() const { return row_cnt_; } - void set_enable_truncate(bool enable_trunc) { enable_truncate_ = enable_trunc; } - bool enable_truncate() { return enable_truncate_; } + int 
has_next(bool &has_next); ChunkRowMeta *get_row_meta() { return &row_meta_; } void set_meta(ChunkRowMeta *row_meta) { writer_->set_meta(row_meta); reader_->set_meta(row_meta); } @@ -124,8 +122,6 @@ private: ObTempBlockStore::BlockReader block_reader_; bool start_iter_; int64_t cur_blk_id_; - int64_t last_truncate_offset_; - bool enable_truncate_; } ; diff --git a/src/sql/engine/basic/ob_temp_block_store.cpp b/src/sql/engine/basic/ob_temp_block_store.cpp index 0ad2408380..d21a55977e 100644 --- a/src/sql/engine/basic/ob_temp_block_store.cpp +++ b/src/sql/engine/basic/ob_temp_block_store.cpp @@ -49,10 +49,11 @@ int ObTempBlockStore::ShrinkBuffer::init(char *buf, const int64_t buf_size) ObTempBlockStore::ObTempBlockStore(common::ObIAllocator *alloc /* = NULL */) : inited_(false), allocator_(NULL == alloc ? &inner_allocator_ : alloc), blk_(NULL), block_id_cnt_(0), saved_block_id_cnt_(0), dumped_block_id_cnt_(0), enable_dump_(true), + enable_trunc_(false), last_trunc_offset_(0), tenant_id_(0), label_(), ctx_id_(0), mem_limit_(0), mem_hold_(0), mem_used_(0), file_size_(0), block_cnt_(0), index_block_cnt_(0), block_cnt_on_disk_(0), alloced_mem_size_(0), max_block_size_(0), max_hold_mem_(0), idx_blk_(NULL), mem_stat_(NULL), - io_observer_(NULL), last_block_on_disk_(false) + io_observer_(NULL), last_block_on_disk_(false), cur_file_offset_(0) { label_[0] = '\0'; io_.dir_id_ = -1; @@ -64,7 +65,8 @@ int ObTempBlockStore::init(int64_t mem_limit, uint64_t tenant_id, int64_t mem_ctx_id, const char *label, - common::ObCompressorType compress_type) + common::ObCompressorType compress_type, + const bool enable_trunc) { int ret = OB_SUCCESS; mem_limit_ = mem_limit; @@ -78,6 +80,7 @@ int ObTempBlockStore::init(int64_t mem_limit, inner_reader_.init(this); inited_ = true; compressor_.init(compress_type); + enable_trunc_ = enable_trunc; return ret; } @@ -372,6 +375,18 @@ int ObTempBlockStore::get_block(BlockReader &reader, const int64_t block_id, con LOG_WARN("Null block returned", 
K(ret)); } } + + if (OB_SUCC(ret)) { + // truncate file, if enable_trunc_ + if (enable_trunc_ && (last_trunc_offset_ + TRUNCATE_THRESHOLD < cur_file_offset_)) { + if (OB_FAIL(truncate_file(cur_file_offset_))) { + LOG_WARN("fail to truncate file", K(ret)); + } else { + last_trunc_offset_ = cur_file_offset_; + } + } + } + return ret; } @@ -826,10 +841,10 @@ int ObTempBlockStore::load_block(BlockReader &reader, const int64_t block_id, } if (OB_SUCC(ret) && bi->on_disk_) { if (reader.is_async()) { - reader.set_cur_file_offset(bi->offset_ > 0 ? bi->offset_ - 1 : 0); + cur_file_offset_ = bi->offset_ > 0 ? bi->offset_ - 1 : 0; } else { blk = reinterpret_cast(reader.buf_.data()); - reader.set_cur_file_offset(bi->offset_ + bi->length_); + cur_file_offset_ = bi->offset_ + bi->length_; } } } @@ -1026,6 +1041,11 @@ int ObTempBlockStore::BlockReader::aio_wait() return ret; } +int ObTempBlockStore::BlockReader::get_block(const int64_t block_id, const Block *&blk) +{ + return store_->get_block(*this, block_id, blk); +} + int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size) { int ret = OB_SUCCESS; @@ -1397,7 +1417,6 @@ void ObTempBlockStore::BlockReader::reset_cursor(const int64_t file_size, const idx_blk_ = NULL; aio_blk_ = NULL; ib_pos_ = 0; - cur_file_offset_ = 0; if (need_release && nullptr != blk_holder_ptr_) { blk_holder_ptr_->release(); blk_holder_ptr_ = nullptr; diff --git a/src/sql/engine/basic/ob_temp_block_store.h b/src/sql/engine/basic/ob_temp_block_store.h index 919f693714..114ad2a64d 100644 --- a/src/sql/engine/basic/ob_temp_block_store.h +++ b/src/sql/engine/basic/ob_temp_block_store.h @@ -57,6 +57,7 @@ class ObIOEventObserver; class ObTempBlockStore { OB_UNIS_VERSION_V(1); + static const int64_t TRUNCATE_THRESHOLD = 2L << 20; public: /* * ShrinkBuffer, a buffer wrapper class supporting bidirectional writing, @@ -281,15 +282,12 @@ public: static const int AIO_BUF_CNT = 2; public: BlockReader() : store_(NULL), idx_blk_(NULL), ib_pos_(0), 
file_size_(0), age_(NULL), - try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(), cur_file_offset_(0), + try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(), is_async_(true), aio_buf_idx_(0), aio_blk_(nullptr) {} virtual ~BlockReader() { reset(); } int init(ObTempBlockStore *store, const bool async = true); - inline int get_block(const int64_t block_id, const Block *&blk) - { return store_->get_block(*this, block_id, blk); } - inline int64_t get_cur_file_offset() const { return cur_file_offset_; } - inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; } + int get_block(const int64_t block_id, const Block *&blk); inline int64_t get_block_cnt() const { return store_->get_block_cnt(); } void set_iteration_age(IterationAge *age) { age_ = age; } void set_blk_holder(BlockHolder *holder) { blk_holder_ptr_ = holder; } @@ -353,7 +351,8 @@ public: uint64_t tenant_id, int64_t mem_ctx_id, const char *label, - common::ObCompressorType compressor_type = NONE_COMPRESSOR); + common::ObCompressorType compressor_type = NONE_COMPRESSOR, + const bool enable_trunc = false); void reset(); void reuse(); void reset_block_cnt(); @@ -408,12 +407,19 @@ public: int alloc_dir_id(); int dump(const bool all_dump, const int64_t target_dump_size=INT64_MAX); int finish_add_row(bool need_dump = true); + void set_enable_truncate(bool enable_trunc) + { + enable_trunc_ = enable_trunc; + } + bool is_truncate() { return enable_trunc_; } + inline int64_t get_cur_file_offset() const { return cur_file_offset_; } + inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; } // include index blocks and data blocks TO_STRING_KV(K_(inited), K_(enable_dump), K_(tenant_id), K_(label), K_(ctx_id), K_(mem_limit), K_(mem_hold), K_(mem_used), K_(io_.fd), K_(io_.dir_id), K_(file_size), K_(block_cnt), K_(index_block_cnt), K_(block_cnt_on_disk), K_(block_id_cnt), K_(dumped_block_id_cnt), - K_(alloced_mem_size)); + K_(alloced_mem_size), 
K_(enable_trunc), K_(last_trunc_offset), K_(cur_file_offset)); void *alloc(const int64_t size) { @@ -526,6 +532,8 @@ protected: int64_t saved_block_id_cnt_; int64_t dumped_block_id_cnt_; bool enable_dump_; + bool enable_trunc_; // if true, the read contents of tmp file will be removed from disk. + int64_t last_trunc_offset_; private: uint64_t tenant_id_; @@ -559,6 +567,7 @@ private: blocksstable::ObTmpFileIOHandle write_io_handle_; blocksstable::ObTmpFileIOInfo io_; bool last_block_on_disk_; + int64_t cur_file_offset_; DISALLOW_COPY_AND_ASSIGN(ObTempBlockStore); }; diff --git a/src/sql/engine/basic/ob_temp_row_store.cpp b/src/sql/engine/basic/ob_temp_row_store.cpp index 3434106d52..918055a18f 100644 --- a/src/sql/engine/basic/ob_temp_row_store.cpp +++ b/src/sql/engine/basic/ob_temp_row_store.cpp @@ -453,14 +453,17 @@ int ObTempRowStore::init(const ObExprPtrIArray &exprs, const int64_t mem_limit, bool enable_dump, uint32_t row_extra_size, - const bool reorder_fixed_expr /*true*/) + const bool reorder_fixed_expr /*true*/, + const bool enable_trunc /*false*/, + const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/) { int ret = OB_SUCCESS; mem_attr_ = mem_attr; col_cnt_ = exprs.count(); max_batch_size_ = max_batch_size; ObTempBlockStore::set_inner_allocator_attr(mem_attr); - OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_)); + OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_, + compressor_type, enable_trunc)); OZ(row_meta_.init(exprs, row_extra_size, reorder_fixed_expr)); inited_ = true; return ret; @@ -470,7 +473,9 @@ int ObTempRowStore::init(const RowMeta &row_meta, const int64_t max_batch_size, const lib::ObMemAttr &mem_attr, const int64_t mem_limit, - bool enable_dump) + bool enable_dump, + const bool enable_trunc /*false*/, + const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/) { int ret = OB_SUCCESS; mem_attr_ = mem_attr; @@ 
-478,7 +483,8 @@ int ObTempRowStore::init(const RowMeta &row_meta, max_batch_size_ = max_batch_size; CK (!row_meta.fixed_expr_reordered()) row_meta_ = row_meta; - OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_)); + OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_, + compressor_type, enable_trunc)); inited_ = true; return ret; } diff --git a/src/sql/engine/basic/ob_temp_row_store.h b/src/sql/engine/basic/ob_temp_row_store.h index 93f816dbd4..0634a2f1c6 100644 --- a/src/sql/engine/basic/ob_temp_row_store.h +++ b/src/sql/engine/basic/ob_temp_row_store.h @@ -149,13 +149,17 @@ public: const int64_t mem_limit, bool enable_dump, uint32_t row_extra_size, - const bool reorder_fixed_expr = true); + const bool reorder_fixed_expr = true, + const bool enable_trunc = false, + const common::ObCompressorType compressor_type = NONE_COMPRESSOR); int init(const RowMeta &row_meta, const int64_t max_batch_size, const lib::ObMemAttr &mem_attr, const int64_t mem_limit, - bool enable_dump); + bool enable_dump, + const bool enable_trunc = false, + const common::ObCompressorType compressor_type = NONE_COMPRESSOR); int init_batch_ctx(); diff --git a/src/sql/engine/sort/ob_sort_vec_op.cpp b/src/sql/engine/sort/ob_sort_vec_op.cpp index c779b890d1..408d1bd8f4 100644 --- a/src/sql/engine/sort/ob_sort_vec_op.cpp +++ b/src/sql/engine/sort/ob_sort_vec_op.cpp @@ -193,10 +193,13 @@ int ObSortVecOp::init_temp_row_store(const common::ObIArray &exprs, const bool is_sort_key, ObTempRowStore &row_store) { int ret = OB_SUCCESS; + const bool enable_trunc = true; + const bool reorder_fixed_expr = true; if (row_store.is_inited()) { // do nothing } else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true, - sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */))) { + sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */, + reorder_fixed_expr, 
enable_trunc))) { LOG_WARN("init row store failed", K(ret)); } else if (OB_FAIL(row_store.alloc_dir_id())) { LOG_WARN("failed to alloc dir id", K(ret)); diff --git a/src/sql/engine/sort/ob_sort_vec_op_impl.ipp b/src/sql/engine/sort/ob_sort_vec_op_impl.ipp index 6ce46652ed..c05b659e94 100644 --- a/src/sql/engine/sort/ob_sort_vec_op_impl.ipp +++ b/src/sql/engine/sort/ob_sort_vec_op_impl.ipp @@ -166,9 +166,11 @@ int ObSortVecOpImpl::init_temp_row_store( ObTempRowStore &row_store) { int ret = OB_SUCCESS; + const bool enable_trunc = true; + const bool reorder_fixed_expr = true; ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA); if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump, - extra_size /* row_extra_size */))) { + extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc))) { SQL_ENG_LOG(WARN, "init row store failed", K(ret)); } else { row_store.set_dir_id(sql_mem_processor_.get_dir_id());