[feature-wip](new-scan) Add memtracker and span for new olap scan node (#12281)
Add memtracker and span for new olap scan node
This commit is contained in:
@ -110,6 +110,7 @@ set(UTIL_FILES
|
||||
tuple_row_zorder_compare.cpp
|
||||
telemetry/telemetry.cpp
|
||||
telemetry/brpc_carrier.cpp
|
||||
telemetry/open_telemetry_scop_wrapper.hpp
|
||||
quantile_state.cpp
|
||||
jni-util.cpp
|
||||
)
|
||||
|
||||
51
be/src/util/telemetry/open_telemetry_scop_wrapper.hpp
Normal file
51
be/src/util/telemetry/open_telemetry_scop_wrapper.hpp
Normal file
@ -0,0 +1,51 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "opentelemetry/trace/provider.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
using OpentelemetryTracer = opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer>;
|
||||
using OpentelemetryScope = opentelemetry::trace::Scope;
|
||||
using OpentelemetrySpan = opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>;
|
||||
|
||||
class OpenTelemetryScopeWrapper {
|
||||
public:
|
||||
OpenTelemetryScopeWrapper(bool enable, OpentelemetryTracer tracer, const std::string& name) {
|
||||
if (enable) {
|
||||
auto span = tracer->StartSpan(name);
|
||||
_scope.reset(new OpentelemetryScope(span));
|
||||
}
|
||||
}
|
||||
|
||||
OpenTelemetryScopeWrapper(bool enable, OpentelemetryTracer tracer, OpentelemetrySpan span,
|
||||
const std::string& name) {
|
||||
if (enable) {
|
||||
if (UNLIKELY(!span)) {
|
||||
span = tracer->StartSpan(name);
|
||||
}
|
||||
_scope.reset(new OpentelemetryScope(span));
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<OpentelemetryScope> _scope;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
@ -18,6 +18,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "opentelemetry/trace/provider.h"
|
||||
#include "util/telemetry/open_telemetry_scop_wrapper.hpp"
|
||||
|
||||
/// A trace represents the execution process of a single request in the system, span represents a
|
||||
/// logical operation unit with start time and execution duration in the system, and multiple spans
|
||||
@ -61,6 +62,12 @@ using OpentelemetryScope = opentelemetry::trace::Scope;
|
||||
auto span = tracer->StartSpan(name); \
|
||||
OpentelemetryScope scope {span};
|
||||
|
||||
#define START_AND_SCOPE_SPAN_IF(enable, tracer, name) \
|
||||
OpenTelemetryScopeWrapper(enable, tracer, name)
|
||||
|
||||
#define INIT_AND_SCOPE_REENTRANT_SPAN_IF(enable, tracer, reentrant_span, name) \
|
||||
OpenTelemetryScopeWrapper(enable, tracer, reentrant_span, name)
|
||||
|
||||
namespace telemetry {
|
||||
|
||||
void init_tracer();
|
||||
|
||||
@ -37,6 +37,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
|
||||
|
||||
Status NewOlapScanNode::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(VScanNode::prepare(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
_scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -114,7 +114,7 @@ Status NewOlapScanner::prepare(
|
||||
|
||||
Status NewOlapScanner::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(VScanner::open(state));
|
||||
// SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
|
||||
|
||||
auto res = _tablet_reader->init(_tablet_reader_params);
|
||||
if (!res.ok()) {
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "util/telemetry/telemetry.h"
|
||||
#include "util/uid_util.h"
|
||||
#include "vec/core/block.h"
|
||||
|
||||
@ -123,6 +124,8 @@ public:
|
||||
|
||||
VScanNode* parent() { return _parent; }
|
||||
|
||||
OpentelemetrySpan scan_span() { return _scan_span; }
|
||||
|
||||
public:
|
||||
// the unique id of this context
|
||||
std::string ctx_id;
|
||||
@ -209,6 +212,8 @@ private:
|
||||
|
||||
int64_t _num_ctx_scheduling = 0;
|
||||
int64_t _num_scanner_scheduling = 0;
|
||||
|
||||
OpentelemetrySpan _scan_span;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
#include "common/config.h"
|
||||
#include "util/priority_thread_pool.hpp"
|
||||
#include "util/priority_work_stealing_thread_pool.hpp"
|
||||
#include "util/telemetry/telemetry.h"
|
||||
#include "util/thread.h"
|
||||
#include "util/threadpool.h"
|
||||
#include "vec/core/block.h"
|
||||
@ -165,9 +166,8 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
|
||||
|
||||
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,
|
||||
VScanner* scanner) {
|
||||
// TODO: rethink mem tracker and span
|
||||
// START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
|
||||
// "ScannerScheduler::_scanner_scan");
|
||||
INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(),
|
||||
ctx->scan_span(), "VScanner::scan");
|
||||
SCOPED_ATTACH_TASK(scanner->runtime_state());
|
||||
|
||||
Thread::set_self_name("_scanner_scan");
|
||||
|
||||
@ -51,6 +51,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
|
||||
}
|
||||
|
||||
Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::init");
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
_state = state;
|
||||
|
||||
@ -88,7 +89,7 @@ Status VScanNode::prepare(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status VScanNode::open(RuntimeState* state) {
|
||||
// START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
|
||||
START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::open");
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
@ -109,7 +110,8 @@ Status VScanNode::open(RuntimeState* state) {
|
||||
}
|
||||
|
||||
Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
|
||||
// INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next");
|
||||
INIT_AND_SCOPE_REENTRANT_SPAN_IF(state->enable_profile(), state->get_tracer(), _get_next_span,
|
||||
"VScanNode::get_next");
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
|
||||
if (state->is_cancelled()) {
|
||||
@ -279,10 +281,10 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) {
|
||||
}
|
||||
|
||||
Status VScanNode::close(RuntimeState* state) {
|
||||
START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::close");
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
// START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close");
|
||||
if (_scanner_ctx.get()) {
|
||||
// stop and wait the scanner scheduler to be done
|
||||
// _scanner_ctx may not be created for some short circuit case.
|
||||
|
||||
Reference in New Issue
Block a user