[enhancement](tracing) Support forward to master tracing (#12290)
This commit is contained in:
@ -38,6 +38,7 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.telemetry.Telemetry;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
@ -62,8 +63,10 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanContext;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -73,7 +76,9 @@ import java.io.StringReader;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
@ -81,10 +86,23 @@ import java.util.UUID;
|
||||
*/
|
||||
public class ConnectProcessor {
|
||||
private static final Logger LOG = LogManager.getLogger(ConnectProcessor.class);
|
||||
private static final TextMapGetter<Map<String, String>> getter =
|
||||
new TextMapGetter<Map<String, String>>() {
|
||||
@Override
|
||||
public Iterable<String> keys(Map<String, String> carrier) {
|
||||
return carrier.keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(Map<String, String> carrier, String key) {
|
||||
if (carrier.containsKey(key)) {
|
||||
return carrier.get(key);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
};
|
||||
private final ConnectContext ctx;
|
||||
private ByteBuffer packetBuf;
|
||||
|
||||
private StmtExecutor executor = null;
|
||||
|
||||
public ConnectProcessor(ConnectContext context) {
|
||||
@ -473,8 +491,8 @@ public class ConnectProcessor {
|
||||
// explain query stmt do not have profile
|
||||
if (executor != null && !executor.getParsedStmt().isExplain()
|
||||
&& (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
|
||||
|| executor.getParsedStmt() instanceof LogicalPlanAdapter
|
||||
|| executor.getParsedStmt() instanceof InsertStmt)) {
|
||||
|| executor.getParsedStmt() instanceof LogicalPlanAdapter
|
||||
|| executor.getParsedStmt() instanceof InsertStmt)) {
|
||||
executor.writeProfile(true);
|
||||
}
|
||||
}
|
||||
@ -543,6 +561,21 @@ public class ConnectProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> traceCarrier = new HashMap<>();
|
||||
if (request.isSetTraceCarrier()) {
|
||||
traceCarrier = request.getTraceCarrier();
|
||||
}
|
||||
Context extractedContext = Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
|
||||
.extract(Context.current(), traceCarrier, getter);
|
||||
// What we want is for the Traceid to remain unchanged during propagation.
|
||||
// ctx.initTracer() will be called only if the Context is valid,
|
||||
// so that the Traceid generated by SDKTracer is the same as the follower. Otherwise,
|
||||
// if the Context is invalid and ctx.initTracer() is called,
|
||||
// SDKTracer will generate a different Traceid.
|
||||
if (Span.fromContext(extractedContext).getSpanContext().isValid()) {
|
||||
ctx.initTracer("master trace");
|
||||
}
|
||||
|
||||
ctx.setThreadLocalInfo();
|
||||
StmtExecutor executor = null;
|
||||
try {
|
||||
@ -557,7 +590,17 @@ public class ConnectProcessor {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
}
|
||||
executor.execute(queryId);
|
||||
Span masterQuerySpan =
|
||||
ctx.getTracer().spanBuilder("master execute").setParent(extractedContext)
|
||||
.setSpanKind(SpanKind.SERVER).startSpan();
|
||||
try (Scope scope = masterQuerySpan.makeCurrent()) {
|
||||
executor.execute(queryId);
|
||||
} catch (Exception e) {
|
||||
masterQuerySpan.recordException(e);
|
||||
throw e;
|
||||
} finally {
|
||||
masterQuerySpan.end();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Client failed.
|
||||
LOG.warn("Process one query failed because IOException: ", e);
|
||||
|
||||
@ -19,17 +19,23 @@ package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.analysis.RedirectStatus;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.telemetry.Telemetry;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.TMasterOpRequest;
|
||||
import org.apache.doris.thrift.TMasterOpResult;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class MasterOpExecutor {
|
||||
private static final Logger LOG = LogManager.getLogger(MasterOpExecutor.class);
|
||||
@ -58,7 +64,17 @@ public class MasterOpExecutor {
|
||||
}
|
||||
|
||||
public void execute() throws Exception {
|
||||
forward();
|
||||
Span forwardSpan =
|
||||
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
|
||||
.startSpan();
|
||||
try (Scope scope = forwardSpan.makeCurrent()) {
|
||||
forward();
|
||||
} catch (Exception e) {
|
||||
forwardSpan.recordException(e);
|
||||
throw e;
|
||||
} finally {
|
||||
forwardSpan.end();
|
||||
}
|
||||
LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId);
|
||||
ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
|
||||
}
|
||||
@ -95,6 +111,14 @@ public class MasterOpExecutor {
|
||||
// session variables
|
||||
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
|
||||
|
||||
// create a trace carrier
|
||||
Map<String, String> traceCarrier = new HashMap<String, String>();
|
||||
// Inject the request with the current context
|
||||
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
|
||||
.inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value));
|
||||
// carrier send tracing to master
|
||||
params.setTraceCarrier(traceCarrier);
|
||||
|
||||
if (null != ctx.queryId()) {
|
||||
params.setQueryId(ctx.queryId());
|
||||
}
|
||||
|
||||
@ -160,7 +160,8 @@ public class StmtExecutor implements ProfileWriter {
|
||||
|
||||
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
|
||||
private static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
|
||||
|
||||
private static final String NULL_VALUE_FOR_LOAD = "\\N";
|
||||
private final Object writeProfileLock = new Object();
|
||||
private ConnectContext context;
|
||||
private StatementContext statementContext;
|
||||
private MysqlSerializer serializer;
|
||||
@ -170,7 +171,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
private RuntimeProfile profile;
|
||||
private RuntimeProfile summaryProfile;
|
||||
private RuntimeProfile plannerRuntimeProfile;
|
||||
private final Object writeProfileLock = new Object();
|
||||
private volatile boolean isFinishedProfile = false;
|
||||
private String queryType = "Query";
|
||||
private volatile Coordinator coord = null;
|
||||
@ -181,7 +181,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
private ShowResultSet proxyResultSet = null;
|
||||
private Data.PQueryStatistics.Builder statisticsForAuditLog;
|
||||
private boolean isCached;
|
||||
|
||||
private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
|
||||
|
||||
// this constructor is mainly for proxy
|
||||
@ -209,6 +208,21 @@ public class StmtExecutor implements ProfileWriter {
|
||||
this.statementContext.setParsedStatement(parsedStmt);
|
||||
}
|
||||
|
||||
public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
|
||||
if (cols.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
|
||||
for (Expr expr : cols) {
|
||||
if (expr instanceof NullLiteral) {
|
||||
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
|
||||
} else {
|
||||
row.addColBuilder().setValue(expr.getStringValue());
|
||||
}
|
||||
}
|
||||
return row.build();
|
||||
}
|
||||
|
||||
public void setCoord(Coordinator coord) {
|
||||
this.coord = coord;
|
||||
}
|
||||
@ -336,7 +350,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
|
||||
/**
|
||||
* Used for audit in ConnectProcessor.
|
||||
*
|
||||
* <p>
|
||||
* TODO: There are three interface in StatementBase be called when doing audit:
|
||||
* toDigest needAuditEncryption when parsedStmt is not a query
|
||||
* and isValuesOrConstantSelect when parsedStmt is instance of InsertStmt.
|
||||
@ -346,7 +360,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
* isValuesOrConstantSelect: when this interface return true, original string is truncated at 1024
|
||||
*
|
||||
* @return parsed and analyzed statement for Stale planner.
|
||||
* an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
|
||||
* an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
|
||||
*/
|
||||
public StatementBase getParsedStmt() {
|
||||
return parsedStmt;
|
||||
@ -569,6 +583,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
/**
|
||||
* get variables in stmt.
|
||||
* TODO: only support select stmt now. need to support Nereids.
|
||||
*
|
||||
* @throws DdlException
|
||||
*/
|
||||
private void analyzeVariablesInStmt() throws DdlException {
|
||||
@ -897,7 +912,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
// the meta fields must be sent right before the first batch of data(or eos flag).
|
||||
// so if it has data(or eos is true), this method must return true.
|
||||
private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues,
|
||||
SelectStmt selectStmt, boolean isSendFields, boolean isEos)
|
||||
SelectStmt selectStmt, boolean isSendFields, boolean isEos)
|
||||
throws Exception {
|
||||
RowBatch batch = null;
|
||||
boolean isSend = isSendFields;
|
||||
@ -1123,7 +1138,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
plannerProfile.setQueryFetchResultFinishTime();
|
||||
} catch (Exception e) {
|
||||
fetchResultSpan.recordException(e);
|
||||
throw e;
|
||||
throw e;
|
||||
} finally {
|
||||
fetchResultSpan.end();
|
||||
}
|
||||
@ -1331,23 +1346,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
executor.beginTransaction(request);
|
||||
}
|
||||
|
||||
private static final String NULL_VALUE_FOR_LOAD = "\\N";
|
||||
|
||||
public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
|
||||
if (cols.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
|
||||
for (Expr expr : cols) {
|
||||
if (expr instanceof NullLiteral) {
|
||||
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
|
||||
} else {
|
||||
row.addColBuilder().setValue(expr.getStringValue());
|
||||
}
|
||||
}
|
||||
return row.build();
|
||||
}
|
||||
|
||||
// Process a select statement.
|
||||
private void handleInsertStmt() throws Exception {
|
||||
// Every time set no send flag and clean all data in buffer
|
||||
|
||||
@ -435,6 +435,7 @@ struct TMasterOpRequest {
|
||||
18: optional i64 insert_visible_timeout_ms // deprecated, move into session_variables
|
||||
19: optional map<string, string> session_variables
|
||||
20: optional bool foldConstantByBe
|
||||
21: optional map<string, string> trace_carrier
|
||||
}
|
||||
|
||||
struct TColumnDefinition {
|
||||
|
||||
Reference in New Issue
Block a user