[Chore](case) pick FragmentMgr::send_filter_size.return_eof/RuntimeFilterProducer::send_size.rpc_fail to 2.1 (#48817)
part of https://github.com/apache/doris/pull/48225
This commit is contained in:
@ -1185,6 +1185,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
|
||||
callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000);
|
||||
callback->cntl_->ignore_eovercrowded();
|
||||
|
||||
if (config::enable_debug_points &&
|
||||
DebugPoints::instance()->is_enable("RuntimeFilterProducer::send_size.rpc_fail")) {
|
||||
closure->cntl_->SetFailed("inject RuntimeFilterProducer::send_size.rpc_fail");
|
||||
}
|
||||
|
||||
stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(),
|
||||
closure.get());
|
||||
closure.release();
|
||||
|
||||
@ -1838,6 +1838,11 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
|
||||
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
|
||||
UniqueId queryid = request->query_id();
|
||||
|
||||
if (config::enable_debug_points &&
|
||||
DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
|
||||
return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
|
||||
}
|
||||
|
||||
std::shared_ptr<QueryContext> query_ctx;
|
||||
{
|
||||
TUniqueId query_id;
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !normal --
|
||||
0 true
|
||||
|
||||
-- !rpc_failed --
|
||||
0 true
|
||||
|
||||
-- !eof --
|
||||
0 true
|
||||
|
||||
-- !rpc_failed --
|
||||
0
|
||||
|
||||
-- !eof --
|
||||
0
|
||||
|
||||
@ -0,0 +1,101 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_inject_send_filter_size_fail") {
|
||||
sql "set parallel_pipeline_task_num=3"
|
||||
sql "set enable_runtime_filter_prune=false"
|
||||
sql "set enable_sync_runtime_filter_size=true"
|
||||
sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'"
|
||||
sql "set runtime_filter_wait_infinitely=false"
|
||||
|
||||
sql """ drop table if exists t1; """
|
||||
sql """ drop table if exists t3; """
|
||||
sql """ drop table if exists t5; """
|
||||
|
||||
sql """
|
||||
create table t1 (
|
||||
k1 int null,
|
||||
k2 int null
|
||||
)
|
||||
duplicate key (k1)
|
||||
distributed BY hash(k1) buckets 16
|
||||
properties("replication_num" = "1");
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table t3 (
|
||||
k1 int null,
|
||||
k2 int null
|
||||
)
|
||||
duplicate key (k1)
|
||||
distributed BY hash(k1) buckets 16
|
||||
properties("replication_num" = "1");
|
||||
|
||||
"""
|
||||
|
||||
sql """
|
||||
create table t5 (
|
||||
k1 int null,
|
||||
k2 int null
|
||||
)
|
||||
duplicate key (k1)
|
||||
distributed BY hash(k1) buckets 16
|
||||
properties("replication_num" = "1");
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into t3 values(1,1),(2,2),(3,3);
|
||||
"""
|
||||
|
||||
sql """
|
||||
insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
|
||||
"""
|
||||
|
||||
sql "analyze table t1 with sync;"
|
||||
sql "analyze table t3 with sync;"
|
||||
sql "analyze table t5 with sync;"
|
||||
|
||||
qt_normal "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
|
||||
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
|
||||
test {
|
||||
sql """select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"""
|
||||
exception "inject RuntimeFilterProducer::send_size.rpc_fail"
|
||||
}
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail")
|
||||
}
|
||||
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
|
||||
qt_eof "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
|
||||
}
|
||||
|
||||
try {
|
||||
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
|
||||
qt_eof "select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
|
||||
} finally {
|
||||
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user