[fix](pipeline) fix remove pipeline_x_context from fragment manager (#24062)

This commit is contained in:
Lijia Liu
2023-09-10 20:53:26 +08:00
committed by GitHub
parent 1df2e4454f
commit a05003fbe1
5 changed files with 7 additions and 20 deletions

View File

@ -135,6 +135,10 @@ public:
}
bool is_group_commit() { return _group_commit; }
virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(1);
ins_ids[0] = _fragment_instance_id;
}
protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);

View File

@ -69,7 +69,7 @@ public:
~PipelineXFragmentContext() override;
void instance_ids(std::vector<TUniqueId>& ins_ids) const {
void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
ins_ids.resize(_runtime_states.size());
for (size_t i = 0; i < _runtime_states.size(); i++) {
ins_ids[i] = _runtime_states[i]->fragment_instance_id();

View File

@ -344,7 +344,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
bool all_done = false;
if (query_ctx != nullptr) {
// decrease the number of unfinished fragments
all_done = query_ctx->countdown();
all_done = query_ctx->countdown(1);
}
// remove exec state after this fragment finished
@ -455,18 +455,6 @@ void FragmentMgr::remove_pipeline_context(
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
auto* q_context = f_context->get_query_context();
bool all_done = q_context->countdown();
_pipeline_map.erase(f_context->get_fragment_instance_id());
if (all_done) {
_query_ctx_map.erase(query_id);
}
}
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineXFragmentContext> f_context) {
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
auto* q_context = f_context->get_query_context();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());

View File

@ -83,9 +83,6 @@ public:
void remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
void remove_pipeline_context(
std::shared_ptr<pipeline::PipelineXFragmentContext> pipeline_context);
// TODO(zc): report this is over
Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb);

View File

@ -95,9 +95,7 @@ public:
// Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
// this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage.
bool countdown() { return countdown(1); }
bool countdown(int delta) { return fragment_num.fetch_sub(delta) <= 1; }
bool countdown(int instance_num) { return fragment_num.fetch_sub(instance_num) <= 1; }
ExecEnv* exec_env() { return _exec_env; }