diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 91aa0e2600..b110c04984 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -2058,6 +2058,8 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool LOG_WARN("failed to sort funcs", K(ret)); } else if (OB_FAIL(append_array_no_dup(spec.all_exprs_, spec.get_child()->output_))) { LOG_WARN("failed to append array no dup", K(ret)); + } else if (opt_ctx_->is_online_ddl() && OB_FAIL(fill_compress_type(op, spec.compress_type_))) { + LOG_WARN("fail to gt compress_type", K(ret)); } else { spec.prefix_pos_ = op.get_prefix_pos(); spec.is_local_merge_sort_ = op.is_local_merge_sort(); @@ -2067,40 +2069,7 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool spec.enable_encode_sortkey_opt_ = op.enable_encode_sortkey_opt(); spec.part_cnt_ = op.get_part_cnt(); LOG_TRACE("trace order by", K(spec.all_exprs_.count()), K(spec.all_exprs_)); - if (OB_SUCC(ret)) { - int64_t tenant_id = op.get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id(); - if (opt_ctx_->is_online_ddl()) { - // for normal sort we use default compress type. for online ddl, we use the compress type in source table - ObLogicalOperator *child_op = op.get_child(0); - ObCompressorType tmp_compr_type = NONE_COMPRESSOR; - while(OB_SUCC(ret) && OB_NOT_NULL(child_op) && child_op->get_type() != log_op_def::LOG_TABLE_SCAN ) { - child_op = child_op->get_child(0); - if (OB_NOT_NULL(child_op) && child_op->get_type() == log_op_def::LOG_TABLE_SCAN ) { - share::schema::ObSchemaGetterGuard *schema_guard = nullptr; - const share::schema::ObTableSchema *table_schema = nullptr; - uint64_t table_id = static_cast(child_op)->get_ref_table_id(); - if (OB_ISNULL(schema_guard = opt_ctx_->get_schema_guard())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) { - LOG_WARN("fail to get table schema", K(ret)); - } else if (OB_ISNULL(table_schema)) { - ret = OB_TABLE_NOT_EXIST; - LOG_WARN("can't find table schema", K(ret), K(table_id)); - } else { - tmp_compr_type = table_schema->get_compressor_type(); - } - } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(tmp_compr_type, - op.get_parallel(), - spec.compress_type_))) { - LOG_WARN("fail to get compress type", K(ret)); - } - } - } - } + } if (OB_SUCC(ret)) { if (spec.sort_collations_.count() != spec.sort_cmp_funs_.count() @@ -2117,6 +2086,42 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool return ret; } +int ObStaticEngineCG::fill_compress_type(ObLogSort &op, ObCompressorType &compr_type) +{ + int ret = OB_SUCCESS; + int64_t tenant_id = op.get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id(); + // for normal sort we use default compress type. for online ddl, we use the compress type in source table + ObLogicalOperator *child_op = op.get_child(0); + ObCompressorType tmp_compr_type = NONE_COMPRESSOR; + while(OB_SUCC(ret) && OB_NOT_NULL(child_op) && child_op->get_type() != log_op_def::LOG_TABLE_SCAN ) { + child_op = child_op->get_child(0); + if (OB_NOT_NULL(child_op) && child_op->get_type() == log_op_def::LOG_TABLE_SCAN ) { + share::schema::ObSchemaGetterGuard *schema_guard = nullptr; + const share::schema::ObTableSchema *table_schema = nullptr; + uint64_t table_id = static_cast(child_op)->get_ref_table_id(); + if (OB_ISNULL(schema_guard = opt_ctx_->get_schema_guard())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard->get_table_schema(tenant_id, table_id, table_schema))) { + LOG_WARN("fail to get table schema", K(ret)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("can't find table schema", K(ret), K(table_id)); + } else { + tmp_compr_type = table_schema->get_compressor_type(); + } + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(tmp_compr_type, + op.get_parallel(), + compr_type))) { + LOG_WARN("fail to get compress type", K(ret)); + } + } + return ret; +} + int ObStaticEngineCG::append_child_output_no_dup(const bool is_store_sortkey_separately, const ObIArray &child_output_exprs, ObIArray &sk_exprs, @@ -2280,6 +2285,8 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortVecSpec &spec, const bo spec.get_child()->output_, spec.sk_exprs_, spec.addon_exprs_))) { LOG_WARN("failed to append array no dup", K(ret)); + } else if (opt_ctx_->is_online_ddl() && OB_FAIL(fill_compress_type(op, spec.compress_type_))) { + LOG_WARN("fail to gt compress_type", K(ret)); } else { spec.prefix_pos_ = op.get_prefix_pos(); spec.is_local_merge_sort_ = op.is_local_merge_sort(); diff --git a/src/sql/code_generator/ob_static_engine_cg.h b/src/sql/code_generator/ob_static_engine_cg.h index 0cbee632be..17e9fbc845 100644 --- a/src/sql/code_generator/ob_static_engine_cg.h +++ b/src/sql/code_generator/ob_static_engine_cg.h @@ -448,6 +448,8 @@ private: const ObSortCollations &collations, ObSortFuncs &sort_funcs, const ObIArray &sort_exprs); + + int fill_compress_type(ObLogSort &op, ObCompressorType &compr_type); int check_not_support_cmp_type( const ObSortCollations &collations, const ObIArray &sort_exprs); diff --git a/src/sql/engine/sort/ob_prefix_sort_vec_op_impl.ipp b/src/sql/engine/sort/ob_prefix_sort_vec_op_impl.ipp index 4c898c3192..6398b944e1 100644 --- a/src/sql/engine/sort/ob_prefix_sort_vec_op_impl.ipp +++ b/src/sql/engine/sort/ob_prefix_sort_vec_op_impl.ipp @@ -82,11 +82,11 @@ int ObPrefixSortVecImpl::init(ObSortVecOpContext SQL_ENG_LOG(WARN, "sort impl init failed", K(ret)); } else if (OB_FAIL(init_temp_row_store( *sk_exprs_, INT64_MAX, batch_size, false, false /*enable dump*/, - Store_Row::get_extra_size(true /*is_sort_key*/), im_sk_store_))) { + Store_Row::get_extra_size(true /*is_sort_key*/), NONE_COMPRESSOR, im_sk_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else if (OB_FAIL(init_temp_row_store( *addon_exprs_, INT64_MAX, batch_size, false, false /*enable dump*/, - Store_Row::get_extra_size(false /*is_sort_key*/), im_addon_store_))) { + Store_Row::get_extra_size(false /*is_sort_key*/), NONE_COMPRESSOR, im_addon_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else { selector_ = diff --git a/src/sql/engine/sort/ob_sort_vec_op.cpp b/src/sql/engine/sort/ob_sort_vec_op.cpp index 408d1bd8f4..72aaac3f42 100644 --- a/src/sql/engine/sort/ob_sort_vec_op.cpp +++ b/src/sql/engine/sort/ob_sort_vec_op.cpp @@ -28,13 +28,13 @@ ObSortVecSpec::ObSortVecSpec(common::ObIAllocator &alloc, const ObPhyOperatorTyp sk_exprs_(alloc), addon_exprs_(alloc), sk_collations_(alloc), addon_collations_(alloc), minimum_row_count_(0), topk_precision_(0), prefix_pos_(0), is_local_merge_sort_(false), is_fetch_with_ties_(false), prescan_enabled_(false), enable_encode_sortkey_opt_(false), - has_addon_(false), part_cnt_(0) + has_addon_(false), part_cnt_(0), compress_type_(NONE_COMPRESSOR) {} OB_SERIALIZE_MEMBER((ObSortVecSpec, ObOpSpec), topn_expr_, topk_limit_expr_, topk_offset_expr_, sk_exprs_, addon_exprs_, sk_collations_, addon_collations_, minimum_row_count_, topk_precision_, prefix_pos_, is_local_merge_sort_, is_fetch_with_ties_, - prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_); + prescan_enabled_, enable_encode_sortkey_opt_, has_addon_, part_cnt_, compress_type_); ObSortVecOp::ObSortVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) : ObOperator(ctx_, spec, input), sort_op_provider_(op_monitor_info_), sort_row_count_(0), @@ -190,7 +190,8 @@ int ObSortVecOp::process_sort_batch() int ObSortVecOp::init_temp_row_store(const common::ObIArray &exprs, const int64_t batch_size, const ObMemAttr &mem_attr, - const bool is_sort_key, ObTempRowStore &row_store) + const bool is_sort_key, ObCompressorType compress_type, + ObTempRowStore &row_store) { int ret = OB_SUCCESS; const bool enable_trunc = true; @@ -199,7 +200,7 @@ int ObSortVecOp::init_temp_row_store(const common::ObIArray &exprs, // do nothing } else if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, 2 * 1024 * 1024, true, sort_op_provider_.get_extra_size(is_sort_key) /* row_extra_size */, - reorder_fixed_expr, enable_trunc))) { + reorder_fixed_expr, enable_trunc, compress_type))) { LOG_WARN("init row store failed", K(ret)); } else if (OB_FAIL(row_store.alloc_dir_id())) { LOG_WARN("failed to alloc dir id", K(ret)); @@ -215,11 +216,11 @@ int ObSortVecOp::init_prescan_row_store() ObMemAttr mem_attr(ctx_.get_my_session()->get_effective_tenant_id(), "SORT_VEC_CTX", ObCtxIds::WORK_AREA); if (OB_FAIL(init_temp_row_store(MY_SPEC.sk_exprs_, MY_SPEC.max_batch_size_, mem_attr, true, - sk_row_store_))) { + MY_SPEC.compress_type_, sk_row_store_))) { LOG_WARN("failed to init temp row store", K(ret)); } else if (MY_SPEC.has_addon_ && OB_FAIL(init_temp_row_store(MY_SPEC.addon_exprs_, MY_SPEC.max_batch_size_, mem_attr, - false, addon_row_store_))) { + false, MY_SPEC.compress_type_, addon_row_store_))) { LOG_WARN("failed to init temp row store", K(ret)); } return ret; @@ -386,6 +387,7 @@ int ObSortVecOp::init_sort(int64_t tenant_id, int64_t row_count, int64_t topn_cn context.topn_cnt_ = topn_cnt; context.is_fetch_with_ties_ = MY_SPEC.is_fetch_with_ties_; context.has_addon_ = MY_SPEC.has_addon_; + context.compress_type_ = MY_SPEC.compress_type_; if (MY_SPEC.prefix_pos_ > 0) { context.prefix_pos_ = MY_SPEC.prefix_pos_; context.op_ = this; diff --git a/src/sql/engine/sort/ob_sort_vec_op.h b/src/sql/engine/sort/ob_sort_vec_op.h index 1d9fd510a2..44ddf0d327 100644 --- a/src/sql/engine/sort/ob_sort_vec_op.h +++ b/src/sql/engine/sort/ob_sort_vec_op.h @@ -35,7 +35,7 @@ public: K_(topk_offset_expr), K_(prefix_pos), K_(minimum_row_count), K_(topk_precision), K_(prefix_pos), K_(is_local_merge_sort), K_(prescan_enabled), K_(enable_encode_sortkey_opt), K_(has_addon), - K_(part_cnt)); + K_(part_cnt), K_(compress_type)); public: ObExpr *topn_expr_; @@ -57,6 +57,7 @@ public: bool has_addon_; // if use, all_exprs_ is : hash(part_by) + part_by + order_by. int64_t part_cnt_; + ObCompressorType compress_type_; }; class ObSortVecOp : public ObOperator @@ -93,7 +94,7 @@ private: const ObCompactRow **addon_stored_rows = nullptr); int init_sort(int64_t tenant_id, int64_t row_count, int64_t topn_cnt = INT64_MAX); int init_temp_row_store(const common::ObIArray &exprs, const int64_t batch_size, - const ObMemAttr &mem_attr, const bool is_sort_key, + const ObMemAttr &mem_attr, const bool is_sort_key, ObCompressorType compress_type, ObTempRowStore &row_store); private: diff --git a/src/sql/engine/sort/ob_sort_vec_op_context.h b/src/sql/engine/sort/ob_sort_vec_op_context.h index 16149fd7d0..7a3267c6e8 100644 --- a/src/sql/engine/sort/ob_sort_vec_op_context.h +++ b/src/sql/engine/sort/ob_sort_vec_op_context.h @@ -24,11 +24,11 @@ struct ObSortVecOpContext tenant_id_(UINT64_MAX), sk_exprs_(nullptr), addon_exprs_(nullptr), sk_collations_(nullptr), base_sk_collations_(nullptr), addon_collations_(nullptr), eval_ctx_(nullptr), exec_ctx_(nullptr), op_(nullptr), prefix_pos_(0), part_cnt_(0), topn_cnt_(INT64_MAX), - sort_row_cnt_(nullptr), flag_(0) + sort_row_cnt_(nullptr), flag_(0), compress_type_(NONE_COMPRESSOR) {} TO_STRING_KV(K_(tenant_id), KP_(sk_exprs), KP_(addon_exprs), KP_(sk_collations), KP_(base_sk_collations), KP_(addon_collations), K_(prefix_pos), K_(part_cnt), - K_(topn_cnt), KP_(sort_row_cnt), K_(flag)); + K_(topn_cnt), KP_(sort_row_cnt), K_(flag), K_(compress_type)); uint64_t tenant_id_; const ObIArray *sk_exprs_; @@ -56,6 +56,7 @@ struct ObSortVecOpContext }; uint32_t flag_; }; + ObCompressorType compress_type_; }; } // end namespace sql diff --git a/src/sql/engine/sort/ob_sort_vec_op_impl.h b/src/sql/engine/sort/ob_sort_vec_op_impl.h index b1c2fc98b7..4610801e53 100644 --- a/src/sql/engine/sort/ob_sort_vec_op_impl.h +++ b/src/sql/engine/sort/ob_sort_vec_op_impl.h @@ -60,7 +60,7 @@ public: ties_array_pos_(0), ties_array_(), sorted_dumped_rows_ptrs_(), last_ties_row_(nullptr), rows_(nullptr), sort_exprs_getter_(allocator_), store_row_factory_(allocator_, sql_mem_processor_, sk_row_meta_, addon_row_meta_, inmem_row_size_, topn_cnt_), - topn_filter_(nullptr), is_topn_filter_enabled_(false) + topn_filter_(nullptr), is_topn_filter_enabled_(false), compress_type_(NONE_COMPRESSOR) {} virtual ~ObSortVecOpImpl() { @@ -95,7 +95,7 @@ public: int init_temp_row_store(const common::ObIArray &exprs, const int64_t mem_limit, const int64_t batch_size, const bool need_callback, - const bool enable_dump, const int64_t extra_size, + const bool enable_dump, const int64_t extra_size, ObCompressorType compress_type, ObTempRowStore &row_store); int init_sort_temp_row_store(const int64_t batch_size); int init_store_row_factory(); @@ -281,13 +281,13 @@ protected: ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(WARN, "allocate memory failed", K(ret)); } else if (OB_FAIL(init_temp_row_store(*sk_exprs_, 1, eval_ctx_->max_batch_size_, true, - true /*enable dump*/, Store_Row::get_extra_size(true), + true /*enable dump*/, Store_Row::get_extra_size(true), compress_type_, chunk->sk_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else if (has_addon && OB_FAIL(init_temp_row_store( *addon_exprs_, 1, eval_ctx_->max_batch_size_, true, true /*enable dump*/, - Store_Row::get_extra_size(false), chunk->addon_store_))) { + Store_Row::get_extra_size(false), compress_type_, chunk->addon_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else { while (OB_SUCC(ret)) { @@ -443,6 +443,7 @@ protected: ObSortVecOpStoreRowFactory store_row_factory_; ObSortVecOpEagerFilter *topn_filter_; bool is_topn_filter_enabled_; + ObCompressorType compress_type_; }; } // end namespace sql diff --git a/src/sql/engine/sort/ob_sort_vec_op_impl.ipp b/src/sql/engine/sort/ob_sort_vec_op_impl.ipp index c05b659e94..43323e6903 100644 --- a/src/sql/engine/sort/ob_sort_vec_op_impl.ipp +++ b/src/sql/engine/sort/ob_sort_vec_op_impl.ipp @@ -162,7 +162,7 @@ int ObSortVecOpImpl::init_vec_ptrs( template int ObSortVecOpImpl::init_temp_row_store( const common::ObIArray &exprs, const int64_t mem_limit, const int64_t batch_size, - const bool need_callback, const bool enable_dump, const int64_t extra_size, + const bool need_callback, const bool enable_dump, const int64_t extra_size, ObCompressorType compress_type, ObTempRowStore &row_store) { int ret = OB_SUCCESS; @@ -170,7 +170,7 @@ int ObSortVecOpImpl::init_temp_row_store( const bool reorder_fixed_expr = true; ObMemAttr mem_attr(tenant_id_, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA); if (OB_FAIL(row_store.init(exprs, batch_size, mem_attr, mem_limit, enable_dump, - extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc))) { + extra_size /* row_extra_size */, reorder_fixed_expr, enable_trunc, compress_type))) { SQL_ENG_LOG(WARN, "init row store failed", K(ret)); } else { row_store.set_dir_id(sql_mem_processor_.get_dir_id()); @@ -190,13 +190,13 @@ int ObSortVecOpImpl::init_sort_temp_row_store( int ret = OB_SUCCESS; if (OB_FAIL(init_temp_row_store(*sk_exprs_, INT64_MAX, batch_size, false /*need_callback*/, false /*enable dump*/, Store_Row::get_extra_size(true /*is_sk*/), - sk_store_))) { + compress_type_, sk_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else if (FALSE_IT(sk_row_meta_ = &sk_store_.get_row_meta())) { } else if (has_addon) { if (OB_FAIL(init_temp_row_store(*addon_exprs_, INT64_MAX, batch_size, false /*need_callback*/, false /*enable dump*/, - Store_Row::get_extra_size(false /*is_sk*/), addon_store_))) { + Store_Row::get_extra_size(false /*is_sk*/), compress_type_, addon_store_))) { SQL_ENG_LOG(WARN, "failed to init temp row store", K(ret)); } else { addon_row_meta_ = &addon_store_.get_row_meta(); @@ -270,6 +270,7 @@ int ObSortVecOpImpl::init(ObSortVecOpContext &ctx topn_cnt_ = ctx.topn_cnt_; use_heap_sort_ = is_topn_sort(); is_fetch_with_ties_ = ctx.is_fetch_with_ties_; + compress_type_ = ctx.compress_type_; int64_t batch_size = eval_ctx_->max_batch_size_; if (OB_FAIL(merge_sk_addon_exprs(sk_exprs_, addon_exprs_))) { SQL_ENG_LOG(WARN, "failed to merge sort key and addon exprs", K(ret));