diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 7422890aa4..42c0e83f4b 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -78,6 +78,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; _need_commit = true; + data_size_condition = true; } _get_cond.notify_all(); return Status::OK(); @@ -417,6 +418,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", exec_plan_fragment status=" << status.to_string() << ", commit/abort txn rpc status=" << st.to_string() << ", commit/abort txn status=" << result_status.to_string() + << ", this group commit includes " << load_block_queue->group_commit_load_count << " loads" + << ", flush because meet " + << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition" << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); if (state) { if (!state->get_error_log_file_path().empty()) { diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index e3b28be580..5357ba208f 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -78,6 +78,10 @@ public: int64_t txn_id; int64_t schema_version; bool wait_internal_group_commit_finish = false; + bool data_size_condition = false; + + // counts of load in one group commit + std::atomic_size_t group_commit_load_count = 0; // the execute status of this internal group commit std::mutex mutex; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 6ff9d4a142..2ce03ba0c1 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -35,6 +35,8 @@ namespace doris { namespace vectorized { +bvar::Adder g_group_commit_load_rows("doris_group_commit_load_rows"); +bvar::Adder g_group_commit_load_bytes("doris_group_commit_load_bytes"); GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, Status* status) @@ -48,6 +50,7 @@ GroupCommitBlockSink::~GroupCommitBlockSink() { if (_load_block_queue) { _remove_estimated_wal_bytes(); _load_block_queue->remove_load_id(_load_id); + _load_block_queue->group_commit_load_count.fetch_add(1); } } @@ -144,12 +147,13 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ return status; } SCOPED_TIMER(_profile->total_time_counter()); + // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. state->update_num_rows_load_total(rows); state->update_num_bytes_load_total(bytes); - DorisMetrics::instance()->load_rows->increment(rows); - DorisMetrics::instance()->load_bytes->increment(bytes); + g_group_commit_load_rows << rows; + g_group_commit_load_bytes << bytes; std::shared_ptr block; bool has_filtered_rows = false;