[feature](tracing) Support query tracing to improve Doris observability by introducing OpenTelemetry. (#10533)

Query trace collection is implemented in the FE and BE, and the resulting spans are exported to Zipkin.
DSIP: https://cwiki.apache.org/confluence/display/DORIS/DSIP-012%3A+Introduce+opentelemetry
Author: luozenglin
Date: 2022-07-09 15:50:40 +08:00
Committed by: GitHub
Parent: 1112dba525
Commit: d5ea677282
61 changed files with 1119 additions and 110 deletions

View File

@@ -24,6 +24,7 @@ import org.apache.doris.common.LdapConfig;
import org.apache.doris.common.Log4jConfig;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.HttpServer;
@@ -128,6 +129,8 @@ public class PaloFe {
Catalog.getCurrentCatalog().initialize(args);
Catalog.getCurrentCatalog().waitForReady();
Telemetry.initOpenTelemetry();
// init and start:
// 1. HttpServer for HTTP Server
// 2. FeServer for Thrift Server

View File

@@ -1631,6 +1631,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cbo_default_sample_percentage = 10;
@ConfField(mutable = false, masterOnly = false)
public static boolean enable_tracing = false;
@ConfField(mutable = false, masterOnly = false)
public static String trace_export_url = "http://127.0.0.1:9411/api/v2/spans";
/**
* If set to TRUE, the compaction slower replica will be skipped when select get queryable replicas
* Default is true.
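
The two new options above (enable_tracing, trace_export_url) gate the whole feature. A minimal sketch of turning tracing on in fe.conf (both options are declared with mutable = false, so changing them requires an FE restart; the URL is just the default shown above, pointing at a local Zipkin collector):

enable_tracing = true
trace_export_url = http://127.0.0.1:9411/api/v2/spans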

View File

@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.telemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
/**
* Encapsulates a {@link Span} together with its {@link Scope}.
*/
public class ScopedSpan {
private Span span;
private Scope scope;
public ScopedSpan() {
span = Telemetry.getNoopSpan();
this.scope = span.makeCurrent();
}
public ScopedSpan(Span span) {
this.span = span;
this.scope = span.makeCurrent();
}
public Span getSpan() {
return span;
}
public void endSpan() {
scope.close();
span.end();
}
}
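
A minimal usage sketch (hypothetical call site; the span name "sendFragments" is illustrative, but the pattern mirrors how Coordinator uses this class later in this diff):

// The constructor makes the span current on this thread; endSpan() closes
// the Scope first, then ends the Span.
Span span = ConnectContext.get().getTracer().spanBuilder("sendFragments").startSpan();
ScopedSpan scopedSpan = new ScopedSpan(span);
try {
    // ... work attributed to the span while it is current ...
} finally {
    scopedSpan.endSpan();
}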

View File

@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.telemetry;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* Manages the OpenTelemetry SDK.
*/
public class Telemetry {
private static final Logger LOG = LogManager.getLogger(Telemetry.class);
private static OpenTelemetry openTelemetry = OpenTelemetry.noop();
/**
* Initialize {@link OpenTelemetry} with {@link SdkTracerProvider}, {@link BatchSpanProcessor},
* {@link ZipkinSpanExporter} and {@link W3CTraceContextPropagator}.
*/
public static void initOpenTelemetry() {
if (!Config.enable_tracing) {
return;
}
// todo: It may be possible to use the OTLP exporter to export telemetry data to an OpenTelemetry
// collector, which in turn processes and sends it to multiple backends (e.g. Zipkin, Prometheus,
// Fluent Bit, etc.) to improve scalability.
String httpUrl = Config.trace_export_url;
SpanExporter spanExporter = zipkinExporter(httpUrl);
String serviceName = "FRONTEND:" + Catalog.getCurrentCatalog().getSelfNode().first;
Resource serviceNameResource =
Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName));
// Export spans in batches whenever the schedule delay elapses or the max export batch size is reached
BatchSpanProcessor spanProcessor =
BatchSpanProcessor.builder(spanExporter).setScheduleDelay(100, TimeUnit.MILLISECONDS)
.setMaxExportBatchSize(1000).build();
SdkTracerProvider tracerProvider = SdkTracerProvider.builder().addSpanProcessor(spanProcessor)
.setResource(Resource.getDefault().merge(serviceNameResource)).build();
openTelemetry = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build();
// add a shutdown hook to shut down the SDK
Runtime.getRuntime().addShutdownHook(new Thread(tracerProvider::shutdown));
}
private static SpanExporter zipkinExporter(String httpUrl) {
return ZipkinSpanExporter.builder().setEndpoint(httpUrl).build();
}
private static SpanExporter oltpExporter(String httpUrl) {
return OtlpGrpcSpanExporter.builder().setEndpoint(httpUrl).build();
}
public static OpenTelemetry getOpenTelemetry() {
return openTelemetry;
}
public static Tracer getNoopTracer() {
return OpenTelemetry.noop().getTracer("noop");
}
public static Span getNoopSpan() {
return getNoopTracer().spanBuilder("noopSpan").startSpan();
}
}
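
Callers never touch the SDK directly; they go through getOpenTelemetry(), which stays the no-op implementation unless enable_tracing is set, so building and ending spans is cheap and exports nothing when tracing is off. A minimal sketch of the resulting usage pattern (span and tracer names illustrative):

Tracer tracer = Telemetry.getOpenTelemetry().getTracer("trace");
Span span = tracer.spanBuilder("handleQuery").startSpan();
try (Scope scope = span.makeCurrent()) {
    // ... traced work; spans created here become children via Context.current() ...
} finally {
    span.end();
}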

View File

@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.UserException;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.DataSourceIf;
import org.apache.doris.datasource.InternalDataSource;
@@ -40,6 +41,7 @@ import org.apache.doris.transaction.TransactionStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -108,6 +110,8 @@ public class ConnectContext {
// Cache thread info for this connection.
protected volatile ThreadInfo threadInfo;
protected volatile Tracer tracer = Telemetry.getNoopTracer();
// Catalog: put catalog here is convenient for unit test,
// because catalog is singleton, hard to mock
protected Catalog catalog;
@@ -490,6 +494,14 @@ public class ConnectContext {
this.sqlHash = sqlHash;
}
public Tracer getTracer() {
return tracer;
}
public void initTracer(String name) {
this.tracer = Telemetry.getOpenTelemetry().getTracer(name);
}
// kill operation with no protect.
public void kill(boolean killConnection) {
LOG.warn("kill timeout query, {}, kill connection: {}",

View File

@@ -55,6 +55,8 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -352,7 +354,16 @@ public class ConnectProcessor {
handleQuit();
break;
case COM_QUERY:
handleQuery();
ctx.initTracer("trace");
Span rootSpan = ctx.getTracer().spanBuilder("handleQuery").startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
handleQuery();
} catch (Exception e) {
rootSpan.recordException(e);
throw e;
} finally {
rootSpan.end();
}
break;
case COM_FIELD_LIST:
handleFieldList();
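
With this wiring, every COM_QUERY gets a fresh "handleQuery" root span: the tracer is (re)initialized from the shared OpenTelemetry instance, the span stays current for the duration of handleQuery(), exceptions are recorded on it, and it is always ended in the finally block. Every span that StmtExecutor and Coordinator create below becomes a child of this root span through Context.current().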

View File

@@ -28,6 +28,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.telemetry.ScopedSpan;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.ProfileWriter;
@@ -102,6 +104,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -645,9 +651,16 @@ public class Coordinator {
} // end for fragments
// 4. send and wait fragments rpc
List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures
= Lists.newArrayList();
List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures =
Lists.newArrayList();
Context parentSpanContext = Context.current();
for (BackendExecStates states : beToExecStates.values()) {
Span span = Telemetry.getNoopSpan();
if (ConnectContext.get() != null) {
span = ConnectContext.get().getTracer().spanBuilder("execRemoteFragmentsAsync")
.setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
}
states.scopedSpan = new ScopedSpan(span);
states.unsetFields();
futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
}
@@ -657,6 +670,12 @@
// 5. send and wait execution start rpc
futures.clear();
for (BackendExecStates states : beToExecStates.values()) {
Span span = Telemetry.getNoopSpan();
if (ConnectContext.get() != null) {
span = ConnectContext.get().getTracer().spanBuilder("execPlanFragmentStartAsync")
.setParent(parentSpanContext).setSpanKind(SpanKind.CLIENT).startSpan();
}
states.scopedSpan = new ScopedSpan(span);
futures.add(Pair.create(states, states.execPlanFragmentStartAsync()));
}
waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
@@ -679,6 +698,7 @@
TStatusCode code;
String errMsg = null;
Exception exception = null;
Span span = pair.first.scopedSpan.getSpan();
try {
PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS);
code = TStatusCode.findByValue(result.getStatus().getStatusCode());
@@ -703,21 +723,28 @@
code = TStatusCode.TIMEOUT;
}
if (code != TStatusCode.OK) {
if (exception != null && errMsg == null) {
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.setStatus(errMsg);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
case THRIFT_RPC_ERROR:
SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
default:
throw new UserException(errMsg, exception);
try {
if (code != TStatusCode.OK) {
if (exception != null && errMsg == null) {
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.setStatus(errMsg);
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
case THRIFT_RPC_ERROR:
SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
default:
throw new UserException(errMsg, exception);
}
}
} catch (Exception e) {
span.recordException(e);
throw e;
} finally {
pair.first.scopedSpan.endSpan();
}
// succeed to send the plan fragment, update the "alreadySentBackendIds"
@@ -2103,13 +2130,20 @@
return false;
}
try {
Span span = ConnectContext.get() != null
? ConnectContext.get().getTracer().spanBuilder("cancelPlanFragmentAsync")
.setParent(Context.current()).setSpanKind(SpanKind.CLIENT).startSpan()
: Telemetry.getNoopSpan();
try (Scope scope = span.makeCurrent()) {
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
fragmentInstanceId(), cancelReason);
} catch (RpcException e) {
span.recordException(e);
LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
brpcAddress.getPort());
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
} finally {
span.end();
}
this.hasCanceled = true;
@@ -2156,6 +2190,7 @@
TNetworkAddress brpcAddr;
List<BackendExecState> states = Lists.newArrayList();
boolean twoPhaseExecution = false;
ScopedSpan scopedSpan = new ScopedSpan();
public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) {
this.beId = beId;
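
Note the design choice in this file: the CLIENT span is stored on BackendExecStates as a ScopedSpan instead of being ended inline, because the RPC it describes completes asynchronously. The span is started before the async send, carried in the per-backend state, and only finished (with any exception recorded) once waitRpc() resolves the corresponding future.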

View File

@@ -128,6 +128,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -324,7 +327,16 @@ public class StmtExecutor implements ProfileWriter {
public void execute() throws Exception {
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
execute(queryId);
Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
executeSpan.setAttribute("queryId", DebugUtil.printId(queryId));
if (originStmt != null) {
executeSpan.setAttribute("sql", originStmt.originStmt);
}
try (Scope scope = executeSpan.makeCurrent()) {
execute(queryId);
} finally {
executeSpan.end();
}
}
// Execute one statement with queryId
@@ -334,6 +346,7 @@ public class StmtExecutor implements ProfileWriter {
// Exception:
// IOException: talk with client failed.
public void execute(TUniqueId queryId) throws Exception {
Span span = Span.fromContext(Context.current());
context.setStartTime();
plannerProfile.setQueryBeginTime();
@@ -350,8 +363,17 @@
analyzeVariablesInStmt();
if (!context.isTxnModel()) {
// analyze this query
analyze(context.getSessionVariable().toThrift());
Span queryAnalysisSpan =
context.getTracer().spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze(context.getSessionVariable().toThrift());
} catch (Exception e) {
queryAnalysisSpan.recordException(e);
throw e;
} finally {
queryAnalysisSpan.end();
}
if (isForwardToMaster()) {
if (isProxy) {
// This is already a stmt forwarded from other FE.
@@ -412,6 +434,7 @@
AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
span.setAttribute("queryId", DebugUtil.printId(newQueryId));
}
handleQueryStmt();
// explain query stmt do not have profile
@@ -1005,55 +1028,73 @@
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.setProfileWriter(this);
coord.exec();
Span queryScheduleSpan =
context.getTracer().spanBuilder("query schedule").setParent(Context.current()).startSpan();
try (Scope scope = queryScheduleSpan.makeCurrent()) {
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
throw e;
} finally {
queryScheduleSpan.end();
}
plannerProfile.setQueryScheduleFinishTime();
writeProfile(false);
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag
if (batch.getBatch() != null) {
if (cacheAnalyzer != null) {
cacheAnalyzer.copyRowBatch(batch);
}
// For some language drivers, an error packet received after the fields packet
// is recognized as a successful result,
// so we need to send the fields after the first batch has arrived
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag
if (batch.getBatch() != null) {
if (cacheAnalyzer != null) {
cacheAnalyzer.copyRowBatch(batch);
}
isSendFields = true;
// For some language drivers, an error packet received after the fields packet
// is recognized as a successful result,
// so we need to send the fields after the first batch has arrived
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
isSendFields = true;
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
for (ByteBuffer row : batch.getBatch().getRows()) {
channel.sendOnePacket(row);
if (batch.isEos()) {
break;
}
context.updateReturnRows(batch.getBatch().getRows().size());
}
if (batch.isEos()) {
break;
if (cacheAnalyzer != null) {
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
isSendFields =
sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt, isSendFields,
false);
}
cacheAnalyzer.updateCache();
}
}
if (cacheAnalyzer != null) {
if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt,
isSendFields, false);
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
}
cacheAnalyzer.updateCache();
statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
context.getState().setEof();
plannerProfile.setQueryFetchResultFinishTime();
} catch (Exception e) {
fetchResultSpan.recordException(e);
throw e;
} finally {
fetchResultSpan.end();
}
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
} else {
sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
}
}
statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
context.getState().setEof();
plannerProfile.setQueryFetchResultFinishTime();
}
private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
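
Putting the FE-side pieces together, one traced query yields roughly this span hierarchy (a sketch assembled from the diffs above; the CLIENT spans are continued on the BE side via the gRPC interceptor shown next):

handleQuery                            root span (ConnectProcessor)
└── execute                            StmtExecutor; tagged with queryId and sql
    ├── query analysis
    ├── query schedule                 wraps Coordinator.exec()
    │   ├── execRemoteFragmentsAsync     CLIENT, one per backend
    │   └── execPlanFragmentStartAsync   CLIENT, one per backend
    └── fetch result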

View File

@@ -18,12 +18,21 @@
package org.apache.doris.rpc;
import org.apache.doris.common.Config;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.thrift.TNetworkAddress;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.opentelemetry.context.Context;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -44,7 +53,7 @@ public class BackendServiceClient {
channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
.flowControlWindow(Config.grpc_max_message_size_bytes)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
.usePlaintext().build();
.intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build();
stub = PBackendServiceGrpc.newFutureStub(channel);
blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
}
@@ -134,4 +143,25 @@ public class BackendServiceClient {
LOG.warn("shut down backend service client: {}", address);
}
/**
* gRPC client interceptor that injects the current OpenTelemetry trace context into outgoing request headers.
*/
public static class OpenTelemetryClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Inject the current trace context into the outgoing request headers
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
.inject(Context.current(), headers, (carrier, key, value) -> carrier.put(
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value));
super.start(responseListener, headers);
}
};
}
}
}
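
Because Telemetry configures W3CTraceContextPropagator, what this interceptor actually adds to the gRPC metadata is a W3C "traceparent" header, which the BE side can extract to continue the trace. The same injection against a plain Map carrier, as a standalone sketch (the map and setter lambda are illustrative, not part of the patch):

// Illustrative only: the propagator writes a single "traceparent" entry,
// e.g. 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01.
Map<String, String> carrier = new HashMap<>();
W3CTraceContextPropagator.getInstance()
        .inject(Context.current(), carrier, (map, key, value) -> map.put(key, value));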