[bugfix]: vec_sort (in DDL) supports compression.

This commit is contained in:
Monk-Liu
2024-03-08 09:20:28 +00:00
committed by ob-robot
parent 7fa555dad6
commit 1c49eb09bf
8 changed files with 69 additions and 54 deletions

View File

@ -2058,6 +2058,8 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool
LOG_WARN("failed to sort funcs", K(ret));
} else if (OB_FAIL(append_array_no_dup(spec.all_exprs_, spec.get_child()->output_))) {
LOG_WARN("failed to append array no dup", K(ret));
} else if (opt_ctx_->is_online_ddl() && OB_FAIL(fill_compress_type(op, spec.compress_type_))) {
LOG_WARN("fail to gt compress_type", K(ret));
} else {
spec.prefix_pos_ = op.get_prefix_pos();
spec.is_local_merge_sort_ = op.is_local_merge_sort();
@ -2067,40 +2069,7 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool
spec.enable_encode_sortkey_opt_ = op.enable_encode_sortkey_opt();
spec.part_cnt_ = op.get_part_cnt();
LOG_TRACE("trace order by", K(spec.all_exprs_.count()), K(spec.all_exprs_));
if (OB_SUCC(ret)) {
int64_t tenant_id = op.get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id();
if (opt_ctx_->is_online_ddl()) {
// for normal sort we use default compress type. for online ddl, we use the compress type in source table
ObLogicalOperator *child_op = op.get_child(0);
ObCompressorType tmp_compr_type = NONE_COMPRESSOR;
while(OB_SUCC(ret) && OB_NOT_NULL(child_op) && child_op->get_type() != log_op_def::LOG_TABLE_SCAN ) {
child_op = child_op->get_child(0);
if (OB_NOT_NULL(child_op) && child_op->get_type() == log_op_def::LOG_TABLE_SCAN ) {
share::schema::ObSchemaGetterGuard *schema_guard = nullptr;
const share::schema::ObTableSchema *table_schema = nullptr;
uint64_t table_id = static_cast<ObLogTableScan*>(child_op)->get_ref_table_id();
if (OB_ISNULL(schema_guard = opt_ctx_->get_schema_guard())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) {
LOG_WARN("fail to get table schema", K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("can't find table schema", K(ret), K(table_id));
} else {
tmp_compr_type = table_schema->get_compressor_type();
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(tmp_compr_type,
op.get_parallel(),
spec.compress_type_))) {
LOG_WARN("fail to get compress type", K(ret));
}
}
}
}
}
if (OB_SUCC(ret)) {
if (spec.sort_collations_.count() != spec.sort_cmp_funs_.count()
@ -2117,6 +2086,42 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool
return ret;
}
// Resolve the compressor type that a sort operator's temp row store should use.
//
// Both generate_spec() overloads for ObLogSort call this only when
// opt_ctx_->is_online_ddl() is true: a normal sort keeps the default compress
// type, while an online-DDL sort follows the compress type of the source table.
//
// @param op          logical sort operator whose first-child chain is searched
//                    for the source table scan.
// @param compr_type  [out] filled by ObDDLUtil::get_temp_store_compress_type()
//                    from the source table's compressor type and op.get_parallel().
// @return OB_SUCCESS on success;
//         OB_ERR_UNEXPECTED if the plan, session info or schema guard is null;
//         OB_TABLE_NOT_EXIST if the scanned table has no schema;
//         otherwise the error reported by the schema guard or ObDDLUtil.
int ObStaticEngineCG::fill_compress_type(ObLogSort &op, ObCompressorType &compr_type)
{
  int ret = OB_SUCCESS;
  ObCompressorType tmp_compr_type = NONE_COMPRESSOR;
  if (OB_ISNULL(op.get_plan())
      || OB_ISNULL(op.get_plan()->get_optimizer_context().get_session_info())) {
    // These pointers used to be dereferenced unchecked; report instead of crashing.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null plan or session info", K(ret));
  } else {
    int64_t tenant_id = op.get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id();
    // Follow the first child of every operator until a table scan is reached or
    // the chain ends. The direct child of the sort is examined as well, so a
    // sort placed immediately above the table scan is no longer skipped.
    ObLogicalOperator *child_op = op.get_child(0);
    while (OB_NOT_NULL(child_op) && child_op->get_type() != log_op_def::LOG_TABLE_SCAN) {
      child_op = child_op->get_child(0);
    }
    if (OB_NOT_NULL(child_op)) {
      // child_op is a LOG_TABLE_SCAN here: read the compressor type of its table.
      share::schema::ObSchemaGetterGuard *schema_guard = nullptr;
      const share::schema::ObTableSchema *table_schema = nullptr;
      uint64_t table_id = static_cast<ObLogTableScan*>(child_op)->get_ref_table_id();
      if (OB_ISNULL(schema_guard = opt_ctx_->get_schema_guard())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("fail to get schema guard", K(ret));
      } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) {
        LOG_WARN("fail to get table schema", K(ret));
      } else if (OB_ISNULL(table_schema)) {
        ret = OB_TABLE_NOT_EXIST;
        LOG_WARN("can't find table schema", K(ret), K(table_id));
      } else {
        tmp_compr_type = table_schema->get_compressor_type();
      }
    }
    // When no table scan is found, tmp_compr_type stays NONE_COMPRESSOR.
  }
  if (OB_SUCC(ret)) {
    if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(tmp_compr_type,
                                                        op.get_parallel(),
                                                        compr_type))) {
      LOG_WARN("fail to get compress type", K(ret));
    }
  }
  return ret;
}
int ObStaticEngineCG::append_child_output_no_dup(const bool is_store_sortkey_separately,
const ObIArray<ObExpr *> &child_output_exprs,
ObIArray<ObExpr *> &sk_exprs,
@ -2280,6 +2285,8 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortVecSpec &spec, const bo
spec.get_child()->output_, spec.sk_exprs_,
spec.addon_exprs_))) {
LOG_WARN("failed to append array no dup", K(ret));
} else if (opt_ctx_->is_online_ddl() && OB_FAIL(fill_compress_type(op, spec.compress_type_))) {
LOG_WARN("fail to gt compress_type", K(ret));
} else {
spec.prefix_pos_ = op.get_prefix_pos();
spec.is_local_merge_sort_ = op.is_local_merge_sort();

View File

@ -448,6 +448,8 @@ private:
const ObSortCollations &collations,
ObSortFuncs &sort_funcs,
const ObIArray<ObExpr*> &sort_exprs);
int fill_compress_type(ObLogSort &op, ObCompressorType &compr_type);
int check_not_support_cmp_type(
const ObSortCollations &collations,
const ObIArray<ObExpr*> &sort_exprs);

