remove some unused code about spinlock; remove some -Wno options and fix warnings; remove variadic macro usage
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/runtime/vdata_stream_recvr.h"

#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>

#include <algorithm>
#include <functional>
#include <string>

#include "common/logging.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/core/materialize_block.h"
#include "vec/core/sort_cursor.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/runtime/vsorted_run_merger.h"

namespace doris::vectorized {

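// A SenderQueue buffers blocks arriving from one logical group of senders
// until the consumer drains them through get_batch().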
VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
                                           RuntimeProfile* profile)
        : _recvr(parent_recvr),
          _is_cancelled(false),
          _num_remaining_senders(num_senders),
          _received_first_batch(false) {}

VDataStreamRecvr::SenderQueue::~SenderQueue() {
    // _pending_closures should already be empty here: each closure deletes
    // itself in its Run() method, so a closure that is never run is leaked by
    // brpc. Run and clear any leftovers defensively.
    DCHECK(_pending_closures.empty());
    for (auto closure_pair : _pending_closures) {
        closure_pair.first->Run();
    }
    _pending_closures.clear();
}

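// True while the consumer still has to wait: not cancelled, no buffered
// blocks, and at least one sender has not finished yet.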
bool VDataStreamRecvr::SenderQueue::should_wait() {
    std::unique_lock<std::mutex> l(_lock);
    return !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
}

Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
    std::unique_lock<std::mutex> l(_lock);
    // wait until something shows up or we know we're done
    while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
        VLOG_ROW << "wait arrival fragment_instance_id=" << _recvr->fragment_instance_id()
                 << " node=" << _recvr->dest_node_id();
        // Don't count time spent waiting on the sender as active time.
        CANCEL_SAFE_SCOPED_TIMER(_recvr->_data_arrival_timer, &_is_cancelled);
        CANCEL_SAFE_SCOPED_TIMER(
                _received_first_batch ? nullptr : _recvr->_first_batch_wait_total_timer,
                &_is_cancelled);
        _data_arrival_cv.wait(l);
    }
    return _inner_get_batch_without_lock(block, eos);
}

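// Pops and returns the next buffered block. The caller must already hold
// _lock, as the _without_lock suffix indicates.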
Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
    if (_is_cancelled) {
        return Status::Cancelled("Cancelled");
    }

    if (_block_queue.empty()) {
        DCHECK_EQ(_num_remaining_senders, 0);
        *eos = true;
        return Status::OK();
    }

    _received_first_batch = true;

    DCHECK(!_block_queue.empty());
    auto [next_block, block_byte_size] = std::move(_block_queue.front());
    _recvr->_blocks_memory_usage->add(-block_byte_size);
    _block_queue.pop_front();

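    // Dequeuing freed memory, so wake the oldest blocked sender (if any) by
    // running its pending closure, and record how long it was stalled.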
    if (!_pending_closures.empty()) {
        auto closure_pair = _pending_closures.front();
        closure_pair.first->Run();
        _pending_closures.pop_front();

        closure_pair.second.stop();
        _recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
    }
    block->swap(*next_block);
    *eos = false;
    return Status::OK();
}

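// Remote path: deserialize a PBlock received over brpc and enqueue it.
// Duplicate packets are dropped via the per-sender packet sequence map. When
// the receiver is over its memory limit, the brpc closure is parked in
// _pending_closures instead of being run, which delays the response and
// applies backpressure to the sender.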
void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
                                              int64_t packet_seq,
                                              ::google::protobuf::Closure** done) {
    {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) {
            return;
        }
        auto iter = _packet_seq_map.find(be_number);
        if (iter != _packet_seq_map.end()) {
            if (iter->second >= packet_seq) {
                LOG(WARNING) << fmt::format(
                        "packet already exists [cur_packet_id={} receive_packet_id={}]",
                        iter->second, packet_seq);
                return;
            }
            iter->second = packet_seq;
        } else {
            _packet_seq_map.emplace(be_number, packet_seq);
        }
        auto pblock_byte_size = pblock.ByteSizeLong();
        COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);

        DCHECK_GE(_num_remaining_senders, 0);
        if (_num_remaining_senders == 0) {
            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
            return;
        }
    }

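    // Deserialize outside the lock: converting the PBlock into a Block can be
    // expensive, and holding _lock here would stall other senders.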
    BlockUPtr block = nullptr;
    int64_t deserialize_time = 0;
    {
        SCOPED_RAW_TIMER(&deserialize_time);
        block = Block::create_unique(pblock);
    }

    auto block_byte_size = block->allocated_bytes();
    VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";

    std::lock_guard<std::mutex> l(_lock);
    if (_is_cancelled) {
        return;
    }

    COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
    COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
    COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());

    _block_queue.emplace_back(std::move(block), block_byte_size);
    // If done is nullptr, this function can't delay the response.
    if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
        MonotonicStopWatch monotonicStopWatch;
        monotonicStopWatch.start();
        DCHECK(*done != nullptr);
        _pending_closures.emplace_back(*done, monotonicStopWatch);
        *done = nullptr;
    }
    _recvr->_blocks_memory_usage->add(block_byte_size);
    _data_arrival_cv.notify_one();
}

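// Local path: a block handed over in-process (local exchange). With use_move
// the source block's columns are taken over directly; otherwise the columns
// are cloned so the caller keeps ownership of its data. If the receiver is
// over its memory limit, the calling thread blocks on a per-thread
// ThreadClosure until a consumer frees space.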
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
    {
        std::unique_lock<std::mutex> l(_lock);
        if (_is_cancelled || !block->rows()) {
            return;
        }
    }

    auto block_bytes_received = block->bytes();
    // Must use a unique_ptr here, because cloning a column can fail if memory
    // allocation fails.
    BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());

    // Local exchange must copy the block content when use_move == false.
    if (use_move) {
        block->clear();
    } else {
        auto rows = block->rows();
        for (size_t i = 0; i < nblock->columns(); ++i) {
            nblock->get_by_position(i).column =
                    nblock->get_by_position(i).column->clone_resized(rows);
        }
    }
    materialize_block_inplace(*nblock);

    size_t block_mem_size = nblock->allocated_bytes();
    std::unique_lock<std::mutex> l(_lock);
    if (_is_cancelled) {
        return;
    }
    COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received);

    _block_queue.emplace_back(std::move(nblock), block_mem_size);
    _data_arrival_cv.notify_one();

    if (_recvr->exceeds_limit(block_mem_size)) {
        // yiguolei: this is tricky. If the running thread is a bthread, the
        // tid may be wrong.
        std::thread::id tid = std::this_thread::get_id();
        MonotonicStopWatch monotonicStopWatch;
        monotonicStopWatch.start();
        auto iter = _local_closure.find(tid);
        if (iter == _local_closure.end()) {
            _local_closure.emplace(tid, new ThreadClosure);
            iter = _local_closure.find(tid);
        }
        _pending_closures.emplace_back(iter->second.get(), monotonicStopWatch);
        iter->second->wait(l);
    }

    _recvr->_blocks_memory_usage->add(block_mem_size);
}

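// Marks one sender (identified by its BE number) as finished; when the last
// sender is done, wakes the consumer so it can observe EOS.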
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
    std::lock_guard<std::mutex> l(_lock);
    if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
        return;
    }
    _sender_eos_set.insert(be_number);
    DCHECK_GT(_num_remaining_senders, 0);
    _num_remaining_senders--;
    VLOG_FILE << "decremented senders: fragment_instance_id=" << _recvr->fragment_instance_id()
              << " node_id=" << _recvr->dest_node_id() << " #senders=" << _num_remaining_senders;
    if (_num_remaining_senders == 0) {
        _data_arrival_cv.notify_one();
    }
}

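// Cancels the queue: marks it cancelled, wakes every waiter, and runs all
// pending closures so blocked senders are released.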
void VDataStreamRecvr::SenderQueue::cancel() {
    {
        std::lock_guard<std::mutex> l(_lock);
        if (_is_cancelled) {
            return;
        }
        _is_cancelled = true;
        VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << _recvr->fragment_instance_id()
                   << " node_id=" << _recvr->dest_node_id();
    }
    // Wake up all threads waiting to produce/consume batches. They will all
    // notice that the stream is cancelled and handle it.
    _data_arrival_cv.notify_all();
    // _data_removal_cv.notify_all();
    // PeriodicCounterUpdater::StopTimeSeriesCounter(
    //         _recvr->_bytes_received_time_series_counter);

    {
        std::lock_guard<std::mutex> l(_lock);
        for (auto closure_pair : _pending_closures) {
            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }
}

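// Final teardown: cancels the queue, releases blocked senders, and drops all
// buffered blocks.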
void VDataStreamRecvr::SenderQueue::close() {
    {
        // If _is_cancelled is not set to true, a concurrent sender may still
        // add a batch to _block_queue; any batch added after _block_queue is
        // cleared would be a memory leak.
        std::lock_guard<std::mutex> l(_lock);
        _is_cancelled = true;

        for (auto closure_pair : _pending_closures) {
            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }

    // Delete any batches queued in _block_queue.
    _block_queue.clear();
}

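// The receiver owns one SenderQueue per sender when merging (so each sorted
// input stream stays separate) and a single shared queue otherwise.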
VDataStreamRecvr::VDataStreamRecvr(
        VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc,
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
        bool is_merging, RuntimeProfile* profile,
        std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
        : _mgr(stream_mgr),
#ifdef USE_MEM_TRACKER
          _query_mem_tracker(state->query_mem_tracker()),
          _query_id(state->query_id()),
#endif
          _fragment_instance_id(fragment_instance_id),
          _dest_node_id(dest_node_id),
          _row_desc(row_desc),
          _is_merging(is_merging),
          _is_closed(false),
          _profile(profile),
          _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
          _enable_pipeline(state->enable_pipeline_exec()) {
    // DataStreamRecvr may be destructed after the instance execution thread ends.
    _mem_tracker =
            std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id),
                                         _profile, nullptr, "PeakMemoryUsage");
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // Create one queue per sender if is_merging is true.
    int num_queues = is_merging ? num_senders : 1;
    _sender_queues.reserve(num_queues);
    int num_sender_per_queue = is_merging ? 1 : num_senders;
    for (int i = 0; i < num_queues; ++i) {
        SenderQueue* queue = nullptr;
        if (_enable_pipeline) {
            queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile));
        } else {
            queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile));
        }
        _sender_queues.push_back(queue);
    }

    // Initialize the counters.
    auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true);
    _profile->add_child(memory_usage, false, nullptr);
    _blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("Blocks", TUnit::BYTES);
    _bytes_received_counter = ADD_COUNTER(_profile, "BytesReceived", TUnit::BYTES);
    _local_bytes_received_counter = ADD_COUNTER(_profile, "LocalBytesReceived", TUnit::BYTES);

    _deserialize_row_batch_timer = ADD_TIMER(_profile, "DeserializeRowBatchTimer");
    _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
    _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)");
    _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
    _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
}

VDataStreamRecvr::~VDataStreamRecvr() {
    DCHECK(_mgr == nullptr) << "Must call close()";
}

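// Builds the k-way merger used when _is_merging is true: each sender queue
// becomes one sorted input run, supplied to the merger via a BlockSupplier
// bound to SenderQueue::get_batch.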
Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
                                       const std::vector<bool>& is_asc_order,
                                       const std::vector<bool>& nulls_first, size_t batch_size,
                                       int64_t limit, size_t offset) {
    DCHECK(_is_merging);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    std::vector<BlockSupplier> child_block_suppliers;
    // Create the merger that will produce a single stream of sorted rows.
    _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
                                       offset, _profile));

    for (int i = 0; i < _sender_queues.size(); ++i) {
        child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
                                                     _sender_queues[i], std::placeholders::_1,
                                                     std::placeholders::_2));
    }
    RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
    return Status::OK();
}

void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
                                 int64_t packet_seq, ::google::protobuf::Closure** done) {
    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id);
    int use_sender_id = _is_merging ? sender_id : 0;
    _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
}

void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
    int use_sender_id = _is_merging ? sender_id : 0;
    _sender_queues[use_sender_id]->add_block(block, use_move);
}

bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
    int use_sender_id = _is_merging ? sender_id : 0;
    return _sender_queues[use_sender_id]->queue_empty();
}

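// The receiver is readable once no sender queue would make a consumer wait,
// i.e. every queue either has buffered data, is cancelled, or has no
// remaining senders.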
bool VDataStreamRecvr::ready_to_read() {
    for (size_t i = 0; i < _sender_queues.size(); i++) {
        if (_sender_queues[i]->should_wait()) {
            return false;
        }
    }
    return true;
}

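// Consumer entry point: reads from the single shared queue directly, or from
// the merger when this receiver merges multiple sorted streams.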
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
    if (!_is_merging) {
        block->clear();
        return _sender_queues[0]->get_batch(block, eos);
    } else {
        return _merger->get_next(block, eos);
    }
}

void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
    int use_sender_id = _is_merging ? sender_id : 0;
    _sender_queues[use_sender_id]->decrement_senders(be_number);
}

void VDataStreamRecvr::cancel_stream() {
    for (int i = 0; i < _sender_queues.size(); ++i) {
        _sender_queues[i]->cancel();
    }
}

void VDataStreamRecvr::close() {
    if (_is_closed) {
        return;
    }
    _is_closed = true;
    for (int i = 0; i < _sender_queues.size(); ++i) {
        _sender_queues[i]->close();
    }
    // Remove this receiver from the DataStreamMgr that created it.
    // TODO: log error msg
    _mgr->deregister_recvr(fragment_instance_id(), dest_node_id());
    _mgr = nullptr;

    _merger.reset();
}

} // namespace doris::vectorized