diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index b2523eeeab..e0b235a0c5 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -110,6 +110,7 @@ set(UTIL_FILES tuple_row_zorder_compare.cpp telemetry/telemetry.cpp telemetry/brpc_carrier.cpp + telemetry/open_telemetry_scop_wrapper.hpp quantile_state.cpp jni-util.cpp ) diff --git a/be/src/util/telemetry/open_telemetry_scop_wrapper.hpp b/be/src/util/telemetry/open_telemetry_scop_wrapper.hpp new file mode 100644 index 0000000000..138d1c014e --- /dev/null +++ b/be/src/util/telemetry/open_telemetry_scop_wrapper.hpp @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "opentelemetry/trace/provider.h" + +namespace doris { + +using OpentelemetryTracer = opentelemetry::nostd::shared_ptr; +using OpentelemetryScope = opentelemetry::trace::Scope; +using OpentelemetrySpan = opentelemetry::nostd::shared_ptr; + +class OpenTelemetryScopeWrapper { +public: + OpenTelemetryScopeWrapper(bool enable, OpentelemetryTracer tracer, const std::string& name) { + if (enable) { + auto span = tracer->StartSpan(name); + _scope.reset(new OpentelemetryScope(span)); + } + } + + OpenTelemetryScopeWrapper(bool enable, OpentelemetryTracer tracer, OpentelemetrySpan span, + const std::string& name) { + if (enable) { + if (UNLIKELY(!span)) { + span = tracer->StartSpan(name); + } + _scope.reset(new OpentelemetryScope(span)); + } + } + +private: + std::unique_ptr _scope; +}; + +} // namespace doris diff --git a/be/src/util/telemetry/telemetry.h b/be/src/util/telemetry/telemetry.h index 9b6142dc95..5b20acc96e 100644 --- a/be/src/util/telemetry/telemetry.h +++ b/be/src/util/telemetry/telemetry.h @@ -18,6 +18,7 @@ #pragma once #include "opentelemetry/trace/provider.h" +#include "util/telemetry/open_telemetry_scop_wrapper.hpp" /// A trace represents the execution process of a single request in the system, span represents a /// logical operation unit with start time and execution duration in the system, and multiple spans @@ -61,6 +62,12 @@ using OpentelemetryScope = opentelemetry::trace::Scope; auto span = tracer->StartSpan(name); \ OpentelemetryScope scope {span}; +#define START_AND_SCOPE_SPAN_IF(enable, tracer, name) \ + OpenTelemetryScopeWrapper(enable, tracer, name) + +#define INIT_AND_SCOPE_REENTRANT_SPAN_IF(enable, tracer, reentrant_span, name) \ + OpenTelemetryScopeWrapper(enable, tracer, reentrant_span, name) + namespace telemetry { void init_tracer(); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 2c0876c5ff..d4b1b6a7d7 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -37,6 +37,7 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _scanner_mem_tracker = std::make_unique("OlapScanners"); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 7b080d18af..7ab46adc92 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -114,7 +114,7 @@ Status NewOlapScanner::prepare( Status NewOlapScanner::open(RuntimeState* state) { RETURN_IF_ERROR(VScanner::open(state)); - // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); auto res = _tablet_reader->init(_tablet_reader_params); if (!res.ok()) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 3e5d052412..9bac5e3517 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "runtime/descriptors.h" +#include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/core/block.h" @@ -123,6 +124,8 @@ public: VScanNode* parent() { return _parent; } + OpentelemetrySpan scan_span() { return _scan_span; } + public: // the unique id of this context std::string ctx_id; @@ -209,6 +212,8 @@ private: int64_t _num_ctx_scheduling = 0; int64_t _num_scanner_scheduling = 0; + + OpentelemetrySpan _scan_span; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2fe70ba3c8..8246180ec4 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "util/priority_thread_pool.hpp" #include "util/priority_work_stealing_thread_pool.hpp" +#include "util/telemetry/telemetry.h" #include "util/thread.h" #include "util/threadpool.h" #include "vec/core/block.h" @@ -165,9 +166,8 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner) { - // TODO: rethink mem tracker and span - // START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span, - // "ScannerScheduler::_scanner_scan"); + INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(), + ctx->scan_span(), "VScanner::scan"); SCOPED_ATTACH_TASK(scanner->runtime_state()); Thread::set_self_name("_scanner_scan"); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index cb9a9988c1..0dd48d58b4 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -51,6 +51,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { } Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::init"); RETURN_IF_ERROR(ExecNode::init(tnode, state)); _state = state; @@ -88,7 +89,7 @@ Status VScanNode::prepare(RuntimeState* state) { } Status VScanNode::open(RuntimeState* state) { - // START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open"); + START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); @@ -109,7 +110,8 @@ Status VScanNode::open(RuntimeState* state) { } Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { - // INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VScanNode::get_next"); + INIT_AND_SCOPE_REENTRANT_SPAN_IF(state->enable_profile(), state->get_tracer(), _get_next_span, + "VScanNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (state->is_cancelled()) { @@ -279,10 +281,10 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector& vexprs) { } Status VScanNode::close(RuntimeState* state) { + START_AND_SCOPE_SPAN_IF(state->enable_profile(), state->get_tracer(), "VScanNode::close"); if (is_closed()) { return Status::OK(); } - // START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close"); if (_scanner_ctx.get()) { // stop and wait the scanner scheduler to be done // _scanner_ctx may not be created for some short circuit case.