[opt]: vec_sort support truncate tmp_file.

This commit is contained in:
Monk-Liu
2024-02-09 16:10:16 +00:00
committed by ob-robot
parent 97de0ef587
commit ed33b6e951
8 changed files with 69 additions and 52 deletions

View File

@ -54,10 +54,6 @@ void ObCompactStore::rescan()
{
cur_blk_id_ = 0;
start_iter_ = false;
// shouldn't truncate for ObChunkSliceStore.
if (enable_truncate_) {
last_truncate_offset_ = 0;
}
if (OB_NOT_NULL(reader_)) {
reader_->reuse();
}
@ -82,14 +78,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr)
start_iter_ = true;
reader_->reuse();
reader_->set_block(tmp_blk);
int64_t file_offset = block_reader_.get_cur_file_offset();
if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) {
if (OB_FAIL(truncate_file(file_offset))) {
LOG_WARN("fail to truncate file", K(ret));
} else {
last_truncate_offset_ = file_offset;
}
}
}
}
if (OB_FAIL(ret)) {
@ -105,14 +93,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr)
} else {
reader_->reuse();
reader_->set_block(tmp_blk);
int64_t file_offset = block_reader_.get_cur_file_offset();
if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) {
if (OB_FAIL(truncate_file(file_offset))) {
LOG_WARN("fail to truncate file", K(ret));
} else {
last_truncate_offset_ = file_offset;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(reader_->get_row(sr))) {
if (ret != OB_ITER_END) {
@ -380,13 +360,12 @@ int ObCompactStore::init(const int64_t mem_limit,
{
int ret = OB_SUCCESS;
compact_level_ = compact_level;
enable_truncate_ = enable_trunc;
inited_ = true;
if (OB_ISNULL(exprs) || (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL)) {
} else {
OZ(row_meta_.init(*exprs, row_extra_size));
}
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type));
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
OZ(block_reader_.init(this));
OZ(init_writer_reader());
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
@ -407,15 +386,16 @@ int ObCompactStore::init(const int64_t mem_limit,
{
int ret = OB_SUCCESS;
compact_level_ = compact_level;
enable_truncate_ = enable_trunc;
inited_ = true;
if (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL) {
} else {
OZ(row_meta_.init(col_array, row_extra_size));
}
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type));
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
OZ(block_reader_.init(this));
OZ(init_writer_reader());
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
K(col_array), K(ret));
return ret;
}
@ -436,8 +416,6 @@ void ObCompactStore::reset()
row_cnt_ = 0;
start_iter_ = false;
block_reader_.reset();
last_truncate_offset_ = 0;
enable_truncate_ = false;
cur_blk_id_ = 0;
}

View File

@ -31,13 +31,12 @@ namespace sql
class ObCompactStore final : public ObTempBlockStore
{
OB_UNIS_VERSION_V(1);
static const int64_t TRUNCATE_SIZE = 2L * 1024 * 1024;
public:
explicit ObCompactStore(common::ObIAllocator *alloc = NULL) : ObTempBlockStore(alloc),
compact_level_(share::SORT_DEFAULT_LEVEL),
writer_(nullptr), reader_(nullptr), batch_ctx_(nullptr),
row_meta_(*allocator_), row_cnt_(0), block_reader_(), start_iter_(false),
cur_blk_id_(0), last_truncate_offset_(0), enable_truncate_(true)
cur_blk_id_(0)
{
};
virtual ~ObCompactStore() {reset();};
@ -88,8 +87,7 @@ public:
ObTempBlockStore::BlockReader* get_block_reader() { return &block_reader_; }
ObBlockIWriter* get_writer() { return writer_; }
int64_t get_row_cnt() const { return row_cnt_; }
void set_enable_truncate(bool enable_trunc) { enable_truncate_ = enable_trunc; }
bool enable_truncate() { return enable_truncate_; }
int has_next(bool &has_next);
ChunkRowMeta *get_row_meta() { return &row_meta_; }
void set_meta(ChunkRowMeta *row_meta) { writer_->set_meta(row_meta); reader_->set_meta(row_meta); }
@ -124,8 +122,6 @@ private:
ObTempBlockStore::BlockReader block_reader_;
bool start_iter_;
int64_t cur_blk_id_;
int64_t last_truncate_offset_;
bool enable_truncate_;
}
;

View File

