* [enhancement](stream load pipe) Use the query id or load id to identify the stream load pipe instead of the fragment instance id. NewLoadStreamMgr already holds the pipe and other info, so there is no need to save the pipe in the fragment state, and FragmentState becomes clearer. However, this PR changes the behaviour of BE. I will pick the PR to Doris 1.2.3 and add load id support to FE, so that users can upgrade from 1.2.3 to 2.x. Co-authored-by: yiguolei <yiguolei@gmail.com>
140 lines
4.8 KiB
C++
140 lines
4.8 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "runtime/fragment_mgr.h"
|
|
|
|
#include <gtest/gtest.h>
|
|
|
|
#include "common/config.h"
|
|
#include "exec/data_sink.h"
|
|
#include "runtime/plan_fragment_executor.h"
|
|
|
|
namespace doris {
|
|
|
|
// Canned results returned by the mocked PlanFragmentExecutor::prepare() and
// PlanFragmentExecutor::open() defined in this file. The test fixture resets
// both to Status::OK() in SetUp(); individual tests may overwrite them.
namespace {
Status s_prepare_status;
Status s_open_status;
} // namespace
|
|
// Mock used for this unittest
|
|
// Mock constructor: stores the execution environment pointer and the status
// report callback; nothing else is initialized explicitly.
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
                                           const report_status_callback& report_status_cb)
        : _exec_env(exec_env),
          _report_status_cb(report_status_cb) {
}
|
|
|
|
// Mock destructor: performs no work beyond destroying the members.
PlanFragmentExecutor::~PlanFragmentExecutor() = default;
|
|
|
|
// Mock prepare(): ignores both arguments and returns a copy of the status
// currently stored in s_prepare_status, so a test can force prepare to fail.
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
                                     QueryFragmentsCtx* batch_ctx) {
    const Status& canned_status = s_prepare_status;
    return canned_status;
}
|
|
|
|
// Mock open(): blocks the calling thread for 50ms to simulate a fragment that
// takes a while to run, then returns a copy of s_open_status.
Status PlanFragmentExecutor::open() {
    constexpr std::chrono::milliseconds kSimulatedOpenCost {50};
    std::this_thread::sleep_for(kSimulatedOpenCost);
    return s_open_status;
}
|
|
|
|
// Mock cancel(): a no-op; both the reason and the message are ignored.
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason,
                                  const std::string& msg) {
    // Intentionally empty in this mock.
}
|
|
|
|
// Mock close(): a no-op.
void PlanFragmentExecutor::close() {
}
|
|
|
|
class FragmentMgrTest : public testing::Test {
|
|
public:
|
|
FragmentMgrTest() {}
|
|
|
|
protected:
|
|
virtual void SetUp() {
|
|
s_prepare_status = Status::OK();
|
|
s_open_status = Status::OK();
|
|
|
|
config::fragment_pool_thread_num_min = 32;
|
|
config::fragment_pool_thread_num_max = 32;
|
|
config::fragment_pool_queue_size = 1024;
|
|
}
|
|
virtual void TearDown() {}
|
|
};
|
|
|
|
// Submits one fragment instance and then submits the very same instance id a
// second time; both submissions are expected to report OK.
TEST_F(FragmentMgrTest, Normal) {
    FragmentMgr mgr(nullptr);

    TUniqueId instance_id;
    instance_id.__set_hi(100);
    instance_id.__set_lo(200);

    TExecPlanFragmentParams params;
    params.params.fragment_instance_id = instance_id;

    // First submission.
    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
    // Duplicated submission with an identical instance id.
    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
}
|
|
|
|
// Submits eight fragment instances with distinct ids (hi = 100..107, lo = 200)
// and expects every submission to report OK.
TEST_F(FragmentMgrTest, AddNormal) {
    FragmentMgr mgr(nullptr);

    for (int offset = 0; offset != 8; ++offset) {
        TUniqueId instance_id;
        instance_id.__set_hi(100 + offset);
        instance_id.__set_lo(200);

        TExecPlanFragmentParams params;
        params.params.fragment_instance_id = instance_id;

        EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
    }
}
|
|
|
|
// Submits a single fragment instance and expects the submission to report OK.
// NOTE(review): despite its name, this test never issues a cancel; confirm
// whether a cancel call and its expectation should be added here.
TEST_F(FragmentMgrTest, CancelNormal) {
    FragmentMgr mgr(nullptr);

    TUniqueId instance_id;
    instance_id.__set_hi(100);
    instance_id.__set_lo(200);

    TExecPlanFragmentParams params;
    params.params.fragment_instance_id = instance_id;

    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
}
|
|
|
|
// Constructs a FragmentMgr and a set of fragment params, then lets both go
// out of scope.
// NOTE(review): no method is called on `mgr` and nothing is asserted, so this
// only exercises construction/destruction; confirm whether a cancel of the
// never-submitted instance id was meant to be tested here.
TEST_F(FragmentMgrTest, CancelWithoutAdd) {
    FragmentMgr mgr(nullptr);

    TUniqueId instance_id;
    instance_id.__set_hi(100);
    instance_id.__set_lo(200);

    TExecPlanFragmentParams params;
    params.params.fragment_instance_id = instance_id;
}
|
|
|
|
// Makes the mocked prepare() return an internal error and expects the
// submission of a fragment instance to report a non-OK status.
TEST_F(FragmentMgrTest, PrepareFailed) {
    s_prepare_status = Status::InternalError("Prepare failed.");
    FragmentMgr mgr(nullptr);

    TUniqueId instance_id;
    instance_id.__set_hi(100);
    instance_id.__set_lo(200);

    TExecPlanFragmentParams params;
    params.params.fragment_instance_id = instance_id;

    EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
}
|
|
|
|
// Shrinks the fragment pool configuration to a single thread with a
// zero-length queue, submits one fragment that is expected to be accepted,
// and then expects three further submissions to be refused.
//
// The config overrides made here are reset by the fixture's SetUp() before
// the next test runs.
TEST_F(FragmentMgrTest, OfferPoolFailed) {
    config::fragment_pool_thread_num_min = 1;
    config::fragment_pool_thread_num_max = 1;
    config::fragment_pool_queue_size = 0;
    FragmentMgr mgr(doris::ExecEnv::GetInstance());

    TExecPlanFragmentParams params;
    params.params.fragment_instance_id = TUniqueId();
    params.params.fragment_instance_id.__set_hi(100);
    params.params.fragment_instance_id.__set_lo(200);
    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());

    // The mocked open() sleeps for 50ms, so the first plan is presumably still
    // occupying the only pool thread while the next 3 plans are submitted;
    // with no queue capacity they are expected to be aborted.
    // NOTE(review): this relies on timing — confirm it is stable on slow hosts.
    for (int i = 1; i < 4; ++i) {
        // Named differently from the outer `params` to avoid shadowing it.
        TExecPlanFragmentParams rejected_params;
        rejected_params.params.fragment_instance_id = TUniqueId();
        rejected_params.params.fragment_instance_id.__set_hi(100 + i);
        rejected_params.params.fragment_instance_id.__set_lo(200);
        EXPECT_FALSE(mgr.exec_plan_fragment(rejected_params).ok());
    }
}
|
|
|
|
} // namespace doris
|