[improve](insert-into) record rows info in log for check (#29581)
This commit is contained in:
committed by
GitHub
parent
fa4721125e
commit
e2e9b9d8a3
@ -294,29 +294,22 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
|
||||
params.__isset.delta_urls = true;
|
||||
}
|
||||
}
|
||||
|
||||
// load rows
|
||||
static std::string s_dpp_normal_all = "dpp.norm.ALL";
|
||||
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
|
||||
static std::string s_unselected_rows = "unselected.rows";
|
||||
int64_t num_rows_load_success = 0;
|
||||
int64_t num_rows_load_filtered = 0;
|
||||
int64_t num_rows_load_unselected = 0;
|
||||
if (req.runtime_state->num_rows_load_total() > 0 ||
|
||||
req.runtime_state->num_rows_load_filtered() > 0) {
|
||||
params.__isset.load_counters = true;
|
||||
|
||||
static std::string s_dpp_normal_all = "dpp.norm.ALL";
|
||||
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
|
||||
static std::string s_unselected_rows = "unselected.rows";
|
||||
|
||||
params.load_counters.emplace(
|
||||
s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success()));
|
||||
params.load_counters.emplace(
|
||||
s_dpp_abnormal_all,
|
||||
std::to_string(req.runtime_state->num_rows_load_filtered()));
|
||||
params.load_counters.emplace(
|
||||
s_unselected_rows,
|
||||
std::to_string(req.runtime_state->num_rows_load_unselected()));
|
||||
num_rows_load_success = req.runtime_state->num_rows_load_success();
|
||||
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
|
||||
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
|
||||
} else if (!req.runtime_states.empty()) {
|
||||
static std::string s_dpp_normal_all = "dpp.norm.ALL";
|
||||
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
|
||||
static std::string s_unselected_rows = "unselected.rows";
|
||||
int64_t num_rows_load_success = 0;
|
||||
int64_t num_rows_load_filtered = 0;
|
||||
int64_t num_rows_load_unselected = 0;
|
||||
for (auto* rs : req.runtime_states) {
|
||||
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0) {
|
||||
params.__isset.load_counters = true;
|
||||
@ -325,12 +318,16 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
|
||||
num_rows_load_unselected += rs->num_rows_load_unselected();
|
||||
}
|
||||
}
|
||||
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
|
||||
params.load_counters.emplace(s_dpp_abnormal_all,
|
||||
std::to_string(num_rows_load_filtered));
|
||||
params.load_counters.emplace(s_unselected_rows,
|
||||
std::to_string(num_rows_load_unselected));
|
||||
}
|
||||
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
|
||||
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
|
||||
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
|
||||
LOG(INFO) << "execute coordinator callback, query id: " << print_id(req.query_id)
|
||||
<< ", instance id: " << print_id(req.fragment_instance_id)
|
||||
<< ", num_rows_load_success: " << num_rows_load_success
|
||||
<< ", num_rows_load_filtered: " << num_rows_load_filtered
|
||||
<< ", num_rows_load_unselected: " << num_rows_load_unselected;
|
||||
|
||||
if (!req.runtime_state->get_error_log_file_path().empty()) {
|
||||
params.__set_tracking_url(
|
||||
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
|
||||
|
||||
@ -47,9 +47,14 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
|
||||
|
||||
LoadChannel::~LoadChannel() {
|
||||
g_loadchannel_cnt << -1;
|
||||
std::stringstream rows_str;
|
||||
for (const auto& entry : _tablets_channels_rows) {
|
||||
rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
|
||||
<< ", num_rows_filtered: " << entry.second.second;
|
||||
}
|
||||
LOG(INFO) << "load channel removed"
|
||||
<< " load_id=" << _load_id << ", is high priority=" << _is_high_priority
|
||||
<< ", sender_ip=" << _sender_ip;
|
||||
<< ", sender_ip=" << _sender_ip << rows_str.str();
|
||||
}
|
||||
|
||||
void LoadChannel::_init_profile() {
|
||||
@ -165,6 +170,9 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
{
|
||||
std::lock_guard<SpinLock> l(_tablets_channels_lock);
|
||||
_tablets_channels_rows.insert(std::make_pair(
|
||||
index_id,
|
||||
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
|
||||
_tablets_channels.erase(index_id);
|
||||
}
|
||||
_finished_channel_ids.emplace(index_id);
|
||||
|
||||
@ -107,6 +107,8 @@ private:
|
||||
std::mutex _lock;
|
||||
// index id -> tablets channel
|
||||
std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels;
|
||||
// index id -> (received rows, filtered rows)
|
||||
std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows;
|
||||
SpinLock _tablets_channels_lock;
|
||||
// This is to save finished channels id, to handle the retry request.
|
||||
std::unordered_set<int64_t> _finished_channel_ids;
|
||||
|
||||
@ -361,6 +361,8 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBloc
|
||||
tablet_info->set_schema_hash(0);
|
||||
tablet_info->set_received_rows(writer->total_received_rows());
|
||||
tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
|
||||
_total_received_rows += writer->total_received_rows();
|
||||
_num_rows_filtered += writer->num_rows_filtered();
|
||||
} else {
|
||||
_add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
|
||||
}
|
||||
|
||||
@ -109,6 +109,10 @@ public:
|
||||
|
||||
void refresh_profile();
|
||||
|
||||
size_t total_received_rows() const { return _total_received_rows; }
|
||||
|
||||
size_t num_rows_filtered() const { return _num_rows_filtered; }
|
||||
|
||||
protected:
|
||||
Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);
|
||||
|
||||
@ -186,6 +190,10 @@ protected:
|
||||
RuntimeProfile::Counter* _add_batch_timer = nullptr;
|
||||
RuntimeProfile::Counter* _write_block_timer = nullptr;
|
||||
RuntimeProfile::Counter* _incremental_open_timer = nullptr;
|
||||
|
||||
// record rows received and filtered
|
||||
size_t _total_received_rows = 0;
|
||||
size_t _num_rows_filtered = 0;
|
||||
};
|
||||
|
||||
class DeltaWriter;
|
||||
|
||||
Reference in New Issue
Block a user