@ -49,10 +49,11 @@ int ObTempBlockStore::ShrinkBuffer::init(char *buf, const int64_t buf_size)
ObTempBlockStore::ObTempBlockStore(common::ObIAllocator *alloc /* = NULL */)
: inited_(false), allocator_(NULL == alloc ? &inner_allocator_ : alloc), blk_(NULL),
block_id_cnt_(0), saved_block_id_cnt_(0), dumped_block_id_cnt_(0), enable_dump_(true),
enable_trunc_(false), last_trunc_offset_(0),
tenant_id_(0), label_(), ctx_id_(0), mem_limit_(0), mem_hold_(0), mem_used_(0),
file_size_(0), block_cnt_(0), index_block_cnt_(0), block_cnt_on_disk_(0),
alloced_mem_size_(0), max_block_size_(0), max_hold_mem_(0), idx_blk_(NULL), mem_stat_(NULL),
io_observer_(NULL), last_block_on_disk_(false)
io_observer_(NULL), last_block_on_disk_(false), cur_file_offset_(0)
{
label_[0] = '\0';
io_.dir_id_ = -1;
@ -64,7 +65,8 @@ int ObTempBlockStore::init(int64_t mem_limit,
uint64_t tenant_id,
int64_t mem_ctx_id,
const char *label,
common::ObCompressorType compress_type)
common::ObCompressorType compress_type,
const bool enable_trunc)
{
int ret = OB_SUCCESS;
mem_limit_ = mem_limit;
@ -78,6 +80,7 @@ int ObTempBlockStore::init(int64_t mem_limit,
inner_reader_.init(this);
inited_ = true;
compressor_.init(compress_type);
enable_trunc_ = enable_trunc;
return ret;
}
@ -372,6 +375,18 @@ int ObTempBlockStore::get_block(BlockReader &reader, const int64_t block_id, con
LOG_WARN("Null block returned", K(ret));
}
}
if (OB_SUCC(ret)) {
// truncate file, if enable_trunc_
if (enable_trunc_ && (last_trunc_offset_ + TRUNCATE_THRESHOLD < cur_file_offset_)) {
if (OB_FAIL(truncate_file(cur_file_offset_))) {
LOG_WARN("fail to truncate file", K(ret));
} else {
last_trunc_offset_ = cur_file_offset_;
}
}
}
return ret;
}
@ -826,10 +841,10 @@ int ObTempBlockStore::load_block(BlockReader &reader, const int64_t block_id,
}
if (OB_SUCC(ret) && bi->on_disk_) {
if (reader.is_async()) {
reader.set_cur_file_offset(bi->offset_ > 0 ? bi->offset_ - 1 : 0);
cur_file_offset_ = bi->offset_ > 0 ? bi->offset_ - 1 : 0;
} else {
blk = reinterpret_cast<const Block *>(reader.buf_.data());
reader.set_cur_file_offset(bi->offset_ + bi->length_);
cur_file_offset_ = bi->offset_ + bi->length_;
}
}
}
@ -1026,6 +1041,11 @@ int ObTempBlockStore::BlockReader::aio_wait()
return ret;
}
int ObTempBlockStore::BlockReader::get_block(const int64_t block_id, const Block *&blk)
{
return store_->get_block(*this, block_id, blk);
}
int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size)
{
int ret = OB_SUCCESS;
@ -1397,7 +1417,6 @@ void ObTempBlockStore::BlockReader::reset_cursor(const int64_t file_size, const
idx_blk_ = NULL;
aio_blk_ = NULL;
ib_pos_ = 0;
cur_file_offset_ = 0;
if (need_release && nullptr != blk_holder_ptr_) {
blk_holder_ptr_->release();
blk_holder_ptr_ = nullptr;

View File

@ -57,6 +57,7 @@ class ObIOEventObserver;
class ObTempBlockStore
{
OB_UNIS_VERSION_V(1);
static const int64_t TRUNCATE_THRESHOLD = 2L << 20;
public:
/*
* ShrinkBuffer, a buffer wrapper class supporting bidirectional writing,
@ -281,15 +282,12 @@ public:
static const int AIO_BUF_CNT = 2;
public:
BlockReader() : store_(NULL), idx_blk_(NULL), ib_pos_(0), file_size_(0), age_(NULL),
try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(), cur_file_offset_(0),
try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(),
is_async_(true), aio_buf_idx_(0), aio_blk_(nullptr) {}
virtual ~BlockReader() { reset(); }
int init(ObTempBlockStore *store, const bool async = true);
inline int get_block(const int64_t block_id, const Block *&blk)
{ return store_->get_block(*this, block_id, blk); }
inline int64_t get_cur_file_offset() const { return cur_file_offset_; }
inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; }
int get_block(const int64_t block_id, const Block *&blk);
inline int64_t get_block_cnt() const { return store_->get_block_cnt(); }
void set_iteration_age(IterationAge *age) { age_ = age; }
void set_blk_holder(BlockHolder *holder) { blk_holder_ptr_ = holder; }
@ -353,7 +351,8 @@ public:
uint64_t tenant_id,
int64_t mem_ctx_id,
const char *label,
common::ObCompressorType compressor_type = NONE_COMPRESSOR);
common::ObCompressorType compressor_type = NONE_COMPRESSOR,
const bool enable_trunc = false);
void reset();
void reuse();
void reset_block_cnt();
@ -408,12 +407,19 @@ public:
int alloc_dir_id();
int dump(const bool all_dump, const int64_t target_dump_size=INT64_MAX);
int finish_add_row(bool need_dump = true);
void set_enable_truncate(bool enable_trunc)
{
enable_trunc_ = enable_trunc;
}
bool is_truncate() { return enable_trunc_; }
inline int64_t get_cur_file_offset() const { return cur_file_offset_; }
inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; }
// include index blocks and data blocks
TO_STRING_KV(K_(inited), K_(enable_dump), K_(tenant_id), K_(label), K_(ctx_id), K_(mem_limit),
K_(mem_hold), K_(mem_used), K_(io_.fd), K_(io_.dir_id), K_(file_size), K_(block_cnt),
K_(index_block_cnt), K_(block_cnt_on_disk), K_(block_id_cnt), K_(dumped_block_id_cnt),
K_(alloced_mem_size));
K_(alloced_mem_size), K_(enable_trunc), K_(last_trunc_offset), K_(cur_file_offset));
void *alloc(const int64_t size)
{
@ -526,6 +532,8 @@ protected:
int64_t saved_block_id_cnt_;
int64_t dumped_block_id_cnt_;
bool enable_dump_;
bool enable_trunc_; // if true, the read contents of tmp file we be removed from disk.
int64_t last_trunc_offset_;
private:
uint64_t tenant_id_;
@ -559,6 +567,7 @@ private:
blocksstable::ObTmpFileIOHandle write_io_handle_;
blocksstable::ObTmpFileIOInfo io_;
bool last_block_on_disk_;
int64_t cur_file_offset_;
DISALLOW_COPY_AND_ASSIGN(ObTempBlockStore);
};

View File

@ -453,14 +453,17 @@ int ObTempRowStore::init(const ObExprPtrIArray &exprs,
const int64_t mem_limit,
bool enable_dump,
uint32_t row_extra_size,
const bool reorder_fixed_expr /*true*/)
const bool reorder_fixed_expr /*true*/,
const bool enable_trunc /*false*/,
const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/)
{
int ret = OB_SUCCESS;
mem_attr_ = mem_attr;
col_cnt_ = exprs.count();
max_batch_size_ = max_batch_size;
ObTempBlockStore::set_inner_allocator_attr(mem_attr);
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_));
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_,
compressor_type, enable_trunc));
OZ(row_meta_.init(exprs, row_extra_size, reorder_fixed_expr));
inited_ = true;
return ret;
@ -470,7 +473,9 @@ int ObTempRowStore::init(const RowMeta &row_meta,
const int64_t max_batch_size,
const lib::ObMemAttr &mem_attr,
const int64_t mem_limit,
bool enable_dump)
bool enable_dump,
const bool enable_trunc /*false*/,
const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/)
{
int ret = OB_SUCCESS;
mem_attr_ = mem_attr;
@ -478,7 +483,8 @@ int ObTempRowStore::init(const RowMeta &row_meta,
max_batch_size_ = max_batch_size;
CK (!row_meta.fixed_expr_reordered())
row_meta_ = row_meta;
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_));
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_,
compressor_type, enable_trunc));
inited_ = true;
return ret;
}

