Fix some bug for vec2.0
This commit is contained in:
@ -819,9 +819,6 @@ set_distinct_batch(const common::ObIArray<ObExpr *> &exprs,
|
||||
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
|
||||
batch_info_guard.set_batch_idx(0);
|
||||
batch_info_guard.set_batch_size(batch_size);
|
||||
if (!is_push_down_ && OB_FAIL(prefetch<HashBucket>(hash_values_for_batch, batch_size, skip))) {
|
||||
SQL_ENG_LOG(WARN, "failed to prefetch", K(ret));
|
||||
}
|
||||
auto sf = [&] (const IVectorPtrs &vectors,
|
||||
const uint16_t selector[],
|
||||
const int64_t size,
|
||||
@ -829,7 +826,9 @@ set_distinct_batch(const common::ObIArray<ObExpr *> &exprs,
|
||||
ret = preprocess_part_.store_.add_batch(vectors, selector, size, stored_rows);
|
||||
return ret;
|
||||
};
|
||||
if (OB_FAIL(hash_table_.set_distinct_batch(preprocess_part_.store_.get_row_meta(), batch_size, skip,
|
||||
if (!is_push_down_ && OB_FAIL(prefetch<HashBucket>(hash_values_for_batch, batch_size, skip))) {
|
||||
SQL_ENG_LOG(WARN, "failed to prefetch", K(ret));
|
||||
} else if (OB_FAIL(hash_table_.set_distinct_batch(preprocess_part_.store_.get_row_meta(), batch_size, skip,
|
||||
my_skip, hash_values_for_batch, sf))) {
|
||||
LOG_WARN("failed to set batch", K(ret));
|
||||
}
|
||||
|
||||
@ -38,8 +38,8 @@ OB_SERIALIZE_MEMBER((ObSortVecSpec, ObOpSpec), topn_expr_, topk_limit_expr_, top
|
||||
|
||||
ObSortVecOp::ObSortVecOp(ObExecContext &ctx_, const ObOpSpec &spec, ObOpInput *input) :
|
||||
ObOperator(ctx_, spec, input), sort_op_provider_(op_monitor_info_), sort_row_count_(0),
|
||||
is_first_(true), ret_row_count_(0), sk_row_store_(&ctx_.get_allocator()),
|
||||
addon_row_store_(&ctx_.get_allocator()), sk_row_iter_(), addon_row_iter_()
|
||||
is_first_(true), ret_row_count_(0), sk_row_store_(), addon_row_store_(), sk_row_iter_(),
|
||||
addon_row_iter_()
|
||||
{}
|
||||
|
||||
int ObSortVecOp::inner_rescan()
|
||||
@ -50,7 +50,6 @@ int ObSortVecOp::inner_rescan()
|
||||
|
||||
void ObSortVecOp::reset()
|
||||
{
|
||||
sort_op_provider_.reset();
|
||||
sort_row_count_ = 0;
|
||||
ret_row_count_ = 0;
|
||||
is_first_ = true;
|
||||
@ -58,17 +57,16 @@ void ObSortVecOp::reset()
|
||||
addon_row_iter_.reset();
|
||||
sk_row_store_.reset();
|
||||
addon_row_store_.reset();
|
||||
sort_op_provider_.reset();
|
||||
}
|
||||
|
||||
void ObSortVecOp::destroy()
|
||||
{
|
||||
reset();
|
||||
sk_row_store_.~ObTempRowStore();
|
||||
addon_row_store_.~ObTempRowStore();
|
||||
sort_op_provider_.unregister_profile_if_necessary();
|
||||
sort_op_provider_.~ObSortVecOpProvider();
|
||||
sort_row_count_ = 0;
|
||||
is_first_ = true;
|
||||
ret_row_count_ = 0;
|
||||
sk_row_store_.reset();
|
||||
addon_row_store_.reset();
|
||||
ObOperator::destroy();
|
||||
}
|
||||
|
||||
@ -345,7 +343,11 @@ int ObSortVecOp::scan_all_then_sort_batch()
|
||||
}
|
||||
if (MY_SPEC.has_addon_) {
|
||||
blk_holder_.release();
|
||||
addon_row_iter_.reset();
|
||||
addon_row_store_.reset();
|
||||
}
|
||||
sk_row_iter_.reset();
|
||||
sk_row_store_.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -394,6 +396,8 @@ int ObSortVecOp::init_sort(int64_t tenant_id, int64_t row_count, int64_t topn_cn
|
||||
if (OB_FAIL(sort_op_provider_.init(context))) {
|
||||
LOG_WARN("failed to init sort operator provider", K(ret));
|
||||
} else {
|
||||
sk_row_store_.set_allocator(*sort_op_provider_.get_malloc_allocator());
|
||||
addon_row_store_.set_allocator(*sort_op_provider_.get_malloc_allocator());
|
||||
sort_op_provider_.set_input_rows(row_count);
|
||||
sort_op_provider_.set_input_width(MY_SPEC.width_ + aqs_head);
|
||||
sort_op_provider_.set_operator_type(MY_SPEC.type_);
|
||||
|
||||
@ -69,6 +69,7 @@ public:
|
||||
void set_operator_type(ObPhyOperatorType op_type);
|
||||
void set_operator_id(uint64_t op_id);
|
||||
void set_io_event_observer(ObIOEventObserver *observer);
|
||||
common::ObIAllocator *get_malloc_allocator() { return alloc_; }
|
||||
|
||||
private:
|
||||
int init_mem_context(uint64_t tenant_id);
|
||||
|
||||
Reference in New Issue
Block a user