[bugfix](performance) fix performance problem (#31093)

This commit is contained in:
yiguolei
2024-02-19 17:47:35 +08:00
committed by yiguolei
parent 180fc13f6b
commit 7607bfc78d
3 changed files with 20 additions and 1 deletions

View File

@ -112,6 +112,11 @@ public:
void close_a_pipeline();
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}
virtual void add_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
@ -188,6 +193,10 @@ protected:
std::shared_ptr<QueryContext> _query_ctx;
// This shared ptr is never used. It is just a reference to hold the object.
// There is a weak ptr in runtime filter manager to reference this object.
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
MonotonicStopWatch _fragment_watcher;
RuntimeProfile::Counter* _start_timer = nullptr;
RuntimeProfile::Counter* _prepare_timer = nullptr;

View File

@ -719,6 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
static_cast<void>(_runtimefilter_controller.add_entity(
params.params, params.params.query_id, params.query_options, &handler,
RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
fragment_executor->set_merge_controller_handler(handler);
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.insert(
@ -806,6 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
static_cast<void>(_runtimefilter_controller.add_entity(
params.local_params[i], params.query_id, params.query_options, &handler,
RuntimeFilterParamsContext::create(context->get_runtime_state())));
context->set_merge_controller_handler(handler);
const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
@ -885,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
static_cast<void>(_runtimefilter_controller.add_entity(
local_params, params.query_id, params.query_options, &handler,
RuntimeFilterParamsContext::create(context->get_runtime_state())));
context->set_merge_controller_handler(handler);
{
std::lock_guard<std::mutex> lock(_lock);
_pipeline_map.insert(std::make_pair(fragment_instance_id, context));

View File

@ -134,6 +134,11 @@ public:
void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}
std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
@ -214,6 +219,9 @@ private:
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
RuntimeProfile::Counter* _fragment_cpu_timer = nullptr;
// This shared ptr is never used. It is just a reference to hold the object.
// There is a weak ptr in runtime filter manager to reference this object.
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
// If set the true, this plan fragment will be executed only after FE send execution start rpc.
bool _need_wait_execution_trigger = false;