View File

@ -149,13 +149,17 @@ public:
const int64_t mem_limit,
bool enable_dump,
uint32_t row_extra_size,
const bool reorder_fixed_expr = true);
const bool reorder_fixed_expr = true,
const bool enable_trunc = false,
const common::ObCompressorType compressor_type = NONE_COMPRESSOR);
int init(const RowMeta &row_meta,
const int64_t max_batch_size,
const lib::ObMemAttr &mem_attr,
const int64_t mem_limit,
bool enable_dump);
bool enable_dump,
const bool enable_trunc = false,
const common::ObCompressorType compressor_type = NONE_COMPRESSOR);
int init_batch_ctx();

View File

@ -193,10 +193,13 @@ int ObSortVecOp::init_temp_row_store(const common::ObIArray<ObExpr *> &exprs,
const bool is_sort_key, ObTempRowStore &row_store)
{
int ret = OB_SUCCESS;
const bool enable_trunc = true;
const bool reorder_fixed_expr = true;
if (row_store.is_inited()) {
// do nothing
} else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true,
sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */))) {
sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */,
reorder_fixed_expr, enable_trunc))) {
LOG_WARN("init row store failed", K(ret));
} else if (OB_FAIL(row_store.alloc_dir_id())) {
LOG_WARN("failed to alloc dir id", K(ret));

View File

@ -166,9 +166,11 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_temp_row_store(
ObTempRowStore &row_store)
{
int ret = OB_SUCCESS;
const bool enable_trunc = true;
const bool reorder_fixed_expr = true;
ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA);
if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump,
extra_size /* row_extra_size */))) {
extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc))) {
SQL_ENG_LOG(WARN, "init row store failed", K(ret));
} else {
row_store.set_dir_id(sql_mem_processor_.get_dir_id());