diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 0d32c16186..8cf11047b6 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -294,29 +294,22 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__isset.delta_urls = true; } } + + // load rows + static std::string s_dpp_normal_all = "dpp.norm.ALL"; + static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; + static std::string s_unselected_rows = "unselected.rows"; + int64_t num_rows_load_success = 0; + int64_t num_rows_load_filtered = 0; + int64_t num_rows_load_unselected = 0; if (req.runtime_state->num_rows_load_total() > 0 || req.runtime_state->num_rows_load_filtered() > 0) { params.__isset.load_counters = true; - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - static std::string s_unselected_rows = "unselected.rows"; - - params.load_counters.emplace( - s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success())); - params.load_counters.emplace( - s_dpp_abnormal_all, - std::to_string(req.runtime_state->num_rows_load_filtered())); - params.load_counters.emplace( - s_unselected_rows, - std::to_string(req.runtime_state->num_rows_load_unselected())); + num_rows_load_success = req.runtime_state->num_rows_load_success(); + num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); + num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); } else if (!req.runtime_states.empty()) { - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - static std::string s_unselected_rows = "unselected.rows"; - int64_t num_rows_load_success = 0; - int64_t num_rows_load_filtered = 0; - int64_t num_rows_load_unselected = 0; for (auto* rs : req.runtime_states) { if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0) { params.__isset.load_counters = true; @@ -325,12 +318,16 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { num_rows_load_unselected += rs->num_rows_load_unselected(); } } - params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); - params.load_counters.emplace(s_dpp_abnormal_all, - std::to_string(num_rows_load_filtered)); - params.load_counters.emplace(s_unselected_rows, - std::to_string(num_rows_load_unselected)); } + params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); + params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); + params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); + LOG(INFO) << "execute coordinator callback, query id: " << print_id(req.query_id) + << ", instance id: " << print_id(req.fragment_instance_id) + << ", num_rows_load_success: " << num_rows_load_success + << ", num_rows_load_filtered: " << num_rows_load_filtered + << ", num_rows_load_unselected: " << num_rows_load_unselected; + if (!req.runtime_state->get_error_log_file_path().empty()) { params.__set_tracking_url( to_load_error_http_path(req.runtime_state->get_error_log_file_path())); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 0dc0ac344b..5969461f35 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -47,9 +47,14 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig LoadChannel::~LoadChannel() { g_loadchannel_cnt << -1; + std::stringstream rows_str; + for (const auto& entry : _tablets_channels_rows) { + rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first + << ", num_rows_filtered: " << entry.second.second; + } LOG(INFO) << "load channel removed" << " load_id=" << _load_id << ", is high priority=" << _is_high_priority - << ", sender_ip=" << _sender_ip; + << ", sender_ip=" << _sender_ip << rows_str.str(); } void LoadChannel::_init_profile() { @@ -165,6 +170,9 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, std::lock_guard l(_lock); { std::lock_guard l(_tablets_channels_lock); + _tablets_channels_rows.insert(std::make_pair( + index_id, + std::make_pair(channel->total_received_rows(), channel->num_rows_filtered()))); _tablets_channels.erase(index_id); } _finished_channel_ids.emplace(index_id); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index bdeedbd9ea..fc19e94215 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -107,6 +107,8 @@ private: std::mutex _lock; // index id -> tablets channel std::unordered_map> _tablets_channels; + // index id -> (received rows, filtered rows) + std::unordered_map> _tablets_channels_rows; SpinLock _tablets_channels_lock; // This is to save finished channels id, to handle the retry request. std::unordered_set _finished_channel_ids; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 2cafe30827..7ead68d916 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -361,6 +361,8 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBloc tablet_info->set_schema_hash(0); tablet_info->set_received_rows(writer->total_received_rows()); tablet_info->set_num_rows_filtered(writer->num_rows_filtered()); + _total_received_rows += writer->total_received_rows(); + _num_rows_filtered += writer->num_rows_filtered(); } else { _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st); } diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 75a0b7679e..15f68ba8e3 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -109,6 +109,10 @@ public: void refresh_profile(); + size_t total_received_rows() const { return _total_received_rows; } + + size_t num_rows_filtered() const { return _num_rows_filtered; } + protected: Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -186,6 +190,10 @@ protected: RuntimeProfile::Counter* _add_batch_timer = nullptr; RuntimeProfile::Counter* _write_block_timer = nullptr; RuntimeProfile::Counter* _incremental_open_timer = nullptr; + + // record rows received and filtered + size_t _total_received_rows = 0; + size_t _num_rows_filtered = 0; }; class DeltaWriter;