[profile](refactor) Fix invalid shuffle profile (#26298)

This commit is contained in:
Gabriel
2023-11-02 17:07:05 +08:00
committed by GitHub
parent dd8bcc831c
commit 6828250207
4 changed files with 29 additions and 18 deletions

View File

@ -392,9 +392,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
auto rows = block->rows();
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
RETURN_IF_ERROR(
local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
{
SCOPED_TIMER(local_state._split_block_hash_compute_timer);
RETURN_IF_ERROR(
local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, local_state.channels,
local_state._partition_count,

View File

@ -106,6 +106,11 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
_sender_id = info.sender_id;
_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
_local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
_brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
auto& p = _parent->cast<ResultFileSinkOperatorX>();
CHECK(p._file_opts.get() != nullptr);
if (p._is_top_sink) {

View File

@ -58,6 +58,12 @@ public:
[[nodiscard]] int sender_id() const { return _sender_id; }
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
RuntimeProfile::Counter* brpc_send_timer() { return _brpc_send_timer; }
RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
RuntimeProfile::Counter* split_block_distribute_by_channel_timer() {
return _split_block_distribute_by_channel_timer;
}
private:
friend class ResultFileSinkOperatorX;
@ -73,6 +79,10 @@ private:
vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
RuntimeProfile::Counter* _brpc_wait_timer;
RuntimeProfile::Counter* _local_send_timer;
RuntimeProfile::Counter* _brpc_send_timer;
RuntimeProfile::Counter* _merge_block_timer;
RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
int _sender_id;
};

View File

@ -126,9 +126,7 @@ Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
template <typename Parent>
Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->local_send_timer());
}
SCOPED_TIMER(_parent->local_send_timer());
Block block = _serializer.get_block()->to_block();
_serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
if (_recvr_is_valid()) {
@ -157,9 +155,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->local_send_timer());
}
SCOPED_TIMER(_parent->local_send_timer());
if (_recvr_is_valid()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
@ -176,9 +172,9 @@ Status Channel<Parent>::send_local_block(Block* block) {
template <typename Parent>
Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->brpc_send_timer());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
SCOPED_TIMER(_parent->brpc_send_timer());
if (_closure == nullptr) {
_closure = new RefCountClosure<PTransmitDataResult>();
@ -631,8 +627,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
auto rows = block->rows();
SCOPED_TIMER(_split_block_hash_compute_timer);
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
{
SCOPED_TIMER(_split_block_hash_compute_timer);
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count,
(uint64_t*)_partitioner->get_channel_ids(), rows,
@ -729,16 +727,12 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (rows) {
if (rows->size() > 0) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
}
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
const int* begin = &(*rows)[0];
_mutable_block->add_rows(block, begin, begin + rows->size());
}
} else if (!block->empty()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
SCOPED_TIMER(_parent->merge_block_timer());
}
SCOPED_TIMER(_parent->merge_block_timer());
RETURN_IF_ERROR(_mutable_block->merge(*block));
}
}