[opt]: vec_sort support truncate tmp_file.
This commit is contained in:
@ -54,10 +54,6 @@ void ObCompactStore::rescan()
|
|||||||
{
|
{
|
||||||
cur_blk_id_ = 0;
|
cur_blk_id_ = 0;
|
||||||
start_iter_ = false;
|
start_iter_ = false;
|
||||||
// shouldn't truncate for ObChunkSliceStore.
|
|
||||||
if (enable_truncate_) {
|
|
||||||
last_truncate_offset_ = 0;
|
|
||||||
}
|
|
||||||
if (OB_NOT_NULL(reader_)) {
|
if (OB_NOT_NULL(reader_)) {
|
||||||
reader_->reuse();
|
reader_->reuse();
|
||||||
}
|
}
|
||||||
@ -82,14 +78,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr)
|
|||||||
start_iter_ = true;
|
start_iter_ = true;
|
||||||
reader_->reuse();
|
reader_->reuse();
|
||||||
reader_->set_block(tmp_blk);
|
reader_->set_block(tmp_blk);
|
||||||
int64_t file_offset = block_reader_.get_cur_file_offset();
|
|
||||||
if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) {
|
|
||||||
if (OB_FAIL(truncate_file(file_offset))) {
|
|
||||||
LOG_WARN("fail to truncate file", K(ret));
|
|
||||||
} else {
|
|
||||||
last_truncate_offset_ = file_offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
@ -105,14 +93,6 @@ int ObCompactStore::inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr)
|
|||||||
} else {
|
} else {
|
||||||
reader_->reuse();
|
reader_->reuse();
|
||||||
reader_->set_block(tmp_blk);
|
reader_->set_block(tmp_blk);
|
||||||
int64_t file_offset = block_reader_.get_cur_file_offset();
|
|
||||||
if (enable_truncate_ && (file_offset > last_truncate_offset_ + TRUNCATE_SIZE)) {
|
|
||||||
if (OB_FAIL(truncate_file(file_offset))) {
|
|
||||||
LOG_WARN("fail to truncate file", K(ret));
|
|
||||||
} else {
|
|
||||||
last_truncate_offset_ = file_offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(reader_->get_row(sr))) {
|
if (OB_FAIL(reader_->get_row(sr))) {
|
||||||
if (ret != OB_ITER_END) {
|
if (ret != OB_ITER_END) {
|
||||||
@ -380,13 +360,12 @@ int ObCompactStore::init(const int64_t mem_limit,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
compact_level_ = compact_level;
|
compact_level_ = compact_level;
|
||||||
enable_truncate_ = enable_trunc;
|
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
if (OB_ISNULL(exprs) || (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL)) {
|
if (OB_ISNULL(exprs) || (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL)) {
|
||||||
} else {
|
} else {
|
||||||
OZ(row_meta_.init(*exprs, row_extra_size));
|
OZ(row_meta_.init(*exprs, row_extra_size));
|
||||||
}
|
}
|
||||||
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type));
|
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
|
||||||
OZ(block_reader_.init(this));
|
OZ(block_reader_.init(this));
|
||||||
OZ(init_writer_reader());
|
OZ(init_writer_reader());
|
||||||
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
|
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
|
||||||
@ -407,15 +386,16 @@ int ObCompactStore::init(const int64_t mem_limit,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
compact_level_ = compact_level;
|
compact_level_ = compact_level;
|
||||||
enable_truncate_ = enable_trunc;
|
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
if (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL) {
|
if (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL) {
|
||||||
} else {
|
} else {
|
||||||
OZ(row_meta_.init(col_array, row_extra_size));
|
OZ(row_meta_.init(col_array, row_extra_size));
|
||||||
}
|
}
|
||||||
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type));
|
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
|
||||||
OZ(block_reader_.init(this));
|
OZ(block_reader_.init(this));
|
||||||
OZ(init_writer_reader());
|
OZ(init_writer_reader());
|
||||||
|
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
|
||||||
|
K(col_array), K(ret));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -436,8 +416,6 @@ void ObCompactStore::reset()
|
|||||||
row_cnt_ = 0;
|
row_cnt_ = 0;
|
||||||
start_iter_ = false;
|
start_iter_ = false;
|
||||||
block_reader_.reset();
|
block_reader_.reset();
|
||||||
last_truncate_offset_ = 0;
|
|
||||||
enable_truncate_ = false;
|
|
||||||
cur_blk_id_ = 0;
|
cur_blk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -31,13 +31,12 @@ namespace sql
|
|||||||
class ObCompactStore final : public ObTempBlockStore
|
class ObCompactStore final : public ObTempBlockStore
|
||||||
{
|
{
|
||||||
OB_UNIS_VERSION_V(1);
|
OB_UNIS_VERSION_V(1);
|
||||||
static const int64_t TRUNCATE_SIZE = 2L * 1024 * 1024;
|
|
||||||
public:
|
public:
|
||||||
explicit ObCompactStore(common::ObIAllocator *alloc = NULL) : ObTempBlockStore(alloc),
|
explicit ObCompactStore(common::ObIAllocator *alloc = NULL) : ObTempBlockStore(alloc),
|
||||||
compact_level_(share::SORT_DEFAULT_LEVEL),
|
compact_level_(share::SORT_DEFAULT_LEVEL),
|
||||||
writer_(nullptr), reader_(nullptr), batch_ctx_(nullptr),
|
writer_(nullptr), reader_(nullptr), batch_ctx_(nullptr),
|
||||||
row_meta_(*allocator_), row_cnt_(0), block_reader_(), start_iter_(false),
|
row_meta_(*allocator_), row_cnt_(0), block_reader_(), start_iter_(false),
|
||||||
cur_blk_id_(0), last_truncate_offset_(0), enable_truncate_(true)
|
cur_blk_id_(0)
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
virtual ~ObCompactStore() {reset();};
|
virtual ~ObCompactStore() {reset();};
|
||||||
@ -88,8 +87,7 @@ public:
|
|||||||
ObTempBlockStore::BlockReader* get_block_reader() { return &block_reader_; }
|
ObTempBlockStore::BlockReader* get_block_reader() { return &block_reader_; }
|
||||||
ObBlockIWriter* get_writer() { return writer_; }
|
ObBlockIWriter* get_writer() { return writer_; }
|
||||||
int64_t get_row_cnt() const { return row_cnt_; }
|
int64_t get_row_cnt() const { return row_cnt_; }
|
||||||
void set_enable_truncate(bool enable_trunc) { enable_truncate_ = enable_trunc; }
|
|
||||||
bool enable_truncate() { return enable_truncate_; }
|
|
||||||
int has_next(bool &has_next);
|
int has_next(bool &has_next);
|
||||||
ChunkRowMeta *get_row_meta() { return &row_meta_; }
|
ChunkRowMeta *get_row_meta() { return &row_meta_; }
|
||||||
void set_meta(ChunkRowMeta *row_meta) { writer_->set_meta(row_meta); reader_->set_meta(row_meta); }
|
void set_meta(ChunkRowMeta *row_meta) { writer_->set_meta(row_meta); reader_->set_meta(row_meta); }
|
||||||
@ -124,8 +122,6 @@ private:
|
|||||||
ObTempBlockStore::BlockReader block_reader_;
|
ObTempBlockStore::BlockReader block_reader_;
|
||||||
bool start_iter_;
|
bool start_iter_;
|
||||||
int64_t cur_blk_id_;
|
int64_t cur_blk_id_;
|
||||||
int64_t last_truncate_offset_;
|
|
||||||
bool enable_truncate_;
|
|
||||||
}
|
}
|
||||||
;
|
;
|
||||||
|
|
||||||
|
|||||||
@ -49,10 +49,11 @@ int ObTempBlockStore::ShrinkBuffer::init(char *buf, const int64_t buf_size)
|
|||||||
ObTempBlockStore::ObTempBlockStore(common::ObIAllocator *alloc /* = NULL */)
|
ObTempBlockStore::ObTempBlockStore(common::ObIAllocator *alloc /* = NULL */)
|
||||||
: inited_(false), allocator_(NULL == alloc ? &inner_allocator_ : alloc), blk_(NULL),
|
: inited_(false), allocator_(NULL == alloc ? &inner_allocator_ : alloc), blk_(NULL),
|
||||||
block_id_cnt_(0), saved_block_id_cnt_(0), dumped_block_id_cnt_(0), enable_dump_(true),
|
block_id_cnt_(0), saved_block_id_cnt_(0), dumped_block_id_cnt_(0), enable_dump_(true),
|
||||||
|
enable_trunc_(false), last_trunc_offset_(0),
|
||||||
tenant_id_(0), label_(), ctx_id_(0), mem_limit_(0), mem_hold_(0), mem_used_(0),
|
tenant_id_(0), label_(), ctx_id_(0), mem_limit_(0), mem_hold_(0), mem_used_(0),
|
||||||
file_size_(0), block_cnt_(0), index_block_cnt_(0), block_cnt_on_disk_(0),
|
file_size_(0), block_cnt_(0), index_block_cnt_(0), block_cnt_on_disk_(0),
|
||||||
alloced_mem_size_(0), max_block_size_(0), max_hold_mem_(0), idx_blk_(NULL), mem_stat_(NULL),
|
alloced_mem_size_(0), max_block_size_(0), max_hold_mem_(0), idx_blk_(NULL), mem_stat_(NULL),
|
||||||
io_observer_(NULL), last_block_on_disk_(false)
|
io_observer_(NULL), last_block_on_disk_(false), cur_file_offset_(0)
|
||||||
{
|
{
|
||||||
label_[0] = '\0';
|
label_[0] = '\0';
|
||||||
io_.dir_id_ = -1;
|
io_.dir_id_ = -1;
|
||||||
@ -64,7 +65,8 @@ int ObTempBlockStore::init(int64_t mem_limit,
|
|||||||
uint64_t tenant_id,
|
uint64_t tenant_id,
|
||||||
int64_t mem_ctx_id,
|
int64_t mem_ctx_id,
|
||||||
const char *label,
|
const char *label,
|
||||||
common::ObCompressorType compress_type)
|
common::ObCompressorType compress_type,
|
||||||
|
const bool enable_trunc)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
mem_limit_ = mem_limit;
|
mem_limit_ = mem_limit;
|
||||||
@ -78,6 +80,7 @@ int ObTempBlockStore::init(int64_t mem_limit,
|
|||||||
inner_reader_.init(this);
|
inner_reader_.init(this);
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
compressor_.init(compress_type);
|
compressor_.init(compress_type);
|
||||||
|
enable_trunc_ = enable_trunc;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -372,6 +375,18 @@ int ObTempBlockStore::get_block(BlockReader &reader, const int64_t block_id, con
|
|||||||
LOG_WARN("Null block returned", K(ret));
|
LOG_WARN("Null block returned", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
// truncate file, if enable_trunc_
|
||||||
|
if (enable_trunc_ && (last_trunc_offset_ + TRUNCATE_THRESHOLD < cur_file_offset_)) {
|
||||||
|
if (OB_FAIL(truncate_file(cur_file_offset_))) {
|
||||||
|
LOG_WARN("fail to truncate file", K(ret));
|
||||||
|
} else {
|
||||||
|
last_trunc_offset_ = cur_file_offset_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -826,10 +841,10 @@ int ObTempBlockStore::load_block(BlockReader &reader, const int64_t block_id,
|
|||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && bi->on_disk_) {
|
if (OB_SUCC(ret) && bi->on_disk_) {
|
||||||
if (reader.is_async()) {
|
if (reader.is_async()) {
|
||||||
reader.set_cur_file_offset(bi->offset_ > 0 ? bi->offset_ - 1 : 0);
|
cur_file_offset_ = bi->offset_ > 0 ? bi->offset_ - 1 : 0;
|
||||||
} else {
|
} else {
|
||||||
blk = reinterpret_cast<const Block *>(reader.buf_.data());
|
blk = reinterpret_cast<const Block *>(reader.buf_.data());
|
||||||
reader.set_cur_file_offset(bi->offset_ + bi->length_);
|
cur_file_offset_ = bi->offset_ + bi->length_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1026,6 +1041,11 @@ int ObTempBlockStore::BlockReader::aio_wait()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTempBlockStore::BlockReader::get_block(const int64_t block_id, const Block *&blk)
|
||||||
|
{
|
||||||
|
return store_->get_block(*this, block_id, blk);
|
||||||
|
}
|
||||||
|
|
||||||
int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -1397,7 +1417,6 @@ void ObTempBlockStore::BlockReader::reset_cursor(const int64_t file_size, const
|
|||||||
idx_blk_ = NULL;
|
idx_blk_ = NULL;
|
||||||
aio_blk_ = NULL;
|
aio_blk_ = NULL;
|
||||||
ib_pos_ = 0;
|
ib_pos_ = 0;
|
||||||
cur_file_offset_ = 0;
|
|
||||||
if (need_release && nullptr != blk_holder_ptr_) {
|
if (need_release && nullptr != blk_holder_ptr_) {
|
||||||
blk_holder_ptr_->release();
|
blk_holder_ptr_->release();
|
||||||
blk_holder_ptr_ = nullptr;
|
blk_holder_ptr_ = nullptr;
|
||||||
|
|||||||
@ -57,6 +57,7 @@ class ObIOEventObserver;
|
|||||||
class ObTempBlockStore
|
class ObTempBlockStore
|
||||||
{
|
{
|
||||||
OB_UNIS_VERSION_V(1);
|
OB_UNIS_VERSION_V(1);
|
||||||
|
static const int64_t TRUNCATE_THRESHOLD = 2L << 20;
|
||||||
public:
|
public:
|
||||||
/*
|
/*
|
||||||
* ShrinkBuffer, a buffer wrapper class supporting bidirectional writing,
|
* ShrinkBuffer, a buffer wrapper class supporting bidirectional writing,
|
||||||
@ -281,15 +282,12 @@ public:
|
|||||||
static const int AIO_BUF_CNT = 2;
|
static const int AIO_BUF_CNT = 2;
|
||||||
public:
|
public:
|
||||||
BlockReader() : store_(NULL), idx_blk_(NULL), ib_pos_(0), file_size_(0), age_(NULL),
|
BlockReader() : store_(NULL), idx_blk_(NULL), ib_pos_(0), file_size_(0), age_(NULL),
|
||||||
try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(), cur_file_offset_(0),
|
try_free_list_(NULL), blk_holder_ptr_(NULL), read_io_handle_(),
|
||||||
is_async_(true), aio_buf_idx_(0), aio_blk_(nullptr) {}
|
is_async_(true), aio_buf_idx_(0), aio_blk_(nullptr) {}
|
||||||
virtual ~BlockReader() { reset(); }
|
virtual ~BlockReader() { reset(); }
|
||||||
|
|
||||||
int init(ObTempBlockStore *store, const bool async = true);
|
int init(ObTempBlockStore *store, const bool async = true);
|
||||||
inline int get_block(const int64_t block_id, const Block *&blk)
|
int get_block(const int64_t block_id, const Block *&blk);
|
||||||
{ return store_->get_block(*this, block_id, blk); }
|
|
||||||
inline int64_t get_cur_file_offset() const { return cur_file_offset_; }
|
|
||||||
inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; }
|
|
||||||
inline int64_t get_block_cnt() const { return store_->get_block_cnt(); }
|
inline int64_t get_block_cnt() const { return store_->get_block_cnt(); }
|
||||||
void set_iteration_age(IterationAge *age) { age_ = age; }
|
void set_iteration_age(IterationAge *age) { age_ = age; }
|
||||||
void set_blk_holder(BlockHolder *holder) { blk_holder_ptr_ = holder; }
|
void set_blk_holder(BlockHolder *holder) { blk_holder_ptr_ = holder; }
|
||||||
@ -353,7 +351,8 @@ public:
|
|||||||
uint64_t tenant_id,
|
uint64_t tenant_id,
|
||||||
int64_t mem_ctx_id,
|
int64_t mem_ctx_id,
|
||||||
const char *label,
|
const char *label,
|
||||||
common::ObCompressorType compressor_type = NONE_COMPRESSOR);
|
common::ObCompressorType compressor_type = NONE_COMPRESSOR,
|
||||||
|
const bool enable_trunc = false);
|
||||||
void reset();
|
void reset();
|
||||||
void reuse();
|
void reuse();
|
||||||
void reset_block_cnt();
|
void reset_block_cnt();
|
||||||
@ -408,12 +407,19 @@ public:
|
|||||||
int alloc_dir_id();
|
int alloc_dir_id();
|
||||||
int dump(const bool all_dump, const int64_t target_dump_size=INT64_MAX);
|
int dump(const bool all_dump, const int64_t target_dump_size=INT64_MAX);
|
||||||
int finish_add_row(bool need_dump = true);
|
int finish_add_row(bool need_dump = true);
|
||||||
|
void set_enable_truncate(bool enable_trunc)
|
||||||
|
{
|
||||||
|
enable_trunc_ = enable_trunc;
|
||||||
|
}
|
||||||
|
bool is_truncate() { return enable_trunc_; }
|
||||||
|
inline int64_t get_cur_file_offset() const { return cur_file_offset_; }
|
||||||
|
inline void set_cur_file_offset(int64_t file_offset) { cur_file_offset_ = file_offset; }
|
||||||
// include index blocks and data blocks
|
// include index blocks and data blocks
|
||||||
|
|
||||||
TO_STRING_KV(K_(inited), K_(enable_dump), K_(tenant_id), K_(label), K_(ctx_id), K_(mem_limit),
|
TO_STRING_KV(K_(inited), K_(enable_dump), K_(tenant_id), K_(label), K_(ctx_id), K_(mem_limit),
|
||||||
K_(mem_hold), K_(mem_used), K_(io_.fd), K_(io_.dir_id), K_(file_size), K_(block_cnt),
|
K_(mem_hold), K_(mem_used), K_(io_.fd), K_(io_.dir_id), K_(file_size), K_(block_cnt),
|
||||||
K_(index_block_cnt), K_(block_cnt_on_disk), K_(block_id_cnt), K_(dumped_block_id_cnt),
|
K_(index_block_cnt), K_(block_cnt_on_disk), K_(block_id_cnt), K_(dumped_block_id_cnt),
|
||||||
K_(alloced_mem_size));
|
K_(alloced_mem_size), K_(enable_trunc), K_(last_trunc_offset), K_(cur_file_offset));
|
||||||
|
|
||||||
void *alloc(const int64_t size)
|
void *alloc(const int64_t size)
|
||||||
{
|
{
|
||||||
@ -526,6 +532,8 @@ protected:
|
|||||||
int64_t saved_block_id_cnt_;
|
int64_t saved_block_id_cnt_;
|
||||||
int64_t dumped_block_id_cnt_;
|
int64_t dumped_block_id_cnt_;
|
||||||
bool enable_dump_;
|
bool enable_dump_;
|
||||||
|
bool enable_trunc_; // if true, the read contents of tmp file we be removed from disk.
|
||||||
|
int64_t last_trunc_offset_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
@ -559,6 +567,7 @@ private:
|
|||||||
blocksstable::ObTmpFileIOHandle write_io_handle_;
|
blocksstable::ObTmpFileIOHandle write_io_handle_;
|
||||||
blocksstable::ObTmpFileIOInfo io_;
|
blocksstable::ObTmpFileIOInfo io_;
|
||||||
bool last_block_on_disk_;
|
bool last_block_on_disk_;
|
||||||
|
int64_t cur_file_offset_;
|
||||||
|
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTempBlockStore);
|
DISALLOW_COPY_AND_ASSIGN(ObTempBlockStore);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -453,14 +453,17 @@ int ObTempRowStore::init(const ObExprPtrIArray &exprs,
|
|||||||
const int64_t mem_limit,
|
const int64_t mem_limit,
|
||||||
bool enable_dump,
|
bool enable_dump,
|
||||||
uint32_t row_extra_size,
|
uint32_t row_extra_size,
|
||||||
const bool reorder_fixed_expr /*true*/)
|
const bool reorder_fixed_expr /*true*/,
|
||||||
|
const bool enable_trunc /*false*/,
|
||||||
|
const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
mem_attr_ = mem_attr;
|
mem_attr_ = mem_attr;
|
||||||
col_cnt_ = exprs.count();
|
col_cnt_ = exprs.count();
|
||||||
max_batch_size_ = max_batch_size;
|
max_batch_size_ = max_batch_size;
|
||||||
ObTempBlockStore::set_inner_allocator_attr(mem_attr);
|
ObTempBlockStore::set_inner_allocator_attr(mem_attr);
|
||||||
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_));
|
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_,
|
||||||
|
compressor_type, enable_trunc));
|
||||||
OZ(row_meta_.init(exprs, row_extra_size, reorder_fixed_expr));
|
OZ(row_meta_.init(exprs, row_extra_size, reorder_fixed_expr));
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
return ret;
|
return ret;
|
||||||
@ -470,7 +473,9 @@ int ObTempRowStore::init(const RowMeta &row_meta,
|
|||||||
const int64_t max_batch_size,
|
const int64_t max_batch_size,
|
||||||
const lib::ObMemAttr &mem_attr,
|
const lib::ObMemAttr &mem_attr,
|
||||||
const int64_t mem_limit,
|
const int64_t mem_limit,
|
||||||
bool enable_dump)
|
bool enable_dump,
|
||||||
|
const bool enable_trunc /*false*/,
|
||||||
|
const common::ObCompressorType compressor_type /*NONE_COMPRESSOR*/)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
mem_attr_ = mem_attr;
|
mem_attr_ = mem_attr;
|
||||||
@ -478,7 +483,8 @@ int ObTempRowStore::init(const RowMeta &row_meta,
|
|||||||
max_batch_size_ = max_batch_size;
|
max_batch_size_ = max_batch_size;
|
||||||
CK (!row_meta.fixed_expr_reordered())
|
CK (!row_meta.fixed_expr_reordered())
|
||||||
row_meta_ = row_meta;
|
row_meta_ = row_meta;
|
||||||
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_));
|
OZ(ObTempBlockStore::init(mem_limit, enable_dump, mem_attr.tenant_id_, mem_attr.ctx_id_, mem_attr_.label_,
|
||||||
|
compressor_type, enable_trunc));
|
||||||
inited_ = true;
|
inited_ = true;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -149,13 +149,17 @@ public:
|
|||||||
const int64_t mem_limit,
|
const int64_t mem_limit,
|
||||||
bool enable_dump,
|
bool enable_dump,
|
||||||
uint32_t row_extra_size,
|
uint32_t row_extra_size,
|
||||||
const bool reorder_fixed_expr = true);
|
const bool reorder_fixed_expr = true,
|
||||||
|
const bool enable_trunc = false,
|
||||||
|
const common::ObCompressorType compressor_type = NONE_COMPRESSOR);
|
||||||
|
|
||||||
int init(const RowMeta &row_meta,
|
int init(const RowMeta &row_meta,
|
||||||
const int64_t max_batch_size,
|
const int64_t max_batch_size,
|
||||||
const lib::ObMemAttr &mem_attr,
|
const lib::ObMemAttr &mem_attr,
|
||||||
const int64_t mem_limit,
|
const int64_t mem_limit,
|
||||||
bool enable_dump);
|
bool enable_dump,
|
||||||
|
const bool enable_trunc = false,
|
||||||
|
const common::ObCompressorType compressor_type = NONE_COMPRESSOR);
|
||||||
|
|
||||||
int init_batch_ctx();
|
int init_batch_ctx();
|
||||||
|
|
||||||
|
|||||||
@ -193,10 +193,13 @@ int ObSortVecOp::init_temp_row_store(const common::ObIArray<ObExpr *> &exprs,
|
|||||||
const bool is_sort_key, ObTempRowStore &row_store)
|
const bool is_sort_key, ObTempRowStore &row_store)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const bool enable_trunc = true;
|
||||||
|
const bool reorder_fixed_expr = true;
|
||||||
if (row_store.is_inited()) {
|
if (row_store.is_inited()) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true,
|
} else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true,
|
||||||
sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */))) {
|
sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */,
|
||||||
|
reorder_fixed_expr, enable_trunc))) {
|
||||||
LOG_WARN("init row store failed", K(ret));
|
LOG_WARN("init row store failed", K(ret));
|
||||||
} else if (OB_FAIL(row_store.alloc_dir_id())) {
|
} else if (OB_FAIL(row_store.alloc_dir_id())) {
|
||||||
LOG_WARN("failed to alloc dir id", K(ret));
|
LOG_WARN("failed to alloc dir id", K(ret));
|
||||||
|
|||||||
@ -166,9 +166,11 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_temp_row_store(
|
|||||||
ObTempRowStore &row_store)
|
ObTempRowStore &row_store)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const bool enable_trunc = true;
|
||||||
|
const bool reorder_fixed_expr = true;
|
||||||
ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA);
|
ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA);
|
||||||
if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump,
|
if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump,
|
||||||
extra_size /* row_extra_size */))) {
|
extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc))) {
|
||||||
SQL_ENG_LOG(WARN, "init row store failed", K(ret));
|
SQL_ENG_LOG(WARN, "init row store failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
row_store.set_dir_id(sql_mem_processor_.get_dir_id());
|
row_store.set_dir_id(sql_mem_processor_.get_dir_id());
|
||||||
|
|||||||
Reference in New Issue
Block a user