Add CPU and IO indicators to audit log (#531)

This commit is contained in:
chenhao
2019-01-17 12:43:15 +08:00
committed by ZHAO Chun
parent 33b133c6ff
commit 0e5b193243
44 changed files with 608 additions and 162 deletions

View File

@ -49,11 +49,13 @@ Status DataSink::create_data_sink(
if (!thrift_sink.__isset.stream_sink) {
return Status("Missing data stream sink.");
}
bool send_query_statistics_with_every_batch = params.__isset.send_query_statistics_with_every_batch ?
params.send_query_statistics_with_every_batch : false;
// TODO: figure out good buffer size based on size of output row
tmp_sink = new DataStreamSender(
pool, params.sender_id, row_desc,
thrift_sink.stream_sink, params.destinations, 16 * 1024);
thrift_sink.stream_sink, params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
sink->reset(tmp_sink);
break;

View File

@ -25,6 +25,7 @@
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/Exprs_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/query_statistics.h"
namespace doris {
@ -78,11 +79,17 @@ public:
// Returns the runtime profile for the sink.
virtual RuntimeProfile* profile() = 0;
// Gives this sink the statistics object it should hold on to. The base
// implementation only stores the pointer in _query_statistics; the method is
// virtual so that a subclass can route the object somewhere else.
virtual void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
    // `statistics` is this call's own by-value copy, so exchanging it with the
    // member leaves the member pointing at the new object and releases the
    // previously held one when the parameter goes out of scope.
    _query_statistics.swap(statistics);
}
protected:
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed;
std::unique_ptr<MemTracker> _expr_mem_tracker;
// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;
};
} // namespace doris

View File

@ -43,7 +43,8 @@ ExchangeNode::ExchangeNode(
_next_row_idx(0),
_is_merging(tnode.exchange_node.__isset.sort_info),
_offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
_num_rows_skipped(0) {
_num_rows_skipped(0),
_merge_rows_counter(nullptr) {
DCHECK_GE(_offset, 0);
DCHECK(_is_merging || (_offset == 0));
}
@ -63,16 +64,16 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status ExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
_convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
_merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT);
// TODO: figure out appropriate buffer size
DCHECK_GT(_num_senders, 0);
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _input_row_desc,
state->fragment_instance_id(), _id,
_num_senders, config::exchg_node_buffer_size_bytes,
state->runtime_profile(), _is_merging);
state->runtime_profile(), _is_merging, _sub_plan_query_statistics_recvr.get());
if (_is_merging) {
_merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT);
RETURN_IF_ERROR(_sort_exec_exprs.prepare(
state, _row_descriptor, _row_descriptor, expr_mem_tracker()));
// AddExprCtxsToFree(_sort_exec_exprs);
@ -95,6 +96,12 @@ Status ExchangeNode::open(RuntimeState* state) {
return Status::OK;
}
// Collects the statistics for this exchange node into `statistics`: first
// whatever the base ExecNode implementation gathers, then the statistics held
// by _sub_plan_query_statistics_recvr. A failure from the base implementation
// is returned unchanged and nothing further is merged.
Status ExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
    const Status base_status = ExecNode::collect_query_statistics(statistics);
    if (!base_status.ok()) {
        return base_status;
    }
    QueryStatisticsRecvr* const sub_plan_recvr = _sub_plan_query_statistics_recvr.get();
    statistics->merge(sub_plan_recvr);
    return Status::OK;
}
Status ExchangeNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK;

View File

@ -21,11 +21,11 @@
#include <boost/scoped_ptr.hpp>
#include "exec/exec_node.h"
#include "exec/sort_exec_exprs.h"
#include "runtime/data_stream_recvr.h"
namespace doris {
class RowBatch;
class DataStreamRecvr;
class RuntimeProfile;
// Receiver node for data streams. The data stream receiver is created in Prepare()
@ -49,6 +49,7 @@ public:
// Blocks until the first batch is available for consumption via GetNext().
virtual Status open(RuntimeState* state);
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);
// the number of senders needs to be set after the c'tor, because it's not
@ -61,6 +62,7 @@ protected:
virtual void debug_string(int indentation_level, std::stringstream* out) const;
private:
// Implements GetNext() for the case where _is_merging is true. Delegates the GetNext()
// call to the underlying DataStreamRecvr.
Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos);
@ -109,6 +111,11 @@ private:
int64_t _num_rows_skipped;
RuntimeProfile::Counter* _merge_rows_counter;
// Sub plan query statistics receiver. It is shared with DataStreamRecvr and is
// called from two different threads, but never at the same time, so there is
// no multithreaded access problem.
std::unique_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
};
};

View File

@ -212,6 +212,14 @@ Status ExecNode::reset(RuntimeState* state) {
return Status::OK;
}
// Default implementation: asks every child node, in order, to add its
// statistics to `statistics`. This node contributes nothing of its own;
// subclasses override this and call it first to cover their children.
//
// `statistics` must not be null (checked with DCHECK).
// Returns the first non-OK status reported by a child, in which case the
// remaining children are not visited; otherwise Status::OK.
Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
    DCHECK(statistics != nullptr);
    for (auto child_node : _children) {
        // Propagate a child's failure instead of dropping it, so that callers
        // wrapping this call in RETURN_IF_ERROR actually see the error.
        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
    }
    return Status::OK;
}
Status ExecNode::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK;

View File

@ -29,6 +29,7 @@
#include "util/runtime_profile.h"
#include "util/blocking_queue.hpp"
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/query_statistics.h"
namespace llvm {
class Function;
@ -114,6 +115,11 @@ public:
// so should be fast.
virtual Status reset(RuntimeState* state);
// This should be called before close() and after get_next(). It is responsible for
// collecting the statistics sent with row batches. It must not be called when
// prepare() has returned an error.
virtual Status collect_query_statistics(QueryStatistics* statistics);
// close() will get called for every exec node, regardless of what else is called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).

View File

@ -92,11 +92,11 @@ Status HashJoinNode::prepare(RuntimeState* state) {
ADD_TIMER(runtime_profile(), "PushDownComputeTime");
_probe_timer =
ADD_TIMER(runtime_profile(), "ProbeTime");
_build_row_counter =
_build_rows_counter =
ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
_build_buckets_counter =
ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
_probe_row_counter =
_probe_rows_counter =
ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
_hash_tbl_load_factor_counter =
ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
@ -243,7 +243,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc());
COUNTER_SET(_build_row_counter, _hash_tbl->size());
COUNTER_SET(_build_rows_counter, _hash_tbl->size());
COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
build_batch.reset();
@ -380,7 +380,7 @@ Status HashJoinNode::open(RuntimeState* state) {
// seed probe batch and _current_probe_row, etc.
while (true) {
RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos));
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
_probe_batch_pos = 0;
if (_probe_batch->num_rows() == 0) {
@ -571,7 +571,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
continue;
} else {
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
break;
}
}
@ -695,7 +695,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state,
probe_timer.stop();
RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos));
probe_timer.start();
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
COUNTER_UPDATE(_probe_rows_counter, _probe_batch->num_rows());
}
}
}