View File

@ -82,11 +82,11 @@ int ObPrefixSortVecImpl<Compare, Store_Row, has_addon>::init(ObSortVecOpContext
SQL_ENG_LOG(WARN, "sort impl init failed", K(ret));
} else if (OB_FAIL(init_temp_row_store(
*sk_exprs_, INT64_MAX, batch_size, false, false /*enable dump*/,
Store_Row::get_extra_size(true /*is_sort_key*/), im_sk_store_))) {
Store_Row::get_extra_size(true /*is_sort_key*/), NONE_COMPRESSOR, im_sk_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else if (OB_FAIL(init_temp_row_store(
*addon_exprs_, INT64_MAX, batch_size, false, false /*enable dump*/,
Store_Row::get_extra_size(false /*is_sort_key*/), im_addon_store_))) {
Store_Row::get_extra_size(false /*is_sort_key*/), NONE_COMPRESSOR, im_addon_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else {
selector_ =

View File

@ -28,13 +28,13 @@ ObSortVecSpec::ObSortVecSpec(common::ObIAllocator &alloc, const ObPhyOperatorTyp
sk_exprs_(alloc), addon_exprs_(alloc), sk_collations_(alloc), addon_collations_(alloc),
minimum_row_count_(0), topk_precision_(0), prefix_pos_(0), is_local_merge_sort_(false),
is_fetch_with_ties_(false), prescan_enabled_(false), enable_encode_sortkey_opt_(false),
has_addon_(false), part_cnt_(0)
has_addon_(false), part_cnt_(0), compress_type_(NONE_COMPRESSOR)
{}
OB_SERIALIZE_MEMBER((ObSortVecSpec, ObOpSpec), topn_expr_, topk_limit_expr_, topk_offset_expr_,
sk_exprs_, addon_exprs_, sk_collations_, addon_collations_, minimum_row_count_,
topk_precision_, prefix_pos_, is_local_merge_sort_, is_fetch_with_ties_,
prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_);
prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_);
ObSortVecOp::ObSortVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) :
ObOperator(ctx_, spec, input), sort_op_provider_(op_monitor_info_), sort_row_count_(0),
@ -190,7 +190,8 @@ int ObSortVecOp::process_sort_batch()
int ObSortVecOp::init_temp_row_store(const common::ObIArray<ObExpr *> &exprs,
const int64_t batch_size, const ObMemAttr &mem_attr,
const bool is_sort_key, ObTempRowStore &row_store)
const bool is_sort_key, ObCompressorType compress_type,
ObTempRowStore &row_store)
{
int ret = OB_SUCCESS;
const bool enable_trunc = true;
@ -199,7 +200,7 @@ int ObSortVecOp::init_temp_row_store(const common::ObIArray<ObExpr *> &exprs,
// do nothing
} else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true,
sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */,
reorder_fixed_expr, enable_trunc))) {
reorder_fixed_expr, enable_trunc, compress_type))) {
LOG_WARN("init row store failed", K(ret));
} else if (OB_FAIL(row_store.alloc_dir_id())) {
LOG_WARN("failed to alloc dir id", K(ret));
@ -215,11 +216,11 @@ int ObSortVecOp::init_prescan_row_store()
ObMemAttr mem_attr(ctx_.get_my_session()->get_effective_tenant_id(), "SORT_VEC_CTX",
ObCtxIds::WORK_AREA);
if (OB_FAIL(init_temp_row_store(MY_SPEC.sk_exprs_, MY_SPEC.max_batch_size_, mem_attr, true,
sk_row_store_))) {
MY_SPEC.compress_type_, sk_row_store_))) {
LOG_WARN("failed to init temp row store", K(ret));
} else if (MY_SPEC.has_addon_
&& OB_FAIL(init_temp_row_store(MY_SPEC.addon_exprs_, MY_SPEC.max_batch_size_, mem_attr,
false, addon_row_store_))) {
false, MY_SPEC.compress_type_, addon_row_store_))) {
LOG_WARN("failed to init temp row store", K(ret));
}
return ret;
@ -386,6 +387,7 @@ int ObSortVecOp::init_sort(int64_t tenant_id, int64_t row_count, int64_t topn_cn
context.topn_cnt_ = topn_cnt;
context.is_fetch_with_ties_ = MY_SPEC.is_fetch_with_ties_;
context.has_addon_ = MY_SPEC.has_addon_;
context.compress_type_ = MY_SPEC.compress_type_;
if (MY_SPEC.prefix_pos_ > 0) {
context.prefix_pos_ = MY_SPEC.prefix_pos_;
context.op_ = this;

View File

@ -35,7 +35,7 @@ public:
K_(topk_offset_expr), K_(prefix_pos), K_(minimum_row_count),
K_(topk_precision), K_(prefix_pos), K_(is_local_merge_sort),
K_(prescan_enabled), K_(enable_encode_sortkey_opt), K_(has_addon),
K_(part_cnt));
K_(part_cnt), K_(compress_type));
public:
ObExpr *topn_expr_;
@ -57,6 +57,7 @@ public:
bool has_addon_;
// if use, all_exprs_ is : hash(part_by) + part_by + order_by.
int64_t part_cnt_;
ObCompressorType compress_type_;
};
class ObSortVecOp : public ObOperator
@ -93,7 +94,7 @@ private:
const ObCompactRow **addon_stored_rows = nullptr);
int init_sort(int64_t tenant_id, int64_t row_count, int64_t topn_cnt = INT64_MAX);
int init_temp_row_store(const common::ObIArray<ObExpr *> &exprs, const int64_t batch_size,
const ObMemAttr &mem_attr, const bool is_sort_key,
const ObMemAttr &mem_attr, const bool is_sort_key, ObCompressorType compress_type,
ObTempRowStore &row_store);
private:

