[fix](multi-table-load) fix multi table load can not finish (#29957)

This commit is contained in:
HHoflittlefish777
2024-01-17 13:57:53 +08:00
committed by yiguolei
parent 3deee14680
commit 6ef9ed08aa
3 changed files with 33 additions and 20 deletions

View File

@ -209,7 +209,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_unplanned_pipes.size(), _planned_pipes.size(), params.size());
_unplanned_pipes.clear();
_inflight_plan_cnt += params.size();
_inflight_cnt += params.size();
for (auto& plan : params) {
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
@ -263,20 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_status = *status;
}
--_inflight_plan_cnt;
if (_inflight_plan_cnt == 0 && is_consume_finished()) {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
_ctx->number_filtered_rows = _number_filtered_rows;
_ctx->number_unselected_rows = _number_unselected_rows;
_ctx->commit_infos = _tablet_commit_infos;
LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
<< _ctx->number_total_rows
<< " number_loaded_rows=" << _ctx->number_loaded_rows
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows;
_ctx->promise.set_value(
_status); // when all done, finish the routine load task
auto inflight_cnt = _inflight_cnt.fetch_sub(1);
if (inflight_cnt == 1 && is_consume_finished()) {
_handle_consumer_finished();
}
}));
}
@ -303,6 +292,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
#endif
void MultiTablePipe::_handle_consumer_finished() {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
_ctx->number_filtered_rows = _number_filtered_rows;
_ctx->number_unselected_rows = _number_unselected_rows;
_ctx->commit_infos = _tablet_commit_infos;
LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
<< _ctx->number_total_rows << " number_loaded_rows=" << _ctx->number_loaded_rows
<< " number_filtered_rows=" << _ctx->number_filtered_rows
<< " number_unselected_rows=" << _ctx->number_unselected_rows;
_ctx->promise.set_value(_status); // when all done, finish the routine load task
}
Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);

View File

@ -46,7 +46,13 @@ public:
// request and execute plans for unplanned pipes
Status request_and_exec_plans();
void set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
void handle_consume_finished() {
_set_consume_finished();
auto inflight_cnt = _inflight_cnt.fetch_sub(1);
if (inflight_cnt == 1) {
_handle_consumer_finished();
}
}
bool is_consume_finished() { return _consume_finished.load(std::memory_order_acquire); }
@ -71,25 +77,30 @@ private:
template <typename ExecParam>
Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
void _set_consume_finished() { _consume_finished.store(true, std::memory_order_release); }
void _handle_consumer_finished();
private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold
std::atomic<uint64_t> _inflight_plan_cnt {0}; // how many plan fragment are executing?
// inflight count, when it is zero, means consume and all plans is finished
std::atomic<uint64_t> _inflight_cnt {1};
std::atomic<bool> _consume_finished {false};
// note: Use raw pointer here to avoid cycle reference with StreamLoadContext.
// Life cycle of MultiTablePipe is under control of StreamLoadContext, which means StreamLoadContext is created
// before NultiTablePipe and released after it. It is safe to use raw pointer here.
StreamLoadContext* _ctx = nullptr;
Status _status; // save the first error status of all executing plan fragment
#ifndef BE_TEST
std::mutex _tablet_commit_infos_lock;
std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each plan fragment
std::atomic<int64_t> _number_total_rows {0};
std::atomic<int64_t> _number_loaded_rows {0};
std::atomic<int64_t> _number_filtered_rows {0};
std::atomic<int64_t> _number_unselected_rows {0};
#endif
std::mutex _pipe_map_lock;
std::unordered_map<TUniqueId /*instance id*/, std::shared_ptr<io::StreamLoadPipe>> _pipe_map;

View File

@ -350,7 +350,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
"multi tables task executes plan error");
// need memory order
multi_table_pipe->set_consume_finished();
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
}