From 98c782bedb2ad2f91dc2b2fd73c638bf48aff081 Mon Sep 17 00:00:00 2001 From: Pxl Date: Sat, 8 Mar 2025 16:21:22 +0800 Subject: [PATCH] [Chore](case) pick FragmentMgr::send_filter_size.return_eof/RuntimeFilterProducer::send_size.rpc_fail to 2.1 (#48817) part of https://github.com/apache/doris/pull/48225 --- be/src/exprs/runtime_filter.cpp | 5 + be/src/runtime/fragment_mgr.cpp | 5 + .../test_inject_send_filter_size_fail.out | 16 +++ .../test_inject_send_filter_size_fail.groovy | 101 ++++++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out create mode 100644 regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f4a4cf469e..b4d911ec0c 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1185,6 +1185,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000); callback->cntl_->ignore_eovercrowded(); + if (config::enable_debug_points && + DebugPoints::instance()->is_enable("RuntimeFilterProducer::send_size.rpc_fail")) { + closure->cntl_->SetFailed("inject RuntimeFilterProducer::send_size.rpc_fail"); + } + stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); closure.release(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e9d156e2f2..1a277c9488 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1838,6 +1838,11 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { UniqueId queryid = request->query_id(); + if (config::enable_debug_points && + DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) { + return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof"); + } + std::shared_ptr query_ctx; { TUniqueId query_id; diff --git a/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out new file mode 100644 index 0000000000..42b0346861 --- /dev/null +++ b/regression-test/data/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !normal -- +0 true + +-- !rpc_failed -- +0 true + +-- !eof -- +0 true + +-- !rpc_failed -- +0 + +-- !eof -- +0 + diff --git a/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy new file mode 100644 index 0000000000..9e0dbeeaad --- /dev/null +++ b/regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_inject_send_filter_size_fail") { + sql "set parallel_pipeline_task_num=3" + sql "set enable_runtime_filter_prune=false" + sql "set enable_sync_runtime_filter_size=true" + sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'" + sql "set runtime_filter_wait_infinitely=false" + + sql """ drop table if exists t1; """ + sql """ drop table if exists t3; """ + sql """ drop table if exists t5; """ + + sql """ + create table t1 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + create table t3 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + + """ + + sql """ + create table t5 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1; + """ + + sql """ + insert into t3 values(1,1),(2,2),(3,3); + """ + + sql """ + insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5); + """ + + sql "analyze table t1 with sync;" + sql "analyze table t3 with sync;" + sql "analyze table t5 with sync;" + + qt_normal "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + + try { + GetDebugPoint().enableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + test { + sql """select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;""" + exception "inject RuntimeFilterProducer::send_size.rpc_fail" + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("RuntimeFilterProducer::send_size.rpc_fail") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + qt_eof "select count(*),sleep(2) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + } + + try { + GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + qt_eof "select count(*) from (select t1.k1 from t5 join [shuffle] t1 on t1.k1=t5.k1) tmp join [shuffle] t3 join t3 t3s [shuffle] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::send_filter_size.return_eof") + } +}