[improve](stream-load) add observability on receiving HTTP request #30432
This commit is contained in:
committed by
yiguolei
parent
5a0764b288
commit
822f2b1255
@ -77,6 +77,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
|
||||
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
|
||||
|
||||
bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
|
||||
|
||||
static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
|
||||
static const string CHUNK = "chunked";
|
||||
|
||||
@ -195,9 +197,11 @@ int StreamLoadAction::on_header(HttpRequest* req) {
|
||||
|
||||
LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
|
||||
<< ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit;
|
||||
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
|
||||
|
||||
if (st.ok()) {
|
||||
st = _on_header(req, ctx);
|
||||
LOG(INFO) << "finished to handle HTTP header, " << ctx->brief();
|
||||
}
|
||||
if (!st.ok()) {
|
||||
ctx->status = std::move(st);
|
||||
@ -350,7 +354,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
|
||||
}
|
||||
ctx->receive_bytes += remove_bytes;
|
||||
}
|
||||
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
|
||||
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
|
||||
int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
|
||||
ctx->read_data_cost_nanos += read_data_time;
|
||||
ctx->receive_and_read_data_cost_nanos =
|
||||
MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
|
||||
g_stream_load_receive_data_latency_ms
|
||||
<< (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
|
||||
read_data_time) /
|
||||
1000000;
|
||||
}
|
||||
|
||||
void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
|
||||
|
||||
@ -208,6 +208,8 @@ public:
|
||||
int64_t pre_commit_txn_cost_nanos = 0;
|
||||
int64_t read_data_cost_nanos = 0;
|
||||
int64_t write_data_cost_nanos = 0;
|
||||
int64_t receive_and_read_data_cost_nanos = 0;
|
||||
int64_t begin_receive_and_read_data_cost_nanos = 0;
|
||||
|
||||
std::string error_url = "";
|
||||
// if label already be used, set existing job's status here
|
||||
|
||||
@ -68,7 +68,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
|
||||
// submit this params
|
||||
#ifndef BE_TEST
|
||||
ctx->start_write_data_nanos = MonotonicNanos();
|
||||
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id
|
||||
LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id
|
||||
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id);
|
||||
Status st;
|
||||
auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
|
||||
@ -148,6 +148,14 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
|
||||
static_cast<void>(this->commit_txn(ctx.get()));
|
||||
}
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to execute stream load. label=" << ctx->label
|
||||
<< ", txn_id=" << ctx->txn_id
|
||||
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id)
|
||||
<< ", receive_data_cost_ms="
|
||||
<< (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
|
||||
<< ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
|
||||
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000;
|
||||
};
|
||||
|
||||
if (ctx->put_result.__isset.params) {
|
||||
|
||||
Reference in New Issue
Block a user