View File

@ -24,11 +24,11 @@ struct ObSortVecOpContext
tenant_id_(UINT64_MAX), sk_exprs_(nullptr), addon_exprs_(nullptr), sk_collations_(nullptr),
base_sk_collations_(nullptr), addon_collations_(nullptr), eval_ctx_(nullptr),
exec_ctx_(nullptr), op_(nullptr), prefix_pos_(0), part_cnt_(0), topn_cnt_(INT64_MAX),
sort_row_cnt_(nullptr), flag_(0)
sort_row_cnt_(nullptr), flag_(0), compress_type_(NONE_COMPRESSOR)
{}
TO_STRING_KV(K_(tenant_id), KP_(sk_exprs), KP_(addon_exprs), KP_(sk_collations),
KP_(base_sk_collations), KP_(addon_collations), K_(prefix_pos), K_(part_cnt),
K_(topn_cnt), KP_(sort_row_cnt), K_(flag));
K_(topn_cnt), KP_(sort_row_cnt), K_(flag), K_(compress_type));
uint64_t tenant_id_;
const ObIArray<ObExpr *> *sk_exprs_;
@ -56,6 +56,7 @@ struct ObSortVecOpContext
};
uint32_t flag_;
};
ObCompressorType compress_type_;
};
} // end namespace sql