View File

@ -134,8 +134,8 @@ private:
RuntimeProfile::Counter* _push_down_timer; // time to build hash table
RuntimeProfile::Counter* _push_compute_timer;
RuntimeProfile::Counter* _probe_timer; // time to probe
RuntimeProfile::Counter* _build_row_counter; // num build rows
RuntimeProfile::Counter* _probe_row_counter; // num probe rows
RuntimeProfile::Counter* _build_rows_counter; // num build rows
RuntimeProfile::Counter* _probe_rows_counter; // num probe rows
RuntimeProfile::Counter* _build_buckets_counter; // num buckets in hash table
RuntimeProfile::Counter* _hash_tbl_load_factor_counter;

View File

@ -309,6 +309,13 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
return _status;
}
// Adds this scan node's contribution to `statistics`, after the base ExecNode
// implementation has run: the current value of _read_compressed_counter is
// reported as scan bytes and rows_returned() as scan rows. A failure from the
// base implementation is returned unchanged and nothing is added.
Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
    const Status base_status = ExecNode::collect_query_statistics(statistics);
    if (!base_status.ok()) {
        return base_status;
    }
    const auto compressed_bytes_read = _read_compressed_counter->value();
    const auto returned_rows = rows_returned();
    statistics->add_scan_bytes(compressed_bytes_read);
    statistics->add_scan_rows(returned_rows);
    return Status::OK;
}
Status OlapScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK;

View File

@ -56,6 +56,7 @@ public:
virtual Status prepare(RuntimeState* state);
virtual Status open(RuntimeState* state);
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges);

View File

@ -93,6 +93,7 @@ add_library(Runtime STATIC
initial_reservations.cc
snapshot_loader.cpp
kafka_consumer_pipe.cpp
query_statistics.cpp
)
# This test runs forever so should not be part of 'make test'

View File

@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) {
delete this;
}
void GetResultBatchCtx::on_close(int64_t packet_seq) {
void GetResultBatchCtx::on_close(int64_t packet_seq,
QueryStatistics* statistics) {
Status status;
status.to_protobuf(result->mutable_status());
if (statistics != nullptr) {
statistics->to_pb(result->mutable_query_statistics());
}
result->set_packet_seq(packet_seq);
result->set_eos(true);
done->Run();
@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
return;
}
if (_is_close) {
ctx->on_close(_packet_num);
ctx->on_close(_packet_num, _query_statistics.get());
return;
}
// no ready data, push ctx to waiting list
@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) {
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
ctx->on_close(_packet_num);
ctx->on_close(_packet_num, _query_statistics.get());
}
} else {
for (auto& ctx : _waiting_rpc) {

View File

@ -24,6 +24,7 @@
#include <boost/thread/condition_variable.hpp>
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "runtime/query_statistics.h"
namespace google {
namespace protobuf {
@ -52,7 +53,7 @@ struct GetResultBatchCtx {
}
void on_failure(const Status& status);
void on_close(int64_t packet_seq);
void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = false);
};
@ -80,6 +81,9 @@ public:
return _fragment_id;
}
// Installs the statistics object that this control block passes to
// GetResultBatchCtx::on_close() when it is closed.
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
    // The parameter is a private by-value copy; swapping puts the new object in
    // the member and releases the old one when the parameter is destroyed.
    _query_statistics.swap(statistics);
}
private:
typedef std::list<TFetchDataResult*> ResultQueue;
@ -100,8 +104,13 @@ private:
boost::condition_variable _data_arriaval;
// signal removal of data by stream consumer
boost::condition_variable _data_removal;
std::deque<GetResultBatchCtx*> _waiting_rpc;
// It is shared with PlanFragmentExecutor and is called from two different
// threads, but never at the same time, so there is no multithreaded access
// problem.
std::shared_ptr<QueryStatistics> _query_statistics;
};
}

View File

