[CP] [CP] fix connect by slow query optimization OOM problm
This commit is contained in:
1
deps/oblib/src/lib/container/ob_2d_array.h
vendored
1
deps/oblib/src/lib/container/ob_2d_array.h
vendored
@ -59,6 +59,7 @@ public:
|
|||||||
int prepare_allocate(int64_t capacity);
|
int prepare_allocate(int64_t capacity);
|
||||||
virtual int64_t to_string(char *buffer, int64_t length) const;
|
virtual int64_t to_string(char *buffer, int64_t length) const;
|
||||||
inline int64_t get_block_size() const { return LOCAL_BLOCK_SIZE; }
|
inline int64_t get_block_size() const { return LOCAL_BLOCK_SIZE; }
|
||||||
|
inline void set_block_allocator(const BlockAllocatorT &alloc) { block_alloc_ = alloc; }
|
||||||
inline const BlockAllocatorT &get_block_allocator() const { return block_alloc_; }
|
inline const BlockAllocatorT &get_block_allocator() const { return block_alloc_; }
|
||||||
void set_tenant_id(int64_t tenant_id) { block_alloc_.set_tenant_id(tenant_id); }
|
void set_tenant_id(int64_t tenant_id) { block_alloc_.set_tenant_id(tenant_id); }
|
||||||
void set_ctx_id(int64_t ctx_id) { block_alloc_.set_ctx_id(ctx_id); }
|
void set_ctx_id(int64_t ctx_id) { block_alloc_.set_ctx_id(ctx_id); }
|
||||||
|
|||||||
@ -194,7 +194,7 @@ void ObConnectByOpPump::free_pump_node(PumpNode &pop_node)
|
|||||||
pop_node.row_fetcher_.iterator_ = NULL;
|
pop_node.row_fetcher_.iterator_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObConnectByOpPump::free_pump_node_stack(ObIArray<PumpNode> &stack)
|
int ObConnectByOpPump::free_pump_node_stack(ObSegmentArray<PumpNode> &stack)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
PumpNode pop_node;
|
PumpNode pop_node;
|
||||||
|
|||||||
@ -300,7 +300,7 @@ private:
|
|||||||
int check_child_cycle(PumpNode &node, PumpNode *left_node);
|
int check_child_cycle(PumpNode &node, PumpNode *left_node);
|
||||||
void free_pump_node(PumpNode &node);
|
void free_pump_node(PumpNode &node);
|
||||||
int alloc_iter(PumpNode &node);
|
int alloc_iter(PumpNode &node);
|
||||||
int free_pump_node_stack(ObIArray<PumpNode> &stack);
|
int free_pump_node_stack(ObSegmentArray<PumpNode> &stack);
|
||||||
void set_row_store_constructed() { datum_store_constructed_ = true; }
|
void set_row_store_constructed() { datum_store_constructed_ = true; }
|
||||||
bool get_row_store_constructed() { return datum_store_constructed_; }
|
bool get_row_store_constructed() { return datum_store_constructed_; }
|
||||||
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
|
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
|
||||||
@ -311,7 +311,7 @@ private:
|
|||||||
//用于初始化检测环的hash_set
|
//用于初始化检测环的hash_set
|
||||||
static const int64_t CONNECT_BY_TREE_HEIGHT = 16;
|
static const int64_t CONNECT_BY_TREE_HEIGHT = 16;
|
||||||
// ObNewRow prior_exprs_result_row_;//用来存放计算connect by prior表达式的结果
|
// ObNewRow prior_exprs_result_row_;//用来存放计算connect by prior表达式的结果
|
||||||
ObArray<PumpNode> pump_stack_;
|
ObSegmentArray<PumpNode> pump_stack_;
|
||||||
ObChunkDatumStore datum_store_;
|
ObChunkDatumStore datum_store_;
|
||||||
const ObIArray<ObExpr *> *cur_output_exprs_;
|
const ObIArray<ObExpr *> *cur_output_exprs_;
|
||||||
RowMap hash_filter_rows_;
|
RowMap hash_filter_rows_;
|
||||||
|
|||||||
@ -81,12 +81,19 @@ int ObConnectByOpBFSPump::sort_sibling_rows()
|
|||||||
if (OB_ISNULL(sort_collations_)) {
|
if (OB_ISNULL(sort_collations_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected sort columns", K(ret));
|
LOG_WARN("unexpected sort columns", K(ret));
|
||||||
} else if (0 != sort_collations_->count()
|
} else if (OB_UNLIKELY(0 != sort_collations_->count())) {
|
||||||
&& !sort_stack_.empty()) {
|
ret = OB_ERR_UNEXPECTED;
|
||||||
RowComparer compare(sort_collations_, sort_cmp_funs_, ret);
|
LOG_WARN("unexpected sort in connect by", K(ret));
|
||||||
PumpNode *first_node = &sort_stack_.at(0);
|
|
||||||
std::sort(first_node, first_node + sort_stack_.count(), compare);
|
|
||||||
}
|
}
|
||||||
|
// if (OB_ISNULL(sort_collations_)) {
|
||||||
|
// ret = OB_ERR_UNEXPECTED;
|
||||||
|
// LOG_WARN("unexpected sort columns", K(ret));
|
||||||
|
// } else if (0 != sort_collations_->count()
|
||||||
|
// && !sort_stack_.empty()) {
|
||||||
|
// RowComparer compare(sort_collations_, sort_cmp_funs_, ret);
|
||||||
|
// PumpNode *first_node = &sort_stack_.at(0);
|
||||||
|
// std::sort(first_node, first_node + sort_stack_.count(), compare);
|
||||||
|
// }
|
||||||
|
|
||||||
//add siblings to pump stack
|
//add siblings to pump stack
|
||||||
while(OB_SUCC(ret) && false == sort_stack_.empty()) {
|
while(OB_SUCC(ret) && false == sort_stack_.empty()) {
|
||||||
@ -166,7 +173,7 @@ int ObConnectByOpBFSPump::free_path_stack()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObConnectByOpBFSPump::free_pump_node_stack(ObIArray<PumpNode> &stack)
|
int ObConnectByOpBFSPump::free_pump_node_stack(ObSegmentArray<PumpNode> &stack)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
PumpNode pop_node;
|
PumpNode pop_node;
|
||||||
|
|||||||
@ -128,12 +128,12 @@ private:
|
|||||||
int add_path_node(PumpNode &pump_node);
|
int add_path_node(PumpNode &pump_node);
|
||||||
int check_cycle_path();
|
int check_cycle_path();
|
||||||
int free_path_stack();
|
int free_path_stack();
|
||||||
int free_pump_node_stack(ObIArray<PumpNode> &stack);
|
int free_pump_node_stack(ObSegmentArray<PumpNode> &stack);
|
||||||
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
|
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
|
||||||
private:
|
private:
|
||||||
common::ObArray<PumpNode> pump_stack_;
|
common::ObSegmentArray<PumpNode> pump_stack_;
|
||||||
common::ObArray<PathNode> path_stack_;
|
common::ObSegmentArray<PathNode> path_stack_;
|
||||||
common::ObArray<PumpNode> sort_stack_;
|
common::ObSegmentArray<PumpNode> sort_stack_;
|
||||||
common::ObSEArray<const ObChunkDatumStore::StoredRow *, 2> free_record_;
|
common::ObSEArray<const ObChunkDatumStore::StoredRow *, 2> free_record_;
|
||||||
//记录connect by后除去prior 常量表达式(如prior 0)的所有表达式
|
//记录connect by后除去prior 常量表达式(如prior 0)的所有表达式
|
||||||
ObNLConnectByWithIndexOp *connect_by_;
|
ObNLConnectByWithIndexOp *connect_by_;
|
||||||
|
|||||||
Reference in New Issue
Block a user