View File

@ -60,7 +60,7 @@ public:
ties_array_pos_(0), ties_array_(), sorted_dumped_rows_ptrs_(), last_ties_row_(nullptr), rows_(nullptr),
sort_exprs_getter_(allocator_),
store_row_factory_(allocator_, sql_mem_processor_, sk_row_meta_, addon_row_meta_, inmem_row_size_, topn_cnt_),
topn_filter_(nullptr), is_topn_filter_enabled_(false)
topn_filter_(nullptr), is_topn_filter_enabled_(false), compress_type_(NONE_COMPRESSOR)
{}
virtual ~ObSortVecOpImpl()
{
@ -95,7 +95,7 @@ public:
int init_temp_row_store(const common::ObIArray<ObExpr *> &exprs, const int64_t mem_limit,
const int64_t batch_size, const bool need_callback,
const bool enable_dump, const int64_t extra_size,
const bool enable_dump, const int64_t extra_size, ObCompressorType compress_type,
ObTempRowStore &row_store);
int init_sort_temp_row_store(const int64_t batch_size);
int init_store_row_factory();
@ -281,13 +281,13 @@ protected:
ret = OB_ALLOCATE_MEMORY_FAILED;
SQL_ENG_LOG(WARN, "allocate memory failed", K(ret));
} else if (OB_FAIL(init_temp_row_store(*sk_exprs_, 1, eval_ctx_->max_batch_size_, true,
true /*enable dump*/, Store_Row::get_extra_size(true),
true /*enable dump*/, Store_Row::get_extra_size(true), compress_type_,
chunk->sk_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else if (has_addon
&& OB_FAIL(init_temp_row_store(
*addon_exprs_, 1, eval_ctx_->max_batch_size_, true, true /*enable dump*/,
Store_Row::get_extra_size(false), chunk->addon_store_))) {
Store_Row::get_extra_size(false), compress_type_, chunk->addon_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else {
while (OB_SUCC(ret)) {
@ -443,6 +443,7 @@ protected:
ObSortVecOpStoreRowFactory<Store_Row, has_addon> store_row_factory_;
ObSortVecOpEagerFilter<Compare, Store_Row, has_addon> *topn_filter_;
bool is_topn_filter_enabled_;
ObCompressorType compress_type_;
};
} // end namespace sql

View File

@ -162,7 +162,7 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_vec_ptrs(
template <typename Compare, typename Store_Row, bool has_addon>
int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_temp_row_store(
const common::ObIArray<ObExpr *> &exprs, const int64_t mem_limit, const int64_t batch_size,
const bool need_callback, const bool enable_dump, const int64_t extra_size,
const bool need_callback, const bool enable_dump, const int64_t extra_size, ObCompressorType compress_type,
ObTempRowStore &row_store)
{
int ret = OB_SUCCESS;
@ -170,7 +170,7 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_temp_row_store(
const bool reorder_fixed_expr = true;
ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA);
if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump,
extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc))) {
extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc, compress_type))) {
SQL_ENG_LOG(WARN, "init row store failed", K(ret));
} else {
row_store.set_dir_id(sql_mem_processor_.get_dir_id());
@ -190,13 +190,13 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init_sort_temp_row_store(
int ret = OB_SUCCESS;
if (OB_FAIL(init_temp_row_store(*sk_exprs_, INT64_MAX, batch_size, false /*need_callback*/,
false /*enable dump*/, Store_Row::get_extra_size(true /*is_sk*/),
sk_store_))) {
compress_type_, sk_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else if (FALSE_IT(sk_row_meta_ = &sk_store_.get_row_meta())) {
} else if (has_addon) {
if (OB_FAIL(init_temp_row_store(*addon_exprs_, INT64_MAX, batch_size, false /*need_callback*/,
false /*enable dump*/,
Store_Row::get_extra_size(false /*is_sk*/), addon_store_))) {
Store_Row::get_extra_size(false /*is_sk*/), compress_type_, addon_store_))) {
SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret));
} else {
addon_row_meta_ = &addon_store_.get_row_meta();
@ -270,6 +270,7 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::init(ObSortVecOpContext &ctx
topn_cnt_ = ctx.topn_cnt_;
use_heap_sort_ = is_topn_sort();
is_fetch_with_ties_ = ctx.is_fetch_with_ties_;
compress_type_ = ctx.compress_type_;
int64_t batch_size = eval_ctx_->max_batch_size_;
if (OB_FAIL(merge_sk_addon_exprs(sk_exprs_, addon_exprs_))) {
SQL_ENG_LOG(WARN, "failed to merge sort key and addon exprs", K(ret));