@ -51,14 +51,14 @@ inline uint32_t DataStreamMgr::get_hash_value(
shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(RuntimeState* state,
const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
bool is_merging) {
bool is_merging, QueryStatisticsRecvr* sub_plan_query_statistics_recvr) {
DCHECK(profile != NULL);
VLOG_FILE << "creating receiver for fragment="
<< fragment_instance_id << ", node=" << dest_node_id;
shared_ptr<DataStreamRecvr> recvr(
new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
profile));
profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
lock_guard<mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
@ -93,39 +93,14 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::find_recvr(
return shared_ptr<DataStreamRecvr>();
}
Status DataStreamMgr::add_data(
const PUniqueId& finst_id, int32_t node_id,
const PRowBatch& pb_batch, int32_t sender_id,
int be_number, int64_t packet_seq,
::google::protobuf::Closure** done) {
VLOG_ROW << "add_data(): fragment_instance_id=" << print_id(finst_id)
<< " node=" << node_id;
Status DataStreamMgr::transmit_data(const PTransmitDataParams* request, ::google::protobuf::Closure** done) {
const PUniqueId& finst_id = request->finst_id();
TUniqueId t_finst_id;
t_finst_id.hi = finst_id.hi();
t_finst_id.lo = finst_id.lo();
shared_ptr<DataStreamRecvr> recvr = find_recvr(t_finst_id, node_id);
if (recvr == NULL) {
// The receiver may remove itself from the receiver map via deregister_recvr()
// at any time without considering the remaining number of senders.
// As a consequence, find_recvr() may return an innocuous NULL if a thread
// calling deregister_recvr() beat the thread calling find_recvr()
// in acquiring _lock.
// TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
// errors from receiver-initiated teardowns.
return Status::OK;
}
recvr->add_batch(pb_batch, sender_id, be_number, packet_seq, done);
return Status::OK;
}
shared_ptr<DataStreamRecvr> recvr = find_recvr(t_finst_id, request->node_id());
Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
int sender_id,
int be_number) {
VLOG_FILE << "close_sender(): fragment_instance_id=" << fragment_instance_id
<< ", node=" << dest_node_id;
shared_ptr<DataStreamRecvr> recvr = find_recvr(fragment_instance_id, dest_node_id);
if (recvr == NULL) {
if (recvr == nullptr) {
// The receiver may remove itself from the receiver map via deregister_recvr()
// at any time without considering the remaining number of senders.
// As a consequence, find_recvr() may return an innocuous NULL if a thread
@ -135,7 +110,20 @@ Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id,
// errors from receiver-initiated teardowns.
return Status::OK;
}
recvr->remove_sender(sender_id, be_number);
bool eos = request->eos();
if (request->has_row_batch()) {
recvr->add_batch(request->row_batch(), request->sender_id(),
request->be_number(), request->packet_seq(), eos ? nullptr : done);
}
if (request->has_query_statistics()) {
recvr->add_sub_plan_statistics(request->query_statistics(), request->sender_id());
}
if (eos) {
recvr->remove_sender(request->sender_id(), request->be_number());
}
return Status::OK;
}

View File

@ -30,7 +30,9 @@
#include "common/object_pool.h"
#include "runtime/descriptors.h" // for PlanNodeId
#include "runtime/mem_tracker.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "gen_cpp/palo_internal_service.pb.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
namespace google {
@ -76,27 +78,9 @@ public:
RuntimeState* state, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders, int buffer_size, RuntimeProfile* profile,
bool is_merging);
bool is_merging, QueryStatisticsRecvr* sub_plan_query_statistics_recvr);
// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
// if the recvr has not been cancelled. sender_id identifies the sender instance
// from which the data came.
// The call blocks if this ends up pushing the stream over its buffering limit;
// it unblocks when the consumer removed enough data to make space for
// row_batch.
// TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders),
// so that a single sender can't flood the buffer and stall everybody else.
// Returns OK if successful, error status otherwise.
Status add_data(const PUniqueId& fragment_instance_id, int32_t node_id,
const PRowBatch& pb_batch, int32_t sender_id,
int32_t be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
// Notifies the recvr associated with the fragment/node id that the specified
// sender has closed.
// Returns OK if successful, error status otherwise.
Status close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int sender_id, int be_number);
Status transmit_data(const PTransmitDataParams* request, ::google::protobuf::Closure** done);
// Closes all receivers registered for fragment_instance_id immediately.
void cancel(const TUniqueId& fragment_instance_id);

View File

@ -242,6 +242,7 @@ void DataStreamRecvr::SenderQueue::add_batch(
// it in this thread.
batch = new RowBatch(_recvr->row_desc(), pb_batch, _recvr->mem_tracker());
}
VLOG_ROW << "added #rows=" << batch->num_rows()
<< " batch_size=" << batch_size << "\n";
_batch_queue.emplace_back(batch_size, batch);
@ -350,7 +351,7 @@ DataStreamRecvr::DataStreamRecvr(
DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
RuntimeProfile* profile) :
RuntimeProfile* profile, QueryStatisticsRecvr* sub_plan_query_statistics_recvr) :
_mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
@ -358,7 +359,8 @@ DataStreamRecvr::DataStreamRecvr(
_row_desc(row_desc),
_is_merging(is_merging),
_num_buffered_bytes(0),
_profile(profile) {
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
_mem_tracker.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
// Create one queue per sender if is_merging is true.

View File

@ -25,6 +25,7 @@
#include "common/status.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/descriptors.h"
#include "runtime/query_statistics.h"
#include "util/tuple_row_compare.h"
namespace google {
@ -99,6 +100,10 @@ public:
const RowDescriptor& row_desc() const { return _row_desc; }
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) {
_sub_plan_query_statistics_recvr->insert(statistics, sender_id);
}
private:
friend class DataStreamMgr;
class SenderQueue;
@ -106,7 +111,7 @@ private:
DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
RuntimeProfile* profile);
RuntimeProfile* profile, QueryStatisticsRecvr* sub_plan_query_statistics_recvr);
// If receive queue is full, done is enqueue pending, and return with *done is nullptr
void add_batch(const PRowBatch& batch, int sender_id,
@ -194,6 +199,9 @@ private:
// Wall time senders spend waiting for the recv buffer to have capacity.
RuntimeProfile::Counter* _buffer_full_wall_timer;
// Sub plan query statistics receiver.
QueryStatisticsRecvr* _sub_plan_query_statistics_recvr;
// Total time spent waiting for data to arrive in the recv buffer
// RuntimeProfile::Counter* _data_arrival_timer;
};

View File

@ -71,7 +71,10 @@ public:
Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
const TNetworkAddress& brpc_dest,
const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int buffer_size) :
PlanNodeId dest_node_id,
int buffer_size,
bool is_transfer_chain,
bool send_query_statistics_with_every_batch) :
_parent(parent),
_buffer_size(buffer_size),
_row_desc(row_desc),
@ -80,7 +83,9 @@ public:
_num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_brpc_dest_addr(brpc_dest) {
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
}
virtual ~Channel() {
@ -163,6 +168,9 @@ private:
palo::PInternalService_Stub* _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
int32_t _brpc_timeout_ms = 500;
// whether the dest can be treated as query statistics transfer chain.
bool _is_transfer_chain;
bool _send_query_statistics_with_every_batch;
};
Status DataStreamSender::Channel::init(RuntimeState* state) {
@ -203,6 +211,10 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
}
VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id;
if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) {
auto statistic = _brpc_request.mutable_query_statistics();
_parent->_query_statistics->to_pb(statistic);
}
_brpc_request.set_eos(eos);
if (batch != nullptr) {
@ -285,7 +297,8 @@ DataStreamSender::DataStreamSender(
ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size) :
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch) :
_sender_id(sender_id),
_pool(pool),
_row_desc(row_desc),
@ -305,11 +318,14 @@ DataStreamSender::DataStreamSender(
|| sink.output_partition.type == TPartitionType::RANGE_PARTITIONED);
// TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
for (int i = 0; i < destinations.size(); ++i) {
// Select first dest as transfer chain.
bool is_transfer_chain = (i == 0);
_channel_shared_ptrs.emplace_back(
new Channel(this, row_desc,
destinations[i].brpc_server,
destinations[i].fragment_instance_id,
sink.dest_node_id, per_channel_buffer_size));
sink.dest_node_id, per_channel_buffer_size,
is_transfer_chain, send_query_statistics_with_every_batch));
_channels.push_back(_channel_shared_ptrs[i].get());
}
}

