// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "runtime/buffer_control_block.h" #include #include #include #include #include // IWYU pragma: no_include #include // IWYU pragma: keep #include #include #include #include #include #include "arrow/record_batch.h" #include "arrow/type_fwd.h" #include "pipeline/exec/result_sink_operator.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/thrift_util.h" namespace doris { void GetResultBatchCtx::on_failure(const Status& status) { DCHECK(!status.ok()) << "status is ok, errmsg=" << status; status.to_protobuf(result->mutable_status()); { // call by result sink done->Run(); } delete this; } void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics) { Status status; status.to_protobuf(result->mutable_status()); if (statistics != nullptr) { statistics->to_pb(result->mutable_query_statistics()); } result->set_packet_seq(packet_seq); result->set_eos(true); { done->Run(); } delete this; } void GetResultBatchCtx::on_data(const std::unique_ptr& t_result, int64_t packet_seq, bool eos) { Status st = Status::OK(); if (t_result != nullptr) { uint8_t* buf = nullptr; uint32_t len = 0; ThriftSerializer ser(false, 4096); st = ser.serialize(&t_result->result_batch, &len, &buf); if (st.ok()) { result->set_row_batch(std::string((const char*)buf, len)); result->set_packet_seq(packet_seq); result->set_eos(eos); } else { LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st; } } else { result->set_empty_batch(true); result->set_packet_seq(packet_seq); result->set_eos(eos); } /// The size limit of proto buffer message is 2G if (result->ByteSizeLong() > std::numeric_limits::max()) { st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong()); result->clear_row_batch(); result->set_empty_batch(true); } st.to_protobuf(result->mutable_status()); { done->Run(); } delete this; } BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) : _fragment_id(id), _is_close(false), _is_cancelled(false), _buffer_rows(0), _buffer_limit(buffer_size), _packet_num(0) { _query_statistics = std::make_unique(); } BufferControlBlock::~BufferControlBlock() { cancel(); } Status BufferControlBlock::init() { return Status::OK(); } bool BufferControlBlock::can_sink() { std::unique_lock l(_lock); return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled; } Status BufferControlBlock::add_batch(std::unique_ptr& result) { std::unique_lock l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); } int num_rows = result->result_batch.rows.size(); while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) { _data_removal.wait_for(l, std::chrono::seconds(1)); } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } if (_waiting_rpc.empty()) { // Merge result into batch to reduce rpc times if (!_fe_result_batch_queue.empty() && ((_fe_result_batch_queue.back()->result_batch.rows.size() + num_rows) < _buffer_limit) && !result->eos) { std::vector& back_rows = _fe_result_batch_queue.back()->result_batch.rows; std::vector& result_rows = result->result_batch.rows; back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), std::make_move_iterator(result_rows.end())); } else { _fe_result_batch_queue.push_back(std::move(result)); } _buffer_rows += num_rows; } else { auto ctx = _waiting_rpc.front(); _waiting_rpc.pop_front(); ctx->on_data(result, _packet_num); _packet_num++; } return Status::OK(); } Status BufferControlBlock::add_arrow_batch(std::shared_ptr& result) { std::unique_lock l(_lock); if (_is_cancelled) { return Status::Cancelled("Cancelled"); } int num_rows = result->num_rows(); while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) { _data_removal.wait_for(l, std::chrono::seconds(1)); } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); _buffer_rows += num_rows; _data_arrival.notify_one(); return Status::OK(); } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { std::lock_guard l(_lock); if (!_status.ok()) { ctx->on_failure(_status); return; } if (_is_cancelled) { ctx->on_failure(Status::Cancelled("Cancelled")); return; } if (!_fe_result_batch_queue.empty()) { // get result std::unique_ptr result = std::move(_fe_result_batch_queue.front()); _fe_result_batch_queue.pop_front(); _buffer_rows -= result->result_batch.rows.size(); _data_removal.notify_one(); ctx->on_data(result, _packet_num); _packet_num++; return; } if (_is_close) { ctx->on_close(_packet_num, _query_statistics.get()); return; } // no ready data, push ctx to waiting list _waiting_rpc.push_back(ctx); } Status BufferControlBlock::get_arrow_batch(std::shared_ptr* result) { std::unique_lock l(_lock); if (!_status.ok()) { return _status; } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { _data_arrival.wait_for(l, std::chrono::seconds(1)); } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } if (!_arrow_flight_batch_queue.empty()) { *result = std::move(_arrow_flight_batch_queue.front()); _arrow_flight_batch_queue.pop_front(); _buffer_rows -= (*result)->num_rows(); _data_removal.notify_one(); _packet_num++; return Status::OK(); } // normal path end if (_is_close) { return Status::OK(); } return Status::InternalError("Get Arrow Batch Abnormal Ending"); } Status BufferControlBlock::close(Status exec_status) { std::unique_lock l(_lock); _is_close = true; _status = exec_status; // notify blocked get thread _data_arrival.notify_all(); if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { ctx->on_close(_packet_num, _query_statistics.get()); } } else { for (auto& ctx : _waiting_rpc) { ctx->on_failure(_status); } } _waiting_rpc.clear(); } return Status::OK(); } void BufferControlBlock::cancel() { std::unique_lock l(_lock); _is_cancelled = true; _data_removal.notify_all(); _data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { ctx->on_failure(Status::Cancelled("Cancelled")); } _waiting_rpc.clear(); } Status PipBufferControlBlock::add_batch(std::unique_ptr& result) { RETURN_IF_ERROR(BufferControlBlock::add_batch(result)); _update_dependency(); return Status::OK(); } Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr& result) { RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result)); _update_dependency(); return Status::OK(); } void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) { BufferControlBlock::get_batch(ctx); _update_dependency(); } Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr* result) { RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result)); _update_dependency(); return Status::OK(); } void PipBufferControlBlock::cancel() { BufferControlBlock::cancel(); _update_dependency(); } void PipBufferControlBlock::set_dependency( std::shared_ptr result_sink_dependency) { _result_sink_dependency = result_sink_dependency; } void PipBufferControlBlock::_update_dependency() { if (_result_sink_dependency && (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled)) { _result_sink_dependency->set_ready(); } else if (_result_sink_dependency && (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled)) { _result_sink_dependency->block(); } } void PipBufferControlBlock::_update_batch_queue_empty() { _batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); _update_dependency(); } } // namespace doris