@@ -64,6 +64,7 @@ DEFINE_Int32(brpc_port, "8060");
DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -524,6 +525,8 @@ DEFINE_Int32(brpc_heavy_work_pool_threads, "-1");
DEFINE_Int32(brpc_light_work_pool_threads, "-1");
DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1");
DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1");

//Enable brpc builtin services, see:
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
@@ -646,7 +649,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");

// arrow flight result sink buffer rows size, default 4096 * 8
DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000");

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");

@@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port);
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -580,6 +581,8 @@ DECLARE_Int32(brpc_heavy_work_pool_threads);
DECLARE_Int32(brpc_light_work_pool_threads);
DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
DECLARE_Int32(brpc_light_work_pool_max_queue_size);
DECLARE_Int32(brpc_arrow_flight_work_pool_threads);
DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size);

// The maximum amount of data that can be processed by a stream load
DECLARE_mInt64(streaming_load_max_mb);
@@ -701,6 +704,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time);

// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
// The timeout for ADBC Client to wait for data using arrow flight reader.
// If the query is very complex and no result is generated after this time, consider increasing this timeout.
DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms);

// the increased frequency of priority for remaining tasks in BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
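
The `public_access_*` knobs above only matter when the ADBC client cannot reach the backend's own address (for example behind NAT). A minimal sketch, assuming a hypothetical call site; the exact place where the advertised result address is built is not part of this diff:

    // Illustration only, not part of the patch: prefer the configured public
    // address and fall back to the backend's own address when unset.
    std::string advertised_host = !config::public_access_ip.empty()
                                          ? config::public_access_ip
                                          : BackendOptions::get_localhost();
    int advertised_port = config::public_access_port != -1 ? config::public_access_port
                                                           : config::arrow_flight_sql_port;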

@@ -115,7 +115,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender, state->enable_pipeline_exec(),
state->execution_timeout()));
state));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type, state->fragment_instance_id(),

@@ -69,8 +69,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
// create sender
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true,
state->execution_timeout()));
state->fragment_instance_id(), p._result_sink_buffer_size_rows, &_sender, true, state));
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
return Status::OK();
}
@@ -98,12 +97,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
_sender.get(), _output_vexpr_ctxs, _profile));
break;
}
default:

@@ -33,9 +33,11 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "pipeline/exec/result_sink_operator.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"

namespace doris {

@@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
delete this;
}

BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
void GetArrowResultBatchCtx::on_failure(const Status& status) {
DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
status.to_protobuf(result->mutable_status());
delete this;
}

void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
Status status;
status.to_protobuf(result->mutable_status());
result->set_packet_seq(packet_seq);
result->set_eos(true);
delete this;
}

void GetArrowResultBatchCtx::on_data(
const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter) {
Status st = Status::OK();
if (result != nullptr) {
size_t uncompressed_bytes = 0, compressed_bytes = 0;
SCOPED_TIMER(serialize_batch_ns_timer);
st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes,
&compressed_bytes, fragement_transmission_compression_type, false);
COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes);
COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes);
if (st.ok()) {
result->set_packet_seq(packet_seq);
result->set_eos(false);
if (packet_seq == 0) {
result->set_timezone(timezone);
}
} else {
result->clear_block();
result->set_packet_seq(packet_seq);
LOG(WARNING) << "TFetchDataResult serialize failed, errmsg=" << st;
}
} else {
result->set_empty_batch(true);
result->set_packet_seq(packet_seq);
result->set_eos(false);
}

/// The size limit of proto buffer message is 2G
if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
result->clear_block();
}
st.to_protobuf(result->mutable_status());
delete this;
}

BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state)
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
_packet_num(0) {
_packet_num(0),
_timezone(state->timezone()),
_timezone_obj(state->timezone_obj()),
_be_exec_version(state->be_exec_version()),
_fragement_transmission_compression_type(
state->fragement_transmission_compression_type()),
_profile("BufferControlBlock " + print_id(_fragment_id)) {
_query_statistics = std::make_unique<QueryStatistics>();
_serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
_uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes", TUnit::BYTES);
_compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", TUnit::BYTES);
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY,
fmt::format("BufferControlBlock#FragmentInstanceId={}", print_id(_fragment_id)));
}

BufferControlBlock::~BufferControlBlock() {
@@ -157,28 +225,29 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result,
return Status::OK();
}

Status BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) {
Status BufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>& result) {
std::unique_lock<std::mutex> l(_lock);

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

int num_rows = result->num_rows();

while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) {
_data_removal.wait_for(l, std::chrono::seconds(1));
if (_waiting_arrow_result_batch_rpc.empty()) {
// TODO: Merge result into block to reduce rpc times
int num_rows = result->rows();
_arrow_flight_result_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_arrow_data_arrival
.notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,)
} else {
auto* ctx = _waiting_arrow_result_batch_rpc.front();
_waiting_arrow_result_batch_rpc.pop_front();
ctx->on_data(result, _packet_num, _be_exec_version,
_fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer,
_uncompressed_bytes_counter, _compressed_bytes_counter);
_packet_num++;
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}

// TODO: merge RocordBatch, ToStructArray -> Make again

_arrow_flight_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_data_arrival.notify_one();
return Status::OK();
}

@@ -211,37 +280,113 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
_waiting_rpc.push_back(ctx);
}

Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
_data_arrival.wait_for(l, std::chrono::seconds(1));
while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) {
_arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}

if (_is_cancelled) {
return Status::Cancelled("Cancelled");
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

if (!_arrow_flight_batch_queue.empty()) {
*result = std::move(_arrow_flight_batch_queue.front());
_arrow_flight_batch_queue.pop_front();
_buffer_rows -= (*result)->num_rows();
_data_removal.notify_one();
if (!_arrow_flight_result_batch_queue.empty()) {
*result = std::move(_arrow_flight_result_batch_queue.front());
_arrow_flight_result_batch_queue.pop_front();
timezone_obj = _timezone_obj;
_buffer_rows -= (*result)->rows();
_packet_num++;
return Status::OK();
}

// normal path end
if (_is_close) {
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
_mem_tracker->peak_consumption(), ss.str());
return Status::OK();
}
return Status::InternalError("Get Arrow Batch Abnormal Ending");
return Status::InternalError(
fmt::format("Get Arrow Batch Abnormal Ending ()", print_id(_fragment_id)));
}

void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
std::unique_lock<std::mutex> l(_lock);
SCOPED_ATTACH_TASK(_mem_tracker);
if (!_status.ok()) {
ctx->on_failure(_status);
return;
}
if (_is_cancelled) {
ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id))));
return;
}

if (!_arrow_flight_result_batch_queue.empty()) {
auto block = _arrow_flight_result_batch_queue.front();
_arrow_flight_result_batch_queue.pop_front();

ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
_timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter,
_compressed_bytes_counter);
_buffer_rows -= block->rows();
_packet_num++;
return;
}

// normal path end
if (_is_close) {
ctx->on_close(_packet_num);
std::stringstream ss;
_profile.pretty_print(&ss);
VLOG_NOTICE << fmt::format(
"BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
"packet_num={}, peak_memory_usage={}, profile={}",
print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
_mem_tracker->peak_consumption(), ss.str());
return;
}
// no ready data, push ctx to waiting list
_waiting_arrow_result_batch_rpc.push_back(ctx);
}

void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::lock_guard<std::mutex> l(_lock);
_arrow_schema = arrow_schema;
}

Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
return Status::Cancelled(fmt::format("Cancelled ()", print_id(_fragment_id)));
}

// normal path end
if (_arrow_schema != nullptr) {
*arrow_schema = _arrow_schema;
return Status::OK();
}

if (_is_close) {
return Status::RuntimeError(fmt::format("Closed ()", print_id(_fragment_id)));
}
return Status::InternalError(
fmt::format("Get Arrow Schema Abnormal Ending ()", print_id(_fragment_id)));
}

Status BufferControlBlock::close(Status exec_status) {
@@ -251,6 +396,7 @@ Status BufferControlBlock::close(Status exec_status) {

// notify blocked get thread
_data_arrival.notify_all();
_arrow_data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
@@ -263,18 +409,38 @@ Status BufferControlBlock::close(Status exec_status) {
}
_waiting_rpc.clear();
}

if (!_waiting_arrow_result_batch_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_close(_packet_num);
}
} else {
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(_status);
}
}
_waiting_arrow_result_batch_rpc.clear();
}
return Status::OK();
}

void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_is_cancelled = true;
_data_removal.notify_all();
_data_arrival.notify_all();
_arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(reason);
}
_waiting_rpc.clear();
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
_waiting_arrow_result_batch_rpc.clear();
_arrow_flight_result_batch_queue.clear();
}

Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result,
@@ -284,7 +450,7 @@ Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& resul
return Status::OK();
}

Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) {
Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>& result) {
RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result));
_update_dependency();
return Status::OK();
@@ -295,12 +461,18 @@ void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
_update_dependency();
}

Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result));
Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj) {
RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result, timezone_obj));
_update_dependency();
return Status::OK();
}

void PipBufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
BufferControlBlock::get_arrow_batch(ctx);
_update_dependency();
}

void PipBufferControlBlock::cancel(const Status& reason) {
BufferControlBlock::cancel(reason);
_update_dependency();
@@ -322,7 +494,7 @@ void PipBufferControlBlock::_update_dependency() {
}

void PipBufferControlBlock::_update_batch_queue_empty() {
_batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty();
_batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_result_batch_queue.empty();
_update_dependency();
}

@@ -17,8 +17,11 @@

#pragma once

#include <arrow/type.h>
#include <cctz/time_zone.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/segment_v2.pb.h>
#include <stdint.h>

#include <atomic>
@@ -29,7 +32,9 @@
#include <mutex>

#include "common/status.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"

namespace google {
namespace protobuf {
@@ -51,7 +56,13 @@ namespace pipeline {
class Dependency;
} // namespace pipeline

namespace vectorized {
class Block;
} // namespace vectorized

class PFetchDataResult;
class PFetchArrowDataResult;
class RuntimeState;

struct GetResultBatchCtx {
brpc::Controller* cntl = nullptr;
@@ -68,20 +79,46 @@ struct GetResultBatchCtx {
bool eos = false);
};

struct GetArrowResultBatchCtx {
brpc::Controller* cntl = nullptr;
PFetchArrowDataResult* result = nullptr;
google::protobuf::Closure* done = nullptr;

GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* result_,
google::protobuf::Closure* done_)
: cntl(cntl_), result(result_), done(done_) {}

void on_failure(const Status& status);
void on_close(int64_t packet_seq);
void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq,
int be_exec_version,
segment_v2::CompressionTypePB fragement_transmission_compression_type,
std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer,
RuntimeProfile::Counter* uncompressed_bytes_counter,
RuntimeProfile::Counter* compressed_bytes_counter);
};

// buffer used for result customer and producer
class BufferControlBlock {
public:
BufferControlBlock(const TUniqueId& id, int buffer_size);
BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state);
virtual ~BufferControlBlock();

Status init();
// Only one fragment is written, so can_sink returns true, then the sink must be executed
virtual bool can_sink();
virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = false);
virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
virtual Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result);

virtual void get_batch(GetResultBatchCtx* ctx);
virtual Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
// for ArrowFlightBatchLocalReader
virtual Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj);
// for ArrowFlightBatchRemoteReader
virtual void get_arrow_batch(GetArrowResultBatchCtx* ctx);

virtual void register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema);
virtual Status find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema);

// close buffer block, set _status to exec_status and set _is_close to true;
// called because data has been read or error happened.
@@ -90,6 +127,7 @@ public:
virtual void cancel(const Status& reason);

[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
[[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return _mem_tracker; }

void update_return_rows(int64_t num_rows) {
// _query_statistics may be null when the result sink init failed
@@ -102,12 +140,12 @@ public:

protected:
virtual bool _get_batch_queue_empty() {
return _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty();
return _fe_result_batch_queue.empty() && _arrow_flight_result_batch_queue.empty();
}
virtual void _update_batch_queue_empty() {}

using FeResultQueue = std::list<std::unique_ptr<TFetchDataResult>>;
using ArrowFlightResultQueue = std::list<std::shared_ptr<arrow::RecordBatch>>;
using ArrowFlightResultQueue = std::list<std::shared_ptr<vectorized::Block>>;

// result's query id
TUniqueId _fragment_id;
@@ -120,7 +158,9 @@ protected:

// blocking queue for batch
FeResultQueue _fe_result_batch_queue;
ArrowFlightResultQueue _arrow_flight_batch_queue;
ArrowFlightResultQueue _arrow_flight_result_batch_queue;
// for arrow flight
std::shared_ptr<arrow::Schema> _arrow_schema;

// protects all subsequent data in this block
std::mutex _lock;
@@ -128,17 +168,33 @@ protected:
std::condition_variable _data_arrival;
// signal removal of data by stream consumer
std::condition_variable _data_removal;
// get arrow flight result is a sync method, need wait for data ready and return result.
// TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data.
std::condition_variable _arrow_data_arrival;

std::deque<GetResultBatchCtx*> _waiting_rpc;
std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;

// only used for FE using return rows to check limit
std::unique_ptr<QueryStatistics> _query_statistics;

std::string _timezone;
cctz::time_zone _timezone_obj;
int _be_exec_version;
segment_v2::CompressionTypePB _fragement_transmission_compression_type;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;

// only used for ArrowFlightBatchRemoteReader
RuntimeProfile _profile;
RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr;
RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr;
RuntimeProfile::Counter* _compressed_bytes_counter = nullptr;
};
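
The class now exposes two Arrow Flight read paths: a synchronous one used by the local reader and an asynchronous, ctx-driven one used when another BE forwards results over brpc. A minimal consumer-side sketch (illustration only, not part of the patch; `cb` and `ctx` are assumed to come from ResultBufferMgr and the brpc fetch_arrow_data handler respectively):

    // Local path: ArrowFlightBatchLocalReader pulls a Block plus the session timezone directly.
    std::shared_ptr<vectorized::Block> block;
    cctz::time_zone timezone_obj;
    RETURN_IF_ERROR(cb->get_arrow_batch(&block, timezone_obj));

    // Remote path: the brpc handler parks a GetArrowResultBatchCtx, which is answered
    // immediately if a Block is queued, or later by add_arrow_batch()/close()/cancel().
    cb->get_arrow_batch(ctx);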

class PipBufferControlBlock : public BufferControlBlock {
public:
PipBufferControlBlock(const TUniqueId& id, int buffer_size)
: BufferControlBlock(id, buffer_size) {}
PipBufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state)
: BufferControlBlock(id, buffer_size, state) {}

bool can_sink() override {
return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled;
@@ -146,11 +202,14 @@ public:

Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = true) override;

Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) override;
Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result) override;

void get_batch(GetResultBatchCtx* ctx) override;

Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override;
Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj) override;

void get_arrow_batch(GetArrowResultBatchCtx* ctx) override;

void cancel(const Status& reason) override;

@@ -34,6 +34,7 @@
#include "arrow/type_fwd.h"
#include "common/status.h"
#include "runtime/buffer_control_block.h"
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
@@ -69,7 +70,7 @@ Status ResultBufferMgr::init() {

Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
std::shared_ptr<BufferControlBlock>* sender,
bool enable_pipeline, int exec_timout) {
bool enable_pipeline, RuntimeState* state) {
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance " << query_id;
@@ -79,9 +80,9 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
std::shared_ptr<BufferControlBlock> control_block = nullptr;

if (enable_pipeline) {
control_block = std::make_shared<PipBufferControlBlock>(query_id, buffer_size);
control_block = std::make_shared<PipBufferControlBlock>(query_id, buffer_size, state);
} else {
control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size);
control_block = std::make_shared<BufferControlBlock>(query_id, buffer_size, state);
}

{
@@ -92,7 +93,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
// otherwise in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
// add extra 5s for avoid corner case
int64_t max_timeout = time(nullptr) + exec_timout + 5;
int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5;
cancel_at_time(max_timeout, query_id);
}
*sender = control_block;
@@ -110,27 +111,19 @@ std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TU
return std::shared_ptr<BufferControlBlock>();
}

void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
const std::shared_ptr<arrow::Schema>& arrow_schema) {
std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
_arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
}

std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) {
std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock);
auto iter = _arrow_schema_map.find(query_id);

if (_arrow_schema_map.end() != iter) {
return iter->second;
Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
std::shared_ptr<arrow::Schema>* schema) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
return Status::InternalError(
"no arrow schema for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}

return nullptr;
return cb->find_arrow_schema(schema);
}

void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx) {
TUniqueId tid;
tid.__set_hi(finst_id.hi());
tid.__set_lo(finst_id.lo());
TUniqueId tid = UniqueId(finst_id).to_thrift();
std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
if (cb == nullptr) {
LOG(WARNING) << "no result for this query, id=" << print_id(tid);
@@ -140,17 +133,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
cb->get_batch(ctx);
}

Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result) {
Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id,
std::shared_ptr<MemTrackerLimiter>* mem_tracker) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
return Status::InternalError("no result for this query");
return Status::InternalError(
"no result for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}
RETURN_IF_ERROR(cb->get_arrow_batch(result));
*mem_tracker = cb->mem_tracker();
return Status::OK();
}

Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
return Status::InternalError(
"no result for this query, maybe query has been canceled, finst_id={}",
print_id(finst_id));
}
RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj));
return Status::OK();
}

void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx) {
TUniqueId tid = UniqueId(finst_id).to_thrift();
std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
if (cb == nullptr) {
ctx->on_failure(Status::InternalError(
"no result for this query, maybe query has been canceled, finst_id={}",
print_id(tid)));
return;
}
cb->get_arrow_batch(ctx);
}

void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
@@ -161,15 +180,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
_buffer_map.erase(iter);
}
}

{
std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
auto arrow_schema_iter = _arrow_schema_map.find(query_id);

if (_arrow_schema_map.end() != arrow_schema_iter) {
_arrow_schema_map.erase(arrow_schema_iter);
}
}
}

void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {

@@ -17,7 +17,9 @@

#pragma once

#include <cctz/time_zone.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/segment_v2.pb.h>

#include <ctime>
#include <map>
@@ -41,8 +43,14 @@ namespace doris {

class BufferControlBlock;
struct GetResultBatchCtx;
struct GetArrowResultBatchCtx;
class PUniqueId;
class RuntimeState;
class MemTrackerLimiter;
class Thread;
namespace vectorized {
class Block;
} // namespace vectorized

// manage all result buffer control block in one backend
class ResultBufferMgr {
@@ -59,16 +67,18 @@ public:
// sender is not used when call cancel or unregister
Status create_sender(const TUniqueId& query_id, int buffer_size,
std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline,
int exec_timeout);
RuntimeState* state);

// fetch data result to FE
void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
// fetch data result to Arrow Flight Server
Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr<arrow::RecordBatch>* result);

void register_arrow_schema(const TUniqueId& query_id,
const std::shared_ptr<arrow::Schema>& arrow_schema);
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);
// fetch data result to Arrow Flight Client
Status fetch_arrow_data(const TUniqueId& finst_id, std::shared_ptr<vectorized::Block>* result,
cctz::time_zone& timezone_obj);
// fetch data result to Other BE forwards to Client
void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx);
Status find_mem_tracker(const TUniqueId& finst_id,
std::shared_ptr<MemTrackerLimiter>* mem_tracker);
Status find_arrow_schema(const TUniqueId& query_id, std::shared_ptr<arrow::Schema>* schema);

// cancel
void cancel(const TUniqueId& query_id, const Status& reason);
@@ -79,7 +89,6 @@ public:
private:
using BufferMap = std::unordered_map<TUniqueId, std::shared_ptr<BufferControlBlock>>;
using TimeoutMap = std::map<time_t, std::vector<TUniqueId>>;
using ArrowSchemaMap = std::unordered_map<TUniqueId, std::shared_ptr<arrow::Schema>>;

std::shared_ptr<BufferControlBlock> find_control_block(const TUniqueId& query_id);

@@ -91,10 +100,6 @@ private:
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
// lock for arrow schema map
std::shared_mutex _arrow_schema_map_lock;
// for arrow flight
ArrowSchemaMap _arrow_schema_map;

// lock for timeout map
std::mutex _timeout_lock;

@@ -17,53 +17,294 @@

#include "service/arrow_flight/arrow_flight_batch_reader.h"

#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <gen_cpp/internal_service.pb.h>

#include "arrow/builder.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
#include "util/brpc_client_cache.h"
#include "util/ref_count_closure.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"

namespace doris {
namespace flight {
namespace doris::flight {

std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
return schema_;
ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
const std::shared_ptr<QueryStatement>& statement)
: _statement(statement) {}

std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
return _schema;
}

ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement,
std::shared_ptr<arrow::Schema> schema)
: statement_(std::move(statement)), schema_(std::move(schema)) {}
arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) {
std::string status_msg =
fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg,
_packet_seq, _statement->result_addr.hostname, _statement->result_addr.port,
print_id(_statement->query_id));
LOG(WARNING) << status_msg;
return arrow::Status::Invalid(status_msg);
}

arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::Create(
const std::shared_ptr<QueryStatement>& statement_) {
ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
VLOG_NOTICE << fmt::format(
"ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, "
"convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}",
_packet_seq, _statement->result_addr.hostname, _statement->result_addr.port,
print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer,
_mem_tracker->peak_consumption());
}

ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader(
const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
: ArrowFlightBatchReaderBase(statement) {
_schema = schema;
_mem_tracker = mem_tracker;
}

arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> ArrowFlightBatchLocalReader::Create(
const std::shared_ptr<QueryStatement>& statement) {
DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost());
// Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket
// to the ADBC client, so that the schema and control block can be found.
auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
if (schema == nullptr) {
ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
"Client not found arrow flight schema, maybe query has been canceled, queryid: {}",
print_id(statement_->query_id))));
}
std::shared_ptr<ArrowFlightBatchReader> result(new ArrowFlightBatchReader(statement_, schema));
std::shared_ptr<arrow::Schema> schema;
RETURN_ARROW_STATUS_IF_ERROR(
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema));
std::shared_ptr<MemTrackerLimiter> mem_tracker;
RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker(
statement->query_id, &mem_tracker));

std::shared_ptr<ArrowFlightBatchLocalReader> result(
new ArrowFlightBatchLocalReader(statement, schema, mem_tracker));
return result;
}

arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
// *out not nullptr
arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
// parameter *out not nullptr
*out = nullptr;
auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out);
if (UNLIKELY(!st.ok())) {
LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string();
SCOPED_ATTACH_TASK(_mem_tracker);
std::shared_ptr<vectorized::Block> result;
auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result,
_timezone_obj);
st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed");
ARROW_RETURN_NOT_OK(to_arrow_status(st));
if (result == nullptr) {
// eof, normal path end
return arrow::Status::OK();
}

{
// convert one batch
SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out,
_timezone_obj);
st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed");
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}

_packet_seq++;
if (*out != nullptr) {
VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", "
<< (*out)->num_columns();
VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", "
<< (*out)->num_columns() << ", packet_seq: " << _packet_seq;
}
return arrow::Status::OK();
}

} // namespace flight
} // namespace doris
ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<PBackendService_Stub>& stub)
: ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY,
fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id)));
}

arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> ArrowFlightBatchRemoteReader::Create(
const std::shared_ptr<QueryStatement>& statement) {
std::shared_ptr<PBackendService_Stub> stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
statement->result_addr);
if (!stub) {
std::string msg = fmt::format(
"ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}",
statement->result_addr.hostname, statement->result_addr.port,
print_id(statement->query_id));
LOG(WARNING) << msg;
return arrow::Status::Invalid(msg);
}

std::shared_ptr<ArrowFlightBatchRemoteReader> result(
new ArrowFlightBatchRemoteReader(statement, stub));
ARROW_RETURN_NOT_OK(result->init_schema());
return result;
}

arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() {
Status st;
auto request = std::make_shared<PFetchArrowFlightSchemaRequest>();
auto* pfinst_id = request->mutable_finst_id();
pfinst_id->set_hi(_statement->query_id.hi);
pfinst_id->set_lo(_statement->query_id.lo);
auto callback = DummyBrpcCallback<PFetchArrowFlightSchemaResult>::create_shared();
auto closure = AutoReleaseClosure<
PFetchArrowFlightSchemaRequest,
DummyBrpcCallback<PFetchArrowFlightSchemaResult>>::create_unique(request, callback);
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
callback->cntl_->ignore_eovercrowded();

_brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
callback->join();

if (callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
_brpc_stub, _statement->result_addr.hostname, _statement->result_addr.port)) {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
callback->cntl_->remote_side());
}
auto error_code = callback->cntl_->ErrorCode();
auto error_text = callback->cntl_->ErrorText();
return _return_invalid_status(fmt::format("fetch schema error: {}, error_text: {}",
berror(error_code), error_text));
}
st = Status::create(callback->response_->status());
ARROW_RETURN_NOT_OK(to_arrow_status(st));

if (callback->response_->has_schema() && !callback->response_->schema().empty()) {
auto input =
arrow::io::BufferReader::FromString(std::string(callback->response_->schema()));
ARROW_ASSIGN_OR_RAISE(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(
input.get(), arrow::ipc::IpcReadOptions::Defaults()));
_schema = reader->schema();
} else {
return _return_invalid_status(fmt::format("fetch schema error: not find schema"));
}
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
DCHECK(_block == nullptr);
while (true) {
// if `continue` occurs, data is invalid, continue fetch, block is nullptr.
// if `break` occurs, fetch data successfully (block is not nullptr) or fetch eos.
Status st;
auto request = std::make_shared<PFetchArrowDataRequest>();
auto* pfinst_id = request->mutable_finst_id();
pfinst_id->set_hi(_statement->query_id.hi);
pfinst_id->set_lo(_statement->query_id.lo);
auto callback = DummyBrpcCallback<PFetchArrowDataResult>::create_shared();
auto closure = AutoReleaseClosure<
PFetchArrowDataRequest,
DummyBrpcCallback<PFetchArrowDataResult>>::create_unique(request, callback);
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
callback->cntl_->ignore_eovercrowded();

_brpc_stub->fetch_arrow_data(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(), closure.get());
closure.release();
callback->join();

if (callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
_brpc_stub, _statement->result_addr.hostname,
_statement->result_addr.port)) {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
callback->cntl_->remote_side());
}
auto error_code = callback->cntl_->ErrorCode();
auto error_text = callback->cntl_->ErrorText();
return _return_invalid_status(fmt::format("fetch data error={}, error_text: {}",
berror(error_code), error_text));
}
st = Status::create(callback->response_->status());
ARROW_RETURN_NOT_OK(to_arrow_status(st));

DCHECK(callback->response_->has_packet_seq());
if (_packet_seq != callback->response_->packet_seq()) {
return _return_invalid_status(
fmt::format("fetch data receive packet failed, expect: {}, receive: {}",
_packet_seq, callback->response_->packet_seq()));
}
_packet_seq++;

if (callback->response_->has_eos() && callback->response_->eos()) {
break;
}

if (callback->response_->has_empty_batch() && callback->response_->empty_batch()) {
continue;
}

DCHECK(callback->response_->has_block());
if (callback->response_->block().ByteSizeLong() == 0) {
continue;
}

std::call_once(_timezone_once_flag, [this, callback] {
DCHECK(callback->response_->has_timezone());
TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(), _timezone_obj);
});

{
SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
_block = vectorized::Block::create_shared();
st = _block->deserialize(callback->response_->block());
ARROW_RETURN_NOT_OK(to_arrow_status(st));
break;
}

const auto rows = _block->rows();
if (rows == 0) {
_block = nullptr;
continue;
}
}
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
ARROW_RETURN_NOT_OK(_fetch_schema());
DCHECK(_schema != nullptr);
return arrow::Status::OK();
}

arrow::Status ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
// parameter *out not nullptr
*out = nullptr;
SCOPED_ATTACH_TASK(_mem_tracker);
ARROW_RETURN_NOT_OK(_fetch_data());
if (_block == nullptr) {
// eof, normal path end, last _fetch_data return block is nullptr
return arrow::Status::OK();
}
{
// convert one batch
SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
auto st = convert_to_arrow_batch(*_block, _schema, arrow::default_memory_pool(), out,
_timezone_obj);
st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch failed");
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
_block = nullptr;

if (*out != nullptr) {
VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " << (*out)->num_rows() << ", "
<< (*out)->num_columns() << ", packet_seq: " << _packet_seq;
}
return arrow::Status::OK();
}

} // namespace doris::flight

@@ -17,40 +17,91 @@

#pragma once

#include <cctz/time_zone.h>
#include <gen_cpp/Types_types.h>

#include <memory>
#include <utility>

#include "arrow/record_batch.h"
#include "runtime/exec_env.h"

namespace doris {

namespace vectorized {
class Block;
} // namespace vectorized

namespace flight {

struct QueryStatement {
public:
TUniqueId query_id;
TNetworkAddress result_addr; // BE brpc ip & port
std::string sql;

QueryStatement(const TUniqueId& query_id_, const std::string& sql_)
: query_id(query_id_), sql(sql_) {}
QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_, std::string sql_)
: query_id(std::move(query_id_)),
result_addr(std::move(result_addr_)),
sql(std::move(sql_)) {}
};

class ArrowFlightBatchReader : public arrow::RecordBatchReader {
class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader {
public:
static arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> Create(
const std::shared_ptr<QueryStatement>& statement);

// RecordBatchReader force override
[[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override;

protected:
ArrowFlightBatchReaderBase(const std::shared_ptr<QueryStatement>& statement);
~ArrowFlightBatchReaderBase() override;
arrow::Status _return_invalid_status(const std::string& msg);

std::shared_ptr<QueryStatement> _statement;
std::shared_ptr<arrow::Schema> _schema;
cctz::time_zone _timezone_obj;
std::atomic<int64_t> _packet_seq = 0;

std::atomic<int64_t> _convert_arrow_batch_timer = 0;
std::atomic<int64_t> _deserialize_block_timer = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase {
public:
static arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> Create(
const std::shared_ptr<QueryStatement>& statement);

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;

private:
std::shared_ptr<QueryStatement> statement_;
std::shared_ptr<arrow::Schema> schema_;
ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
};

ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement,
std::shared_ptr<arrow::Schema> schema);
class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
public:
static arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> Create(
const std::shared_ptr<QueryStatement>& statement);

// create arrow RecordBatchReader must initialize the schema.
// so when creating arrow RecordBatchReader, fetch result data once,
// which will return Block and some necessary information, and extract arrow schema from Block.
arrow::Status init_schema();
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;

private:
ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>& statement,
const std::shared_ptr<PBackendService_Stub>& stub);

arrow::Status _fetch_schema();
arrow::Status _fetch_data();

std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
std::once_flag _timezone_once_flag;
std::shared_ptr<vectorized::Block> _block;
};

} // namespace flight

} // namespace doris

@@ -19,15 +19,17 @@

#include <arrow/status.h>

#include <memory>

#include "arrow/flight/sql/server.h"
#include "gutil/strings/split.h"
#include "service/arrow_flight/arrow_flight_batch_reader.h"
#include "service/arrow_flight/flight_sql_info.h"
#include "service/backend_options.h"
#include "util/arrow/utils.h"
#include "util/uid_util.h"

namespace doris {
namespace flight {
namespace doris::flight {

class FlightSqlServer::Impl {
private:
@@ -41,14 +43,21 @@ private:
return arrow::flight::Ticket {std::move(ticket)};
}

arrow::Result<std::pair<std::string, std::string>> decode_ticket(const std::string& ticket) {
auto divider = ticket.find(':');
if (divider == std::string::npos) {
return arrow::Status::Invalid("Malformed ticket");
arrow::Result<std::shared_ptr<QueryStatement>> decode_ticket(const std::string& ticket) {
std::vector<string> fields = strings::Split(ticket, "&");
if (fields.size() != 4) {
return arrow::Status::Invalid(fmt::format("Malformed ticket, size: {}", fields.size()));
}
std::string query_id = ticket.substr(0, divider);
std::string sql = ticket.substr(divider + 1);
return std::make_pair(std::move(sql), std::move(query_id));

TUniqueId queryid;
parse_id(fields[0], &queryid);
TNetworkAddress result_addr;
result_addr.hostname = fields[1];
result_addr.port = std::stoi(fields[2]);
std::string sql = fields[3];
std::shared_ptr<QueryStatement> statement =
std::make_shared<QueryStatement>(queryid, result_addr, sql);
return statement;
}

public:
@@ -59,18 +68,21 @@ public:
arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> DoGetStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQueryTicket& command) {
ARROW_ASSIGN_OR_RAISE(auto pair, decode_ticket(command.statement_handle));
const std::string& sql = pair.first;
const std::string query_id = pair.second;
TUniqueId queryid;
parse_id(query_id, &queryid);

auto statement = std::make_shared<QueryStatement>(queryid, sql);

std::shared_ptr<ArrowFlightBatchReader> reader;
ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchReader::Create(statement));

return std::make_unique<arrow::flight::RecordBatchStream>(reader);
ARROW_ASSIGN_OR_RAISE(auto statement, decode_ticket(command.statement_handle));
// if IP:BrpcPort in the Ticket is not current BE node,
// pulls the query result Block from the BE node specified by IP:BrpcPort,
// converts it to Arrow Batch and returns it to ADBC client.
// use brpc to transmit blocks between BEs.
if (statement->result_addr.hostname == BackendOptions::get_localhost() &&
statement->result_addr.port == config::brpc_port) {
std::shared_ptr<ArrowFlightBatchLocalReader> reader;
ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchLocalReader::Create(statement));
return std::make_unique<arrow::flight::RecordBatchStream>(reader);
} else {
std::shared_ptr<ArrowFlightBatchRemoteReader> reader;
ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchRemoteReader::Create(statement));
return std::make_unique<arrow::flight::RecordBatchStream>(reader);
}
}
};

@ -135,5 +147,4 @@ Status FlightSqlServer::join() {
return Status::OK();
}

} // namespace flight
} // namespace doris
} // namespace doris::flight

@ -155,6 +155,11 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads, MetricUnit::NOUNIT);

bthread_key_t btls_key;

static void thread_context_deleter(void* d) {
@ -219,7 +224,14 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
config::brpc_light_work_pool_max_queue_size != -1
? config::brpc_light_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() * 320),
"brpc_light") {
"brpc_light"),
_arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads != -1
? config::brpc_arrow_flight_work_pool_threads
: std::max(512, CpuInfo::num_cores() * 16),
config::brpc_arrow_flight_work_pool_max_queue_size != -1
? config::brpc_arrow_flight_work_pool_max_queue_size
: std::max(20480, CpuInfo::num_cores() * 640),
"brpc_arrow_flight") {
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
@ -238,6 +250,15 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
REGISTER_HOOK_METRIC(light_work_max_threads,
[]() { return config::brpc_light_work_pool_threads; });

REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
[this]() { return _arrow_flight_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
[this]() { return _arrow_flight_work_pool.get_active_threads(); });
REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
[]() { return config::brpc_arrow_flight_work_pool_max_queue_size; });
REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
[]() { return config::brpc_arrow_flight_work_pool_threads; });
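// Illustrative arithmetic for the _arrow_flight_work_pool defaults constructed above (not part
// of the patch): a 16-core machine gets max(512, 16 * 16) = 512 threads and a queue capacity of
// max(20480, 16 * 640) = 20480; a 64-core machine gets 1024 threads and a 40960-slot queue.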

_exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
_exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);

@ -256,6 +277,11 @@ PInternalServiceImpl::~PInternalServiceImpl() {
DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
DEREGISTER_HOOK_METRIC(light_work_max_threads);

DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);

CHECK_EQ(0, bthread_key_delete(btls_key));
CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
}
@ -672,6 +698,22 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle
}
}

void PInternalServiceImpl::fetch_arrow_data(google::protobuf::RpcController* controller,
const PFetchArrowDataRequest* request,
PFetchArrowDataResult* result,
google::protobuf::Closure* done) {
bool ret = _arrow_flight_work_pool.try_offer([this, controller, request, result, done]() {
brpc::ClosureGuard closure_guard(done);
auto* cntl = static_cast<brpc::Controller*>(controller);
auto* ctx = new GetArrowResultBatchCtx(cntl, result, done);
_exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx);
});
if (!ret) {
offer_failed(result, done, _arrow_flight_work_pool);
return;
}
}
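// Descriptive note (an assumption drawn from the surrounding code, not stated in the patch):
// serving fetch_arrow_data() above from the dedicated _arrow_flight_work_pool keeps large Arrow
// result transfers from queueing behind the generic heavy/light brpc pools, and try_offer()
// returning false when that pool is saturated lets offer_failed() reply with an error instead
// of blocking the caller.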

void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController* controller,
const POutfileWriteSuccessRequest* request,
POutfileWriteSuccessResult* result,
@ -877,23 +919,21 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<arrow::Schema> schema =
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift());
if (schema == nullptr) {
LOG(INFO) << "FE not found arrow flight schema, maybe query has been canceled";
auto st = Status::NotFound(
"FE not found arrow flight schema, maybe query has been canceled");
std::shared_ptr<arrow::Schema> schema;
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift(), &schema);
if (!st.ok()) {
st.to_protobuf(result->mutable_status());
return;
}

std::string schema_str;
auto st = serialize_arrow_schema(&schema, &schema_str);
st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
if (config::public_access_ip != "") {
if (!config::public_access_ip.empty() && config::public_access_port != -1) {
result->set_be_arrow_flight_ip(config::public_access_ip);
result->set_be_arrow_flight_port(config::public_access_port);
}
}
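// Illustrative configuration only (placeholder values): with a reverse proxy in front of the BE,
// be.conf might carry
//     public_access_ip = 203.0.113.10
//     public_access_port = 9090
// so that this reply points the ADBC client at the proxy rather than at the BE's own address.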
st.to_protobuf(result->mutable_status());

@ -77,6 +77,10 @@ public:
void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request,
PFetchDataResult* result, google::protobuf::Closure* done) override;

void fetch_arrow_data(google::protobuf::RpcController* controller,
const PFetchArrowDataRequest* request, PFetchArrowDataResult* result,
google::protobuf::Closure* done) override;

void outfile_write_success(google::protobuf::RpcController* controller,
const POutfileWriteSuccessRequest* request,
POutfileWriteSuccessResult* result,
@ -282,6 +286,7 @@ private:
// otherwise as light interface
FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;
FifoThreadPool _arrow_flight_work_pool;
};

} // namespace doris

@ -165,9 +165,9 @@ Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field
return Status::OK();
}

Status convert_block_arrow_schema(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
Status get_arrow_schema_from_block(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& type_and_name : block) {
std::shared_ptr<arrow::DataType> arrow_type;
@ -195,9 +195,9 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
return Status::OK();
}

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> arrow_type;

@ -48,13 +48,13 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
std::shared_ptr<arrow::Schema>* result, const std::string& timezone);

Status convert_block_arrow_schema(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);
Status get_arrow_schema_from_block(const vectorized::Block& block,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);
Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result,
const std::string& timezone);

Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);

@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) {
}

arrow::Status to_arrow_status(const Status& status) {
if (status.ok()) {
if (LIKELY(status.ok())) {
return arrow::Status::OK();
} else {
LOG(WARNING) << status.to_string();
// The length of the exception msg returned to the ADBC Client cannot be larger than 8192,
// otherwise the ADBC Client will receive:
// `INTERNAL: http2 exception Header size exceeded max allowed size (8192)`.
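// Illustrative only; the patch's actual handling is outside this hunk. One way to respect the
// limit would be to clamp the message before wrapping it, e.g.
//     std::string msg = status.to_string().substr(0, 8000); // stay well under the 8192 cap
//     return arrow::Status::Invalid(msg);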

@ -216,6 +216,11 @@ public:
UIntGauge* heavy_work_max_threads = nullptr;
UIntGauge* light_work_max_threads = nullptr;

UIntGauge* arrow_flight_work_pool_queue_size = nullptr;
UIntGauge* arrow_flight_work_active_threads = nullptr;
UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr;
UIntGauge* arrow_flight_work_max_threads = nullptr;

UIntGauge* flush_thread_pool_queue_size = nullptr;
UIntGauge* flush_thread_pool_thread_num = nullptr;

@ -19,21 +19,16 @@

#include "runtime/buffer_control_block.h"
#include "runtime/runtime_state.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "runtime/thread_context.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"

namespace doris {
namespace vectorized {
namespace doris::vectorized {

VArrowFlightResultWriter::VArrowFlightResultWriter(
BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
RuntimeProfile* parent_profile, const std::shared_ptr<arrow::Schema>& arrow_schema)
: _sinker(sinker),
_output_vexpr_ctxs(output_vexpr_ctxs),
_parent_profile(parent_profile),
_arrow_schema(arrow_schema) {}
VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker,
const VExprContextSPtrs& output_vexpr_ctxs,
RuntimeProfile* parent_profile)
: _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs), _parent_profile(parent_profile) {}

Status VArrowFlightResultWriter::init(RuntimeState* state) {
_init_profile();
@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) {
return Status::InternalError("sinker is NULL pointer.");
}
_is_dry_run = state->query_options().dry_run_query;
_timezone_obj = state->timezone_obj();
return Status::OK();
}

void VArrowFlightResultWriter::_init_profile() {
_append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
_result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(Block& input_block) {
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));

// convert one batch
std::shared_ptr<arrow::RecordBatch> result;
auto num_rows = block.rows();
// arrow::RecordBatch without `nbytes()` in C++
uint64_t bytes_sent = block.bytes();
{
SCOPED_TIMER(_convert_tuple_timer);
RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
}
{
SCOPED_TIMER(_result_send_timer);
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
status = _sinker->add_arrow_batch(result);
}
if (status.ok()) {
_written_rows += num_rows;
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker());
std::unique_ptr<vectorized::MutableBlock> mutable_block =
vectorized::MutableBlock::create_unique(block.clone_empty());
RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block)));
std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
output_block->swap(mutable_block->to_block());

auto num_rows = output_block->rows();
// arrow::RecordBatch without `nbytes()` in C++
uint64_t bytes_sent = output_block->bytes();
{
SCOPED_TIMER(_result_send_timer);
// If this is a dry run task, no need to send data block
if (!_is_dry_run) {
_bytes_sent += bytes_sent;
status = _sinker->add_arrow_batch(output_block);
}
if (status.ok()) {
_written_rows += num_rows;
if (!_is_dry_run) {
_bytes_sent += bytes_sent;
}
} else {
LOG(WARNING) << "append result batch to sink failed.";
}
} else {
LOG(WARNING) << "append result batch to sink failed.";
}
}
return status;
@ -104,5 +99,4 @@ Status VArrowFlightResultWriter::close(Status st) {
return Status::OK();
}

} // namespace vectorized
} // namespace doris
} // namespace doris::vectorized

@ -17,13 +17,6 @@

#pragma once

#include <arrow/type.h>
#include <cctz/time_zone.h>
#include <stddef.h>

#include <memory>
#include <vector>

#include "common/status.h"
#include "runtime/result_writer.h"
#include "util/runtime_profile.h"
@ -39,12 +32,11 @@ class Block;
class VArrowFlightResultWriter final : public ResultWriter {
public:
VArrowFlightResultWriter(BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
RuntimeProfile* parent_profile,
const std::shared_ptr<arrow::Schema>& arrow_schema);
RuntimeProfile* parent_profile);

Status init(RuntimeState* state) override;

Status write(Block& block) override;
Status write(Block& input_block) override;

bool can_sink() override;

@ -60,8 +52,6 @@ private:
RuntimeProfile* _parent_profile = nullptr; // parent profile from result sink. not owned
// total time cost on append batch operation
RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
// tuple convert timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
// file write timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
@ -72,10 +62,6 @@ private:
bool _is_dry_run = false;

uint64_t _bytes_sent = 0;

std::shared_ptr<arrow::Schema> _arrow_schema;

cctz::time_zone _timezone_obj;
};
} // namespace vectorized
} // namespace doris

@ -88,7 +88,7 @@ Status MemoryScratchSink::send(RuntimeState* state, Block* input_block, bool eos
*input_block, &block));
std::shared_ptr<arrow::Schema> block_arrow_schema;
// After exprs are executed, use the recalculated schema as the final schema
RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
_queue->blocking_put(result);

@ -79,7 +79,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
state->execution_timeout()));
state));
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,

@ -90,7 +90,7 @@ Status VResultSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
state->execution_timeout()));
state));

// create writer based on sink type
switch (_sink_type) {
@ -106,12 +106,11 @@ Status VResultSink::prepare(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
arrow_schema);
RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema,
state->timezone()));
_sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs,
_profile, arrow_schema));
_profile));
break;
}
default:

@ -34,6 +34,7 @@ protected:
virtual void SetUp() {}

private:
RuntimeState _state;
};

TEST_F(ResultBufferMgrTest, create_normal) {
@ -43,7 +44,7 @@ TEST_F(ResultBufferMgrTest, create_normal) {
query_id.hi = 100;

std::shared_ptr<BufferControlBlock> control_block1;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
}

TEST_F(ResultBufferMgrTest, create_same_buffer) {
@ -53,9 +54,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) {
query_id.hi = 100;

std::shared_ptr<BufferControlBlock> control_block1;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());
std::shared_ptr<BufferControlBlock> control_block2;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2, &_state).ok());

EXPECT_EQ(control_block1.get(), control_block1.get());
}
@ -67,7 +68,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) {
query_id.hi = 100;

std::shared_ptr<BufferControlBlock> control_block1;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());

TFetchDataResult* result = new TFetchDataResult();
result->result_batch.rows.push_back("hello test");
@ -85,7 +86,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) {
query_id.hi = 100;

std::shared_ptr<BufferControlBlock> control_block1;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());

TFetchDataResult* result = new TFetchDataResult();
query_id.lo = 11;
@ -101,7 +102,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) {
query_id.hi = 100;

std::shared_ptr<BufferControlBlock> control_block1;
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, false).ok());
EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1, &_state).ok());

EXPECT_TRUE(buffer_mgr.cancel(query_id).ok());
}

@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
@ -224,19 +225,36 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
}
} else {
// Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}

TUniqueId queryId = connectContext.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
location = Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);

@ -53,11 +53,12 @@ import java.util.concurrent.TimeoutException;

/**
* Process one flight sql connection.
*
* <p>
* Must use try-with-resources.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
private TNetworkAddress publicAccessAddr = new TNetworkAddress();

public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
@ -66,6 +67,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
context.setReturnResultFromLocal(true);
}

public TNetworkAddress getPublicAccessAddr() {
return publicAccessAddr;
}

public void prepare(MysqlCommand command) {
// set status of query to OK.
ctx.getState().reset();
@ -123,11 +128,12 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus.toString()));
throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus));
}
if (pResult.hasBeArrowFlightIp()) {
ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8();
if (pResult.hasBeArrowFlightIp() && pResult.hasBeArrowFlightPort()) {
publicAccessAddr.hostname = pResult.getBeArrowFlightIp().toStringUtf8();
publicAccessAddr.port = pResult.getBeArrowFlightPort();
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);

@ -262,6 +262,20 @@ message PFetchDataResult {
optional bool empty_batch = 6;
};

message PFetchArrowDataRequest {
optional PUniqueId finst_id = 1;
};

message PFetchArrowDataResult {
optional PStatus status = 1;
// valid when status is ok
optional int64 packet_seq = 2;
optional bool eos = 3;
optional PBlock block = 4;
optional bool empty_batch = 5;
optional string timezone = 6;
};

message PFetchArrowFlightSchemaRequest {
optional PUniqueId finst_id = 1;
};
@ -271,6 +285,7 @@ message PFetchArrowFlightSchemaResult {
// valid when status is ok
optional bytes schema = 2;
optional bytes be_arrow_flight_ip = 3;
optional int32 be_arrow_flight_port = 4;
};

message KeyTuple {
@ -922,6 +937,7 @@ service PBackendService {
rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult);
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc fetch_arrow_data(PFetchArrowDataRequest) returns (PFetchArrowDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
rpc open_load_stream(POpenLoadStreamRequest) returns (POpenLoadStreamResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);