[FEAT MERGE]: use parameter _ob_ddl_temp_file_compress_func to control compression in ddl.

This commit is contained in:
Monk-Liu
2024-03-07 04:45:49 +00:00
committed by ob-robot
parent 5279be2c40
commit 92fca7def1
32 changed files with 491 additions and 780 deletions

View File

@ -195,7 +195,7 @@ int ObCompactStore::add_batch(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx
int ret = OB_SUCCESS;
CK(is_inited());
OZ(init_batch_ctx(exprs.count(), ctx.max_batch_size_));
bool all_batch_res = (compact_level_ == share::SORT_DEFAULT_LEVEL || compact_level_ == share::SORT_COMPRESSION_LEVEL);
bool all_batch_res = false;
for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); i++) {
ObExpr *e = exprs.at(i);
if (OB_ISNULL(e)) {
@ -354,21 +354,18 @@ int ObCompactStore::init(const int64_t mem_limit,
const bool enable_dump,
const uint32_t row_extra_size,
const bool enable_trunc,
const share::SortCompactLevel compact_level,
const ObCompressorType compress_type,
const ExprFixedArray *exprs)
{
int ret = OB_SUCCESS;
compact_level_ = compact_level;
inited_ = true;
if (OB_ISNULL(exprs) || (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL)) {
} else {
OZ(row_meta_.init(*exprs, row_extra_size));
}
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
OZ(block_reader_.init(this));
if (OB_NOT_NULL(exprs)) {
OZ(row_meta_.init(*exprs, row_extra_size));
}
OZ(init_writer_reader());
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compress_type),
K(exprs), K(ret));
return ret;
}
@ -381,20 +378,15 @@ int ObCompactStore::init(const int64_t mem_limit,
const bool enable_dump,
const uint32_t row_extra_size,
const bool enable_trunc,
const share::SortCompactLevel compact_level,
const ObCompressorType compress_type)
{
int ret = OB_SUCCESS;
compact_level_ = compact_level;
inited_ = true;
if (compact_level != share::SORT_COMPACT_LEVEL && compact_level != share::SORT_COMPRESSION_COMPACT_LEVEL) {
} else {
OZ(row_meta_.init(col_array, row_extra_size));
}
OZ(row_meta_.init(col_array, row_extra_size));
OZ(ObTempBlockStore::init(mem_limit, enable_dump, tenant_id, mem_ctx_id, label, compress_type, enable_trunc));
OZ(block_reader_.init(this));
OZ(init_writer_reader());
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compact_level), K(compress_type),
LOG_INFO("success to init compact store", K(enable_dump), K(enable_trunc), K(compress_type),
K(col_array), K(ret));
return ret;
}
@ -409,7 +401,6 @@ void ObCompactStore::reset()
writer_->reset();
allocator_->free(writer_);
}
compact_level_ = share::SORT_DEFAULT_LEVEL;
writer_ = nullptr;
reader_ = nullptr;
batch_ctx_ = nullptr;
@ -437,45 +428,14 @@ int ObCompactStore::init_writer_reader()
int ret = OB_SUCCESS;
void *writer_buf = nullptr;
void *reader_buf = nullptr;
switch (compact_level_) {
case share::SORT_COMPRESSION_LEVEL:
case share::SORT_DEFAULT_LEVEL: {
writer_buf = allocator_->alloc(sizeof(ObDefaultBlockWriter));
reader_buf = allocator_->alloc(sizeof(ObDefaultBlockReader));
if (OB_ISNULL(writer_buf) || OB_ISNULL(reader_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for writer", K(ret), KP(writer_buf), KP(reader_buf));
} else {
writer_ = new (writer_buf)ObDefaultBlockWriter(this);
reader_ = new (reader_buf)ObDefaultBlockReader(this);
}
break;
}
case share::SORT_COMPRESSION_COMPACT_LEVEL:
case share::SORT_COMPACT_LEVEL: {
writer_buf = allocator_->alloc(sizeof(ObCompactBlockWriter));
reader_buf = allocator_->alloc(sizeof(ObCompactBlockReader));
if (OB_ISNULL(writer_buf) || OB_ISNULL(reader_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for writer", K(ret));
} else {
writer_ = new (writer_buf)ObCompactBlockWriter(this, &row_meta_);
reader_ = new (reader_buf)ObCompactBlockReader(this, &row_meta_);
}
break;
}
case share::SORT_COMPRESSION_ENCODE_LEVEL:
case share::SORT_ENCODE_LEVEL: {
// TODO
ret = OB_NOT_SUPPORTED;
LOG_WARN("encoding is not supported", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "encoding in chunk store");
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to init reader/writer", K(ret), K(compact_level_));
}
writer_buf = allocator_->alloc(sizeof(ObCompactBlockWriter));
reader_buf = allocator_->alloc(sizeof(ObCompactBlockReader));
if (OB_ISNULL(writer_buf) || OB_ISNULL(reader_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory for writer", K(ret));
} else {
writer_ = new (writer_buf)ObCompactBlockWriter(this, &row_meta_);
reader_ = new (reader_buf)ObCompactBlockReader(this, &row_meta_);
}
return ret;

View File

@ -20,6 +20,7 @@
#include "sql/engine/basic/chunk_store/ob_block_iwriter.h"
#include "sql/engine/basic/chunk_store/ob_chunk_block.h"
#include "src/share/ob_ddl_common.h"
namespace oceanbase
{
namespace storage {
@ -33,7 +34,6 @@ class ObCompactStore final : public ObTempBlockStore
OB_UNIS_VERSION_V(1);
public:
explicit ObCompactStore(common::ObIAllocator *alloc = NULL) : ObTempBlockStore(alloc),
compact_level_(share::SORT_DEFAULT_LEVEL),
writer_(nullptr), reader_(nullptr), batch_ctx_(nullptr),
row_meta_(*allocator_), row_cnt_(0), block_reader_(), start_iter_(false),
cur_blk_id_(0)
@ -49,7 +49,6 @@ public:
const bool enable_dump = true,
const uint32_t row_extra_size = 0,
const bool enable_trunc = true,
const share::SortCompactLevel compact_level = share::SORT_DEFAULT_LEVEL,
const ObCompressorType compress_type = NONE_COMPRESSOR,
const ExprFixedArray *exprs = nullptr);
@ -61,7 +60,6 @@ public:
const bool enable_dump = true,
const uint32_t row_extra_size = 0,
const bool enable_trunc = true,
const share::SortCompactLevel compact_level = share::SORT_DEFAULT_LEVEL,
const ObCompressorType compress_type = NONE_COMPRESSOR);
int add_batch(const common::ObIArray<ObExpr *> &exprs, ObEvalCtx &ctx,
const ObBitVector &skip, const int64_t batch_size,
@ -91,7 +89,6 @@ public:
int has_next(bool &has_next);
ChunkRowMeta *get_row_meta() { return &row_meta_; }
void set_meta(ChunkRowMeta *row_meta) { writer_->set_meta(row_meta); reader_->set_meta(row_meta); }
share::SortCompactLevel get_compact_level() { return compact_level_; }
void set_blk_holder(ObTempBlockStore::BlockHolder *blk_holder) { block_reader_.set_blk_holder(blk_holder); }
protected:
int prepare_blk_for_write(Block *) final override;
@ -112,7 +109,6 @@ private:
int inner_get_next_row(const ObChunkDatumStore::StoredRow *&sr);
private:
share::SortCompactLevel compact_level_;
ObBlockIWriter *writer_;
ObBlockIReader *reader_;
BatchCtx *batch_ctx_;

View File

@ -82,7 +82,6 @@ int ObCreateIndexExecutor::execute(ObExecContext &ctx, ObCreateIndexStmt &stmt)
//impossible
} else if (FALSE_IT(create_index_arg.is_inner_ = my_session->is_inner())) {
} else if (FALSE_IT(create_index_arg.parallelism_ = stmt.get_parallelism())) {
} else if (FALSE_IT(create_index_arg.compact_level_ = stmt.get_compact_level())) {
} else if (FALSE_IT(create_index_arg.consumer_group_id_ = THIS_WORKER.get_group_id())) {
} else if (OB_FAIL(common_rpc_proxy->create_index(create_index_arg, res))) { //send the signal of creating index to rs
LOG_WARN("rpc proxy create index failed", K(create_index_arg),

View File

@ -40,7 +40,6 @@ ObSortSpec::ObSortSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type
prescan_enabled_(false),
enable_encode_sortkey_opt_(false),
part_cnt_(0),
sort_compact_level_(share::SORT_DEFAULT_LEVEL),
compress_type_(NONE_COMPRESSOR)
{}
@ -59,7 +58,6 @@ OB_SERIALIZE_MEMBER((ObSortSpec, ObOpSpec),
prescan_enabled_,
enable_encode_sortkey_opt_,
part_cnt_,
sort_compact_level_,
compress_type_);
ObSortOp::ObSortOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input)
@ -294,7 +292,7 @@ int ObSortOp::scan_all_then_sort()
if (OB_FAIL(cache_store.init(2 * 1024 * 1024,
ctx_.get_my_session()->get_effective_tenant_id(),
ObCtxIds::DEFAULT_CTX_ID, "SORT_CACHE_CTX", true/*enable dump*/, 0, true,
MY_SPEC.sort_compact_level_, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_))) {
MY_SPEC.compress_type_, &MY_SPEC.all_exprs_))) {
LOG_WARN("init sample chunk store failed", K(ret));
} else if (OB_FAIL(cache_store.alloc_dir_id())) {
LOG_WARN("failed to alloc dir id", K(ret));
@ -350,7 +348,7 @@ int ObSortOp::scan_all_then_sort_batch()
if (OB_FAIL(cache_store.init(2 * 1024 * 1024,
ctx_.get_my_session()->get_effective_tenant_id(),
ObCtxIds::DEFAULT_CTX_ID, "SORT_CACHE_CTX", true/*enable dump*/, 0, true,
MY_SPEC.sort_compact_level_, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_))) {
MY_SPEC.compress_type_, &MY_SPEC.all_exprs_))) {
LOG_WARN("init sample chunk store failed", K(ret));
} else if (OB_FAIL(cache_store.alloc_dir_id())) {
LOG_WARN("failed to alloc dir id", K(ret));
@ -438,7 +436,7 @@ int ObSortOp::init_sort(int64_t tenant_id,
OZ(sort_impl_.init(tenant_id, &MY_SPEC.sort_collations_, &MY_SPEC.sort_cmp_funs_,
&eval_ctx_, &ctx_, MY_SPEC.enable_encode_sortkey_opt_, MY_SPEC.is_local_merge_sort_,
false /* need_rewind */, MY_SPEC.part_cnt_, topn_cnt, MY_SPEC.is_fetch_with_ties_,
ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.sort_compact_level_, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_));
ObChunkDatumStore::BLOCK_SIZE, MY_SPEC.compress_type_, &MY_SPEC.all_exprs_));
if (is_batch) {
read_batch_func_ = &ObSortOp::sort_impl_next_batch;
} else {

View File

@ -35,7 +35,7 @@ public:
INHERIT_TO_STRING_KV("op_spec", ObOpSpec,
K_(topn_expr), K_(topk_limit_expr), K_(topk_offset_expr), K_(prefix_pos),
K_(minimum_row_count), K_(topk_precision), K_(prefix_pos), K_(is_local_merge_sort),
K_(prescan_enabled), K_(enable_encode_sortkey_opt), K_(part_cnt), K_(sort_compact_level),
K_(prescan_enabled), K_(enable_encode_sortkey_opt), K_(part_cnt),
K_(compress_type));
public:
ObExpr *topn_expr_;
@ -59,7 +59,6 @@ public:
bool enable_encode_sortkey_opt_;
// if use, all_exprs_ is : hash(part_by) + part_by + order_by.
int64_t part_cnt_;
share::SortCompactLevel sort_compact_level_;
ObCompressorType compress_type_;
};

View File

@ -577,7 +577,7 @@ ObSortOpImpl::ObSortOpImpl(ObMonitorNode &op_monitor_info)
max_node_cnt_(0), part_cnt_(0), topn_cnt_(INT64_MAX), outputted_rows_cnt_(0),
is_fetch_with_ties_(false), topn_heap_(NULL), ties_array_pos_(0),
last_ties_row_(NULL), pt_buckets_(NULL), use_partition_topn_sort_(false), heap_nodes_(), cur_heap_idx_(0),
rows_(NULL), sort_compact_level_(share::SORT_DEFAULT_LEVEL), sort_exprs_(nullptr),
rows_(NULL), sort_exprs_(nullptr),
compress_type_(NONE_COMPRESSOR)
{
}
@ -639,7 +639,6 @@ int ObSortOpImpl::init(
const int64_t topn_cnt /* = INT64_MAX */,
const bool is_fetch_with_ties /* = false */,
const int64_t default_block_size /* = 64KB */,
const SortCompactLevel compact_level /* = false */,
const ObCompressorType compress_type /* = NONE_COMPRESS */,
const ExprFixedArray *exprs /* =nullptr */)
{
@ -669,7 +668,6 @@ int ObSortOpImpl::init(
exec_ctx_ = exec_ctx;
part_cnt_ = part_cnt;
topn_cnt_ = topn_cnt;
sort_compact_level_ = compact_level;
compress_type_ = compress_type;
sort_exprs_ = exprs;
use_heap_sort_ = is_topn_sort() && part_cnt_ == 0;
@ -807,7 +805,6 @@ void ObSortOpImpl::reset()
is_fetch_with_ties_ = false;
rows_ = NULL;
ties_array_pos_ = 0;
sort_compact_level_ = share::SORT_DEFAULT_LEVEL;
compress_type_ = NONE_COMPRESSOR;
sort_exprs_ = nullptr;
// for partition topn sort
@ -875,7 +872,7 @@ int ObSortOpImpl::build_chunk(const int64_t level, Input &input, int64_t extra_s
} else if (OB_FAIL(chunk->datum_store_.init(1/*+ mem limit, small limit for dump immediately */,
tenant_id_, ObCtxIds::WORK_AREA, ObModIds::OB_SQL_SORT_ROW,
true/*+ enable dump */, extra_size/* for InMemoryTopnSort */, true,
sort_compact_level_, compress_type_, sort_exprs_))) {
compress_type_, sort_exprs_))) {
LOG_WARN("init row store failed", K(ret));
} else {
chunk->datum_store_.set_dir_id(sql_mem_processor_.get_dir_id());
@ -2238,7 +2235,7 @@ int ObSortOpImpl::get_next_batch_stored_rows(int64_t max_cnt, int64_t &read_rows
LOG_WARN("fail to get next row", K(ret));
} else {
stored_rows_[read_rows++] = const_cast<ObChunkDatumStore::StoredRow *>(sr);
if (sort_compact_level_ != share::SORT_DEFAULT_LEVEL && sort_compact_level_ != share::SORT_COMPRESSION_LEVEL) {
if (use_compact_store()) {
// can't hold multi rows for get_batch, if we use compact/encoding
break;
}

View File

@ -110,14 +110,13 @@ public:
const bool enable_dump = true,
const uint32_t row_extra_size = 0,
const bool enable_truncate = true,
const share::SortCompactLevel compact_level = share::SORT_DEFAULT_LEVEL,
const ObCompressorType compress_type = NONE_COMPRESSOR,
const ExprFixedArray *exprs = nullptr)
{
int ret = OB_SUCCESS;
if (is_compact_) {
ret = compact_store_.init(mem_limit, tenant_id, mem_ctx_id, label, enable_dump, row_extra_size,
enable_truncate, compact_level, compress_type, exprs);
enable_truncate, compress_type, exprs);
} else {
ret = datum_store_.init(mem_limit, tenant_id, mem_ctx_id, label, enable_dump, row_extra_size);
}
@ -262,7 +261,6 @@ public:
const int64_t topn_cnt = INT64_MAX,
const bool is_fetch_with_ties = false,
const int64_t default_block_size = ObChunkDatumStore::BLOCK_SIZE,
const share::SortCompactLevel compact_level = share::SORT_DEFAULT_LEVEL,
const common::ObCompressorType compressor_type = common::NONE_COMPRESSOR,
const ExprFixedArray *exprs = nullptr);
@ -772,7 +770,6 @@ protected:
SortStoredRow *&new_row);
int generate_last_ties_row(const ObChunkDatumStore::StoredRow *orign_row);
int adjust_topn_read_rows(ObChunkDatumStore::StoredRow **stored_rows, int64_t &read_cnt);
bool use_compact_store() { return sort_compact_level_ != SORT_DEFAULT_LEVEL; }
// for partition topn
int init_partition_topn();
void reuse_part_topn_heap();
@ -793,6 +790,7 @@ protected:
const int64_t batch_size,
const uint16_t selector[],
const int64_t size);
bool use_compact_store() { return compress_type_ != NONE_COMPRESSOR; }
DISALLOW_COPY_AND_ASSIGN(ObSortOpImpl);
protected:
@ -861,7 +859,6 @@ protected:
common::ObIArray<ObChunkDatumStore::StoredRow *> *rows_;
ObTempBlockStore::BlockHolder compact_blk_holder_;
ObChunkDatumStore::IteratedBlockHolder default_blk_holder_;
share::SortCompactLevel sort_compact_level_;
const ExprFixedArray *sort_exprs_;
common::ObCompressorType compress_type_;
};