View File

@ -59,7 +59,7 @@ public:
DataStreamSender(ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size);
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
virtual ~DataStreamSender();
virtual Status init(const TDataSink& thrift_sink);

View File

@ -54,7 +54,8 @@ PlanFragmentExecutor::PlanFragmentExecutor(
_prepared(false),
_closed(false),
_has_thread_token(false),
_is_report_success(true) {
_is_report_success(true),
_collect_query_statistics_with_every_batch(false) {
}
PlanFragmentExecutor::~PlanFragmentExecutor() {
@ -196,6 +197,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
if (sink_profile != NULL) {
profile()->add_child(sink_profile, true, NULL);
}
_collect_query_statistics_with_every_batch = params.__isset.send_query_statistics_with_every_batch ?
params.send_query_statistics_with_every_batch : false;
} else {
_sink.reset(NULL);
}
@ -226,6 +230,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
// _row_batch->tuple_data_pool()->set_limits(*_runtime_state->mem_trackers());
VLOG(3) << "plan_root=\n" << _plan->debug_string();
_prepared = true;
_query_statistics.reset(new QueryStatistics());
_sink->set_query_statistics(_query_statistics);
return Status::OK;
}
@ -315,8 +322,12 @@ Status PlanFragmentExecutor::open_internal() {
VLOG_ROW << row->to_string(row_desc());
}
}
SCOPED_TIMER(profile()->total_time_counter());
// Collect the statistics of this plan and its sub plans, and send them to the parent plan.
if (_collect_query_statistics_with_every_batch) {
collect_query_statistics();
}
RETURN_IF_ERROR(_sink->send(runtime_state(), batch));
}
@ -333,6 +344,7 @@ Status PlanFragmentExecutor::open_internal() {
// audit the sinks to check that this is ok, or change that behaviour.
{
SCOPED_TIMER(profile()->total_time_counter());
collect_query_statistics();
Status status = _sink->close(runtime_state(), _status);
RETURN_IF_ERROR(status);
}
@ -349,6 +361,11 @@ Status PlanFragmentExecutor::open_internal() {
return Status::OK;
}
void PlanFragmentExecutor::collect_query_statistics() {
_query_statistics->clear();
_plan->collect_query_statistics(_query_statistics.get());
}
void PlanFragmentExecutor::report_profile() {
VLOG_FILE << "report_profile(): instance_id="
<< _runtime_state->fragment_instance_id();

View File

@ -24,6 +24,7 @@
#include "common/status.h"
#include "common/object_pool.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
namespace doris {
@ -204,6 +205,12 @@ private:
// of the execution.
RuntimeProfile::Counter* _average_thread_tokens;
// It is shared with BufferControlBlock and is called from two different
// threads, but never at the same time, so there is no multithreaded access
// problem.
std::shared_ptr<QueryStatistics> _query_statistics;
bool _collect_query_statistics_with_every_batch;
ObjectPool* obj_pool() {
return _runtime_state->obj_pool();
}
@ -256,6 +263,9 @@ private:
const DescriptorTbl& desc_tbl() {
return _runtime_state->desc_tbl();
}
void collect_query_statistics();
};
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/query_statistics.h"
namespace doris {
// Adds every set of statistics held by `recvr` to this object. The work is
// done by the receiver's own merge(), which is private there and reachable
// from here because QueryStatistics is declared a friend of
// QueryStatisticsRecvr.
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
    QueryStatistics* const accumulator = this;
    recvr->merge(accumulator);
}
// Stores the statistics reported by the sender `sender_id`. One
// QueryStatistics entry is kept per sender id; it is created the first time
// that id is seen, and each call overwrites the entry's counters with the
// values in `statistics` (from_pb assigns, it does not add). The whole
// operation runs under _lock.
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
    std::lock_guard<SpinLock> guard(_lock);
    auto entry = _query_statistics.find(sender_id);
    if (entry == _query_statistics.end()) {
        // First report from this sender: allocate its slot. The map owns the
        // raw pointer; it is released in the destructor.
        entry = _query_statistics.emplace(sender_id, new QueryStatistics).first;
    }
    entry->second->from_pb(statistics);
}
// Frees every per-sender QueryStatistics object owned by the map.
QueryStatisticsRecvr::~QueryStatisticsRecvr() {
    // It is unnecessary to lock here, because the destructor will be
    // called after DataStreamRecvr's close in ExchangeNode.
    for (auto iter = _query_statistics.begin(); iter != _query_statistics.end(); ++iter) {
        delete iter->second;
    }
    _query_statistics.clear();
}
}

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_EXEC_QUERY_STATISTICS_H
#define DORIS_BE_EXEC_QUERY_STATISTICS_H
#include <mutex>
#include "gen_cpp/data.pb.h"
#include "util/spinlock.h"
namespace doris {
class QueryStatisticsRecvr;
// Collects query statistics. It usually consists of two parts: the statistics
// of the current fragment or plan, and the statistics of its sub fragments or
// sub plans, which are gathered by QueryStatisticsRecvr.
//
// The statistics tracked are two counters: scanned rows and scanned bytes.
// This class does no locking of its own.
class QueryStatistics {
public:
    // Both counters start at zero.
    QueryStatistics() : scan_rows(0), scan_bytes(0) {
    }

    // Adds the counters of `other` to this object's counters.
    void merge(const QueryStatistics& other) {
        scan_rows += other.scan_rows;
        scan_bytes += other.scan_bytes;
    }

    // Increases the scanned-row counter by `scan_rows`.
    void add_scan_rows(int64_t scan_rows) {
        this->scan_rows += scan_rows;
    }

