Fixing bug: PX hangs due to calling get_next_batch after coordinater being drained.
This commit is contained in:
@ -886,6 +886,9 @@ int ObOperator::inner_rescan()
|
||||
if (br_it_) {
|
||||
br_it_->rescan();
|
||||
}
|
||||
// If an operator rescan after drained, the exch_drained_ must be reset
|
||||
// so it can be drained again.
|
||||
exch_drained_ = false;
|
||||
batch_reach_end_ = false;
|
||||
row_reach_end_ = false;
|
||||
clear_batch_end_flag();
|
||||
@ -1445,6 +1448,14 @@ int ObOperator::do_drain_exch()
|
||||
} else if (!exch_drained_) {
|
||||
int tmp_ret = inner_drain_exch();
|
||||
exch_drained_ = true;
|
||||
// If an operator is drained, it means that the parent operator will never call its
|
||||
// get_next_batch function again theoretically. However, we cannot guarantee that there won't be
|
||||
// any bugs that call get_next_batch again after drain. To prevent this situation, we set the
|
||||
// all iter end flags here.
|
||||
// For specific case, refer to issue:
|
||||
brs_.end_ = true;
|
||||
batch_reach_end_ = true;
|
||||
row_reach_end_ = true;
|
||||
if (!spec_.is_receive()) {
|
||||
for (int64_t i = 0; i < child_cnt_ && OB_SUCC(ret); i++) {
|
||||
if (OB_ISNULL(children_[i])) {
|
||||
|
@ -1169,7 +1169,10 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t op_max_batch_size = min(max_row_cnt, MY_SPEC.max_batch_size_);
|
||||
int64_t params_size = 0;
|
||||
if (need_init_before_get_row_) {
|
||||
if (iter_end_) {
|
||||
brs_.size_ = 0;
|
||||
brs_.end_ = true;
|
||||
} else if (need_init_before_get_row_) {
|
||||
OZ(prepare_onetime_exprs());
|
||||
}
|
||||
//从主表中获取一行数据
|
||||
|
@ -3384,3 +3384,33 @@ select /*+ no_rewrite*/ * from t1 where exists (select /*+no_unnest */ 1 from t3
|
||||
| 26 | 9 | 17 | 17 |
|
||||
| 28 | 3 | 18 | 18 |
|
||||
+----+------+------+------+
|
||||
|
||||
drop table if exists t;
|
||||
drop table if exists t2;
|
||||
drop table if exists t3;
|
||||
create table t(c1 int);
|
||||
create table t2(c1 int);
|
||||
create table t3(c1 int, c2 varchar(1500)) partition by hash(c1) partitions 4;
|
||||
insert into t2 values(1000);
|
||||
insert into t3 values(0, repeat('0123456789', 100));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
select /*+ no_rewrite */ * from t where exists (select 1 from t2, t3 where t2.c1 = length(t3.c2));
|
||||
+------+
|
||||
| c1 |
|
||||
+------+
|
||||
+------+
|
||||
|
||||
drop table if exists t;
|
||||
drop table if exists t2;
|
||||
drop table if exists t3;
|
||||
|
@ -343,3 +343,33 @@ insert into t3 (c1, c2, c3, c4) values (26, 9,17,17);
|
||||
insert into t3 (c1, c2, c3, c4) values (28, 3,18,18);
|
||||
|
||||
select /*+ no_rewrite*/ * from t1 where exists (select /*+no_unnest */ 1 from t3 where t1.c1 > t3.c3 and t3.c4 < 100 limit 1) ;
|
||||
|
||||
--disable_warnings
|
||||
drop table if exists t;
|
||||
drop table if exists t2;
|
||||
drop table if exists t3;
|
||||
--enable_warnings
|
||||
create table t(c1 int);
|
||||
create table t2(c1 int);
|
||||
create table t3(c1 int, c2 varchar(1500)) partition by hash(c1) partitions 4;
|
||||
insert into t2 values(1000);
|
||||
insert into t3 values(0, repeat('0123456789', 100));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 values(0, repeat('0123456789', 101));
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
insert into t3 select * from t3 where length(c2) > 1000;
|
||||
select /*+ no_rewrite */ * from t where exists (select 1 from t2, t3 where t2.c1 = length(t3.c2));
|
||||
|
||||
--disable_warnings
|
||||
drop table if exists t;
|
||||
drop table if exists t2;
|
||||
drop table if exists t3;
|
||||
--enable_warnings
|
||||
|
Reference in New Issue
Block a user