    // Increases the scanned-byte counter by `scan_bytes`.
    void add_scan_bytes(int64_t scan_bytes) {
        this->scan_bytes += scan_bytes;
    }

    // Adds all statistics held by `recvr` to this object. Defined out of line
    // because QueryStatisticsRecvr is only forward-declared at this point.
    void merge(QueryStatisticsRecvr* recvr);

    // Resets both counters to zero.
    void clear() {
        scan_rows = 0;
        scan_bytes = 0;
    }

    // Writes both counters into the protobuf message `statistics`, which must
    // not be null. Only reads this object, hence const: statistics reachable
    // only through a const pointer or reference can still be serialized.
    void to_pb(PQueryStatistics* statistics) const {
        DCHECK(statistics != nullptr);
        statistics->set_scan_rows(scan_rows);
        statistics->set_scan_bytes(scan_bytes);
    }

    // Replaces (does not add to) both counters with the values carried by the
    // protobuf message `statistics`.
    void from_pb(const PQueryStatistics& statistics) {
        scan_rows = statistics.scan_rows();
        scan_bytes = statistics.scan_bytes();
    }

private:
    int64_t scan_rows;
    int64_t scan_bytes;
};
// It is used for collecting sub plan query statistics in DataStreamRecvr.
//
// Keeps one heap-allocated QueryStatistics per sender id. insert() and merge()
// both run under _lock. The objects in the map are owned by this class and are
// freed in the destructor, so copying is disabled: a copy would share the same
// raw pointers and delete them a second time.
class QueryStatisticsRecvr {
public:
    // Declared explicitly because the deleted copy constructor below would
    // otherwise suppress the implicit default constructor.
    QueryStatisticsRecvr() = default;

    ~QueryStatisticsRecvr();

    QueryStatisticsRecvr(const QueryStatisticsRecvr&) = delete;
    QueryStatisticsRecvr& operator=(const QueryStatisticsRecvr&) = delete;

    // Stores `statistics` as the latest report from the sender `sender_id`.
    void insert(const PQueryStatistics& statistics, int sender_id);

private:
    // QueryStatistics::merge(QueryStatisticsRecvr*) is the only intended
    // caller of merge() below.
    friend class QueryStatistics;

    // Adds the statistics of every known sender to `statistics`.
    void merge(QueryStatistics* statistics) {
        std::lock_guard<SpinLock> l(_lock);
        for (auto& pair : _query_statistics) {
            statistics->merge(*(pair.second));
        }
    }

    // sender id -> statistics last reported by that sender (owned).
    std::map<int, QueryStatistics*> _query_statistics;
    SpinLock _lock;
};
}
#endif

View File

@ -92,5 +92,9 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) {
return Status::OK;
}
void ResultSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_sender->set_query_statistics(statistics);
}
}
/* vim: set ts=4 sw=4 sts=4 tw=100 : */

View File

@ -56,6 +56,8 @@ public:
return _profile;
}
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
private:
Status prepare_exprs(RuntimeState* state);

View File

@ -130,14 +130,14 @@ void BackendService::transmit_data(TTransmitDataResult& return_val,
}
if (params.eos) {
Status status = _exec_env->stream_mgr()->close_sender(
params.dest_fragment_instance_id,
params.dest_node_id,
params.sender_id,
params.be_number);
VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false")
<< " close_sender status: " << status.get_error_msg();
status.set_t_status(&return_val);
// Status status = _exec_env->stream_mgr()->close_sender(
// params.dest_fragment_instance_id,
// params.dest_node_id,
// params.sender_id,
// params.be_number);
//VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false")
// << " close_sender status: " << status.get_error_msg();
//status.set_t_status(&return_val);
}
}

View File

@ -46,22 +46,9 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
bool eos = request->eos();
if (request->has_row_batch()) {
_exec_env->stream_mgr()->add_data(
request->finst_id(), request->node_id(),
request->row_batch(), request->sender_id(),
request->be_number(), request->packet_seq(),
eos ? nullptr : &done);
}
if (eos) {
TUniqueId finst_id;
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
_exec_env->stream_mgr()->close_sender(
finst_id, request->node_id(),
request->sender_id(), request->be_number());
}
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
<< " node=" << request->node_id();
_exec_env->stream_mgr()->transmit_data(request, &done);
if (done != nullptr) {
done->Run();
}

View File

@ -39,7 +39,7 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("FragmentId").add("InstanceId").add("Host")
.add("IO").add("CPU").build();
.add("ScanRawData").add("ProcessRows").build();
private QueryStatisticsItem item;
public CurrentQueryFragmentProcNode(QueryStatisticsItem item) {
@ -79,8 +79,8 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
rowData.add(instanceConsumption.getFragmentId());
rowData.add(instanceConsumption.getInstanceId().toString());
rowData.add(instanceConsumption.getAddress().toString());
rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption()));
rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption()));
rowData.add(instanceConsumption.getFormattingScanBytes());
rowData.add(instanceConsumption.getFormattingProcessRows());
sortedRowDatas.add(rowData);
}

View File

@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.Formatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -334,21 +335,35 @@ public class CurrentQueryInfoProvider {
}
}
public long getTotalCpuConsumption() {
private long getTotalCpuConsumption() {
long cpu = 0;
for (ConsumptionCalculator consumption : calculators) {
cpu += consumption.getCpu();
cpu += consumption.getProcessRows();
}
return cpu;
}
public long getTotalIoConsumption() {
private long getTotalIoConsumption() {
long io = 0;
for (ConsumptionCalculator consumption : calculators) {
io += consumption.getIo();
io += consumption.getScanBytes();
}
return io;
}
// Returns the total from getTotalCpuConsumption() rendered as "<count> Rows"
// for display.
public String getFormattingProcessRows() {
    return getTotalCpuConsumption() + " Rows";
}
// Returns the total from getTotalIoConsumption() rendered as "<value> <unit>",
// where the value is printed with two decimals. The value/unit pair comes from
// DebugUtil.getByteUint (presumably the byte count scaled to a readable unit
// and that unit's name -- confirm against DebugUtil).
public String getFormattingScanBytes() {
    final Pair<Double, String> pair = DebugUtil.getByteUint(getTotalIoConsumption());
    // String.format uses the same default FORMAT locale as `new Formatter()`
    // but does not leave a Closeable Formatter instance unclosed.
    return String.format("%.2f", pair.first) + " " + pair.second;
}
}
public static class InstanceConsumption extends Consumption {
@ -388,27 +403,27 @@ public class CurrentQueryInfoProvider {
this.counterMaps = counterMaps;
}
public long getCpu() {
public long getProcessRows() {
long cpu = 0;
for (Map<String, Counter> counters : counterMaps) {
cpu += getCpuByRows(counters);
cpu += getProcessRows(counters);
}
return cpu;
}
public long getIo() {
public long getScanBytes() {
long io = 0;
for (Map<String, Counter> counters : counterMaps) {
io += getIoByByte(counters);
io += getScanBytes(counters);
}
return io;
}
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
return 0;
}
protected long getIoByByte(Map<String, Counter> counters) {
protected long getScanBytes(Map<String, Counter> counters) {
return 0;
}
}
@ -419,7 +434,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getIoByByte(Map<String, Counter> counters) {
protected long getScanBytes(Map<String, Counter> counters) {
final Counter counter = counters.get("CompressedBytesRead");
return counter == null ? 0 : counter.getValue();
}
@ -431,7 +446,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter probeCounter = counters.get("ProbeRows");
final Counter buildCounter = counters.get("BuildRows");
return probeCounter == null || buildCounter == null ?
@ -445,7 +460,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter buildCounter = counters.get("BuildRows");
return buildCounter == null ? 0 : buildCounter.getValue();
}
@ -457,7 +472,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter sortRowsCounter = counters.get("SortRows");
return sortRowsCounter == null ? 0 : sortRowsCounter.getValue();
}
@ -469,7 +484,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter processRowsCounter = counters.get("ProcessRows");
return processRowsCounter == null ? 0 : processRowsCounter.getValue();
@ -482,7 +497,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter materializeRowsCounter = counters.get("MaterializeRows");
return materializeRowsCounter == null ? 0 : materializeRowsCounter.getValue();
}
@ -495,7 +510,7 @@ public class CurrentQueryInfoProvider {
}
@Override
protected long getCpuByRows(Map<String, Counter> counters) {
protected long getProcessRows(Map<String, Counter> counters) {
final Counter mergeRowsCounter = counters.get("MergeRows");
return mergeRowsCounter == null ? 0 : mergeRowsCounter.getValue();
}

View File

@ -37,7 +37,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("ConnectionId").add("QueryId").add("Database").add("User")
.add("IO").add("CPU").add("ExecTime").build();
.add("ScanRawData").add("ProcessRows").add("ExecTime").build();
private static final int EXEC_TIME_INDEX = 6;
@ -76,8 +76,8 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
values.add(item.getDb());
values.add(item.getUser());
final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId());
values.add(String.valueOf(consumption.getTotalIoConsumption()));
values.add(String.valueOf(consumption.getTotalCpuConsumption()));
values.add(consumption.getFormattingScanBytes());
values.add(consumption.getFormattingProcessRows());
values.add(item.getQueryExecTime());
sortedRowData.add(values);
}

View File

@ -96,6 +96,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// if the output is UNPARTITIONED, it is being broadcast
private DataPartition outputPartition;
// Whether query statistics are sent with every batch. When the query contains a
// limit, the statistics must be sent with every batch to be reported correctly;
// otherwise they are sent only on close.
private boolean transferQueryStatisticsWithEveryBatch;
// TODO: SubstitutionMap outputSmap;
// substitution map to remap exprs onto the output of this fragment, to be applied
// at destination fragment
@ -108,6 +113,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
this.planRoot = root;
this.dataPartition = partition;
this.outputPartition = DataPartition.UNPARTITIONED;
this.transferQueryStatisticsWithEveryBatch = false;
setFragmentInPlanTree(planRoot);
}
@ -180,7 +186,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// TODO chenhao , calculated by cost
result.setMin_reservation_bytes(0);
result.setInitial_reservation_total_claims(0);
return result;
}
@ -270,4 +275,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
return fragmentId;
}
// Sets whether this fragment sends its query statistics with every batch.
public void setTransferQueryStatisticsWithEveryBatch(boolean value) {
    this.transferQueryStatisticsWithEveryBatch = value;
}
// Returns true if this fragment sends its query statistics with every batch.
public boolean isTransferQueryStatisticsWithEveryBatch() {
    return this.transferQueryStatisticsWithEveryBatch;
}
}

View File

@ -176,7 +176,11 @@ public class Planner {
fragments = distributedPlanner.createPlanFragments(singleNodePlan);
}
// Optimize the transfer of query statistics when the query doesn't contain a limit.
PlanFragment rootFragment = fragments.get(fragments.size() - 1);
QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment);
queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
if (statment instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) statment;
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
@ -230,4 +234,53 @@ public class Planner {
Preconditions.checkState(selectNode.hasValidStats());
return selectNode;
}
// Walks the fragment tree and marks every fragment whose output is consumed
// by a limited ExchangeNode in its parent fragment, so that such a fragment
// sends its query statistics with every batch.
private static class QueryStatisticsTransferOptimizer {
    private final PlanFragment root;

    public QueryStatisticsTransferOptimizer(PlanFragment root) {
        Preconditions.checkNotNull(root);
        this.root = root;
    }

    // Entry point: processes the root fragment and all of its descendants.
    public void optimizeQueryStatisticsTransfer() {
        optimizeQueryStatisticsTransfer(root, null);
    }

    // Marks `fragment` when the parent's ExchangeNode that reads from it has a
    // limit, then recurses into the fragment's children. The root fragment has
    // no parent and is therefore never marked.
    private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) {
        if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) {
            fragment.setTransferQueryStatisticsWithEveryBatch(true);
        }
        for (PlanFragment child : fragment.getChildren()) {
            optimizeQueryStatisticsTransfer(child, fragment);
        }
    }

    // Returns true if the plan tree rooted at `ancestor` contains an
    // ExchangeNode whose first child is `successor` and which has a limit.
    private boolean hasLimit(PlanNode ancestor, PlanNode successor) {
        final List<PlanNode> exchangeNodes = Lists.newArrayList();
        collectExchangeNode(ancestor, exchangeNodes);
        for (PlanNode exchange : exchangeNodes) {
            if (exchange.getChild(0) == successor && exchange.hasLimit()) {
                return true;
            }
        }
        return false;
    }

    // Collects the ExchangeNodes reachable from `planNode` without descending
    // below any ExchangeNode. Stopping there is what hasLimit() needs: it only
    // compares each exchange's first child with the child fragment's root, so
    // nodes underneath an exchange can never match and need not be visited.
    private void collectExchangeNode(PlanNode planNode, List<PlanNode> exchangeNodes) {
        if (planNode instanceof ExchangeNode) {
            exchangeNodes.add(planNode);
            return;
        }
        for (PlanNode child : planNode.getChildren()) {
            collectExchangeNode(child, exchangeNodes);
        }
    }
}
}

View File

@ -28,15 +28,19 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.rpc.PQueryStatistics;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
@ -46,6 +50,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Formatter;
import java.util.List;
/**
@ -92,12 +97,30 @@ public class ConnectProcessor {
ctx.getState().setOk();
}
private void auditAfterExec(String origStmt, StatementBase parsedStmt) {
// Renders statistics.scanRows as "<count> Rows" for the audit log.
private String getFormattingScanRows(PQueryStatistics statistics) {
    return statistics.scanRows + " Rows";
}
// Renders statistics.scanBytes as "<value> <unit>" for the audit log, with the
// value printed with two decimals. The value/unit pair comes from
// DebugUtil.getByteUint (presumably the byte count scaled to a readable unit
// and that unit's name -- confirm against DebugUtil).
private String getFormattingScanBytes(PQueryStatistics statistics) {
    final Pair<Double, String> pair = DebugUtil.getByteUint(statistics.scanBytes);
    // String.format uses the same default FORMAT locale as `new Formatter()`
    // but does not leave a Closeable Formatter instance unclosed.
    return String.format("%.2f", pair.first) + " " + pair.second;
}
private void auditAfterExec(String origStmt, StatementBase parsedStmt,
PQueryStatistics statistics) {
// slow query
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
// query state log
ctx.getAuditBuilder().put("state", ctx.getState());
ctx.getAuditBuilder().put("time", elapseMs);
Preconditions.checkNotNull(statistics);
ctx.getAuditBuilder().put("ScanRows", getFormattingScanRows(statistics));
ctx.getAuditBuilder().put("ScanRawData", getFormattingScanBytes(statistics));
ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows());
ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId());
@ -177,7 +200,8 @@ public class ConnectProcessor {
// audit after exec
// replace '\n' to '\\n' to make string in one line
auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt());
auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog());
}
// Get the column definitions of a table

View File

@ -574,12 +574,12 @@ public class Coordinator {
}
}
TResultBatch getNext() throws Exception {
public RowBatch getNext() throws Exception {
if (receiver == null) {
throw new UserException("There is no receiver.");
}
TResultBatch resultBatch;
RowBatch resultBatch;
Status status = new Status();
resultBatch = receiver.getNext(status);
@ -611,7 +611,7 @@ public class Coordinator {
}
}
if (resultBatch == null) {
if (resultBatch.isEos()) {
this.returnedAllResults = true;
// if this query is a block query do not cancel.
@ -622,7 +622,7 @@ public class Coordinator {
cancelInternal();
}
} else {
numReceivedRows += resultBatch.getRowsSize();
numReceivedRows += resultBatch.getBatch().getRowsSize();
}
return resultBatch;
@ -1038,7 +1038,7 @@ public class Coordinator {
return result;
}
public void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams)
private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams)
throws UserException {
int maxNumInstance = queryOptions.mt_dop;
if (maxNumInstance == 0) {
@ -1150,7 +1150,7 @@ public class Coordinator {
}
// create collocated instance according to inputFragments
public void createCollocatedInstance(FragmentExecParams fragmentExecParams) {
private void createCollocatedInstance(FragmentExecParams fragmentExecParams) {
Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1);
final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams.
inputFragments.get(0));
@ -1169,7 +1169,7 @@ public class Coordinator {
}
public void createUnionInstance(FragmentExecParams fragmentExecParams) {
private void createUnionInstance(FragmentExecParams fragmentExecParams) {
final PlanFragment fragment = fragmentExecParams.fragment;
// Add hosts of scan nodes
List<PlanNodeId> scanNodeIds = findScanNodes(fragment.getPlanRoot());
@ -1563,7 +1563,6 @@ public class Coordinator {
params.setResource_info(tResourceInfo);
params.params.setQuery_id(queryId);
params.params.setFragment_instance_id(instanceExecParam.instanceId);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
@ -1579,7 +1578,8 @@ public class Coordinator {
params.setBackend_num(backendNum++);
params.setQuery_globals(queryGlobals);
params.setQuery_options(queryOptions);
params.params.setSend_query_statistics_with_every_batch(
fragment.isTransferQueryStatisticsWithEveryBatch());
if (queryOptions.getQuery_type() == TQueryType.LOAD) {
LoadErrorHub.Param param = Catalog.getCurrentCatalog().getLoadInstance().getLoadErrorHubInfo();
if (param != null) {

View File

@ -58,11 +58,11 @@ public class ResultReceiver {
this.timeoutTs = System.currentTimeMillis() + timeoutMs;
}
public TResultBatch getNext(Status status) throws TException {
public RowBatch getNext(Status status) throws TException {
if (isDone) {
return null;
}
final RowBatch rowBatch = new RowBatch();
try {
while (!isDone && !isCancel) {
PFetchDataRequest request = new PFetchDataRequest(finstId);
@ -90,7 +90,9 @@ public class ResultReceiver {
if (code != TStatusCode.OK) {
status.setPstatus(pResult.status);
return null;
}
}
rowBatch.setQueryStatistics(pResult.statistics);
if (packetIdx != pResult.packetSeq) {
LOG.warn("receive packet failed, expect={}, receive={}", packetIdx, pResult.packetSeq);
@ -106,7 +108,9 @@ public class ResultReceiver {
TResultBatch resultBatch = new TResultBatch();
TDeserializer deserializer = new TDeserializer();
deserializer.deserialize(resultBatch, serialResult);
return resultBatch;
rowBatch.setBatch(resultBatch);
rowBatch.setEos(pResult.eos);
return rowBatch;
}
}
} catch (RpcException e) {
@ -134,7 +138,7 @@ public class ResultReceiver {
if (isCancel) {
status.setStatus(Status.CANCELLED);
}
return null;
return rowBatch;
}
public void cancel() {

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.rpc.PQueryStatistics;
import org.apache.doris.thrift.TResultBatch;
// Bundles one fetched result batch with the query statistics that came with
// it and an end-of-stream flag.
public final class RowBatch {
    // Result rows; stays null until setBatch() is called.
    private TResultBatch batch;
    // Query statistics; stays null until setQueryStatistics() is called.
    private PQueryStatistics statistics;
    // A new instance reports end-of-stream until setEos(false) is called.
    private boolean eos = true;

    public RowBatch() {
    }

    public TResultBatch getBatch() {
        return this.batch;
    }

    public void setBatch(TResultBatch batch) {
        this.batch = batch;
    }

    public PQueryStatistics getQueryStatistics() {
        return this.statistics;
    }

    public void setQueryStatistics(PQueryStatistics statistics) {
        this.statistics = statistics;
    }

    public boolean isEos() {
        return this.eos;
    }

    public void setEos(boolean eos) {
        this.eos = eos;
    }
}

View File

@ -63,6 +63,7 @@ import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.Planner;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rpc.PQueryStatistics;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TQueryOptions;
@ -101,6 +102,7 @@ public class StmtExecutor {
private Planner planner;
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
private PQueryStatistics statisticsForAuditLog;
public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) {
this.context = context;
@ -539,23 +541,23 @@ public class StmtExecutor {
// so We need to send fields after first batch arrived
// send result
TResultBatch batch;
RowBatch batch;
MysqlChannel channel = context.getMysqlChannel();
boolean isSendFields = false;
while ((batch = coord.getNext()) != null) {
if (!isSendFields) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
while (true) {
batch = coord.getNext();
if (batch.getBatch() != null) {
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
isSendFields = true;
if (batch.isEos()) {
break;
}
}
for (ByteBuffer row : batch.getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getRows().size());
}
if (!isSendFields) {
sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
}
statisticsForAuditLog = batch.getQueryStatistics();
context.getState().setEof();
}
@ -776,4 +778,11 @@ public class StmtExecutor {
ExportStmt exportStmt = (ExportStmt) parsedStmt;
context.getCatalog().getExportMgr().addExportJob(exportStmt);
}
// Returns the statistics to write to the audit log. Never returns null: when
// no statistics have been recorded, an empty PQueryStatistics is created,
// stored in the field and returned.
public PQueryStatistics getQueryStatisticsForAuditLog() {
    if (statisticsForAuditLog != null) {
        return statisticsForAuditLog;
    }
    statisticsForAuditLog = new PQueryStatistics();
    return statisticsForAuditLog;
}
}

View File

@ -28,4 +28,6 @@ public class PFetchDataResult {
public long packetSeq;
@Protobuf(order = 3, required = false)
public boolean eos;
@Protobuf(order = 4, required = false)
public PQueryStatistics statistics;
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.rpc;
import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
// Java-side counterpart of the PQueryStatistics protobuf message: two
// optional counters, both starting at zero.
@ProtobufClass
public class PQueryStatistics {
    @Protobuf(order = 1, required = false)
    public long scanRows;
    @Protobuf(order = 2, required = false)
    public long scanBytes;

    public PQueryStatistics() {
        this.scanRows = 0L;
        this.scanBytes = 0L;
    }
}

View File

@ -26,6 +26,7 @@ import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlErrPacket;
import org.apache.doris.mysql.MysqlOkPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.rpc.PQueryStatistics;
import org.easymock.EasyMock;
import org.junit.Assert;
@ -231,6 +232,7 @@ public class ConnectProcessorTest {
// Mock statement executor
StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
qe.execute();
EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(qe);
PowerMock.expectNew(
@ -254,11 +256,11 @@ public class ConnectProcessorTest {
StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
qe.execute();
EasyMock.expectLastCall().andThrow(new IOException("Fail")).anyTimes();
EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
EasyMock.replay(qe);
PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))
.andReturn(qe).anyTimes();
PowerMock.replay(StmtExecutor.class);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
}
@ -272,6 +274,7 @@ public class ConnectProcessorTest {
// Mock statement executor
StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
qe.execute();
EasyMock.expect(qe.getQueryStatisticsForAuditLog()).andReturn(new PQueryStatistics());
EasyMock.expectLastCall().andThrow(new NullPointerException("Fail")).anyTimes();
EasyMock.replay(qe);
PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))

View File

@ -160,7 +160,7 @@ public class StmtExecutorTest {
cood.endProfile();
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(cood.getQueryProfile()).andReturn(new RuntimeProfile()).anyTimes();
EasyMock.expect(cood.getNext()).andReturn(null).anyTimes();
EasyMock.expect(cood.getNext()).andReturn(new RowBatch()).anyTimes();
EasyMock.replay(cood);
PowerMock.expectNew(Coordinator.class, EasyMock.isA(ConnectContext.class),
EasyMock.isA(Analyzer.class), EasyMock.isA(Planner.class))

View File

@ -19,6 +19,11 @@ syntax="proto2";
package doris;
// Query statistics message, carried as an optional field by
// PTransmitDataParams and PFetchDataResult.
message PQueryStatistics {
// presumably the number of rows scanned -- confirm against the producer
optional int64 scan_rows = 1;
// presumably the number of bytes scanned -- confirm against the producer
optional int64 scan_bytes = 2;
}
message PRowBatch {
required int32 num_rows = 1;
repeated int32 row_tuples = 2;

View File

@ -41,6 +41,7 @@ message PTransmitDataParams {
optional PRowBatch row_batch = 6;
// different per packet
required int64 packet_seq = 7;
optional PQueryStatistics query_statistics = 8;
};
message PTransmitDataResult {
@ -129,6 +130,7 @@ message PFetchDataResult {
// valid when status is ok
optional int64 packet_seq = 2;
optional bool eos = 3;
optional PQueryStatistics query_statistics = 4;
};
message PTriggerProfileReportRequest {

View File

@ -175,6 +175,7 @@ struct TPlanFragmentExecParams {
// Id of this fragment in its role as a sender.
9: optional i32 sender_id
10: optional i32 num_senders
11: optional bool send_query_statistics_with_every_batch
}
// Global query parameters assigned by the coordinator.