diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2494465e05..1ddd9ebe53 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2398,13 +2398,6 @@ public class Config extends ConfigBase { "Whether to enable the function of getting log files through http interface"}) public static boolean enable_get_log_file_api = false; - // This config is deprecated and has not taken effect anymore, - // please use dialect plugin: fe_plugins/http-dialect-converter for instead - @Deprecated - @ConfField(description = {"用于SQL方言转换的服务地址。", - "The service address for SQL dialect conversion."}) - public static String sql_convertor_service = ""; - @ConfField(mutable = true) public static boolean enable_profile_when_analyze = false; @ConfField(mutable = true) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java index fb5b8072ed..178b0d0e14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java @@ -200,7 +200,7 @@ public class InlineViewRef extends TableRef { if (view == null && !hasExplicitAlias()) { String dialect = ConnectContext.get().getSessionVariable().getSqlDialect(); Dialect sqlDialect = Dialect.getByName(dialect); - if (Dialect.SPARK_SQL != sqlDialect) { + if (Dialect.SPARK != sqlDialect) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS); } hasExplicitAlias = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index c7247a443b..d53520b133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -24,6 +24,8 @@ import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TypeDef; import org.apache.doris.common.Config; @@ -33,6 +35,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.StatisticsUtil; @@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); + public static final List AUDIT_TABLE_COLUMNS; + + static { + AUDIT_TABLE_COLUMNS = new ArrayList<>(); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true)); + } + public void run() { if (!FeConstants.enableInternalSchemaDb) { return; @@ -83,6 +113,7 @@ public class InternalSchemaInitializer extends Thread { Database database = op.get(); modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME); modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME); + modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE); } public void modifyTblReplicaCount(Database database, String tblName) { @@ -103,8 +134,8 @@ public class InternalSchemaInitializer extends Thread { >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { return; } + colStatsTbl.writeLock(); try { - colStatsTbl.writeLock(); Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props); } finally { colStatsTbl.writeUnlock(); @@ -123,8 +154,11 @@ public class InternalSchemaInitializer extends Thread { } private void createTbl() throws UserException { + // statistics Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt()); Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt()); + // audit table + Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt()); } @VisibleForTesting @@ -212,7 +246,40 @@ public class InternalSchemaInitializer extends Thread { return createTableStmt; } + private CreateTableStmt buildAuditTblStmt() throws UserException { + TableName tableName = new TableName("", + FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE); + + String engineName = "olap"; + ArrayList dupKeys = Lists.newArrayList("query_id", "time", "client_ip"); + KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys); + // partition + PartitionDesc partitionDesc = new RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList()); + // distribution + int bucketNum = 2; + DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, Lists.newArrayList("query_id")); + Map properties = new HashMap() { + { + put("dynamic_partition.time_unit", "DAY"); + put("dynamic_partition.start", "-30"); + put("dynamic_partition.end", "3"); + put("dynamic_partition.prefix", "p"); + put("dynamic_partition.buckets", String.valueOf(bucketNum)); + put("dynamic_partition.enable", "true"); + put("replication_num", String.valueOf(Math.max(1, + Config.min_replication_num_per_tablet))); + } + }; + CreateTableStmt createTableStmt = new CreateTableStmt(true, false, + tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc, + properties, null, "Doris internal audit table, DO NOT MODIFY IT", null); + StatisticsUtil.analyze(createTableStmt); + return createTableStmt; + } + + private boolean created() { + // 1. check database exist Optional optionalDatabase = Env.getCurrentEnv().getInternalCatalog() .getDb(FeConstants.INTERNAL_DB_NAME); @@ -225,6 +292,7 @@ public class InternalSchemaInitializer extends Thread { return false; } + // 2. check statistic tables Table statsTbl = optionalStatsTbl.get(); Optional optionalColumn = statsTbl.fullSchema.stream().filter(c -> c.getName().equals("count")).findFirst(); @@ -238,7 +306,17 @@ public class InternalSchemaInitializer extends Thread { } return false; } - return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent(); + optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME); + if (!optionalStatsTbl.isPresent()) { + return false; + } + + // 3. check audit table + optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE); + if (!optionalStatsTbl.isPresent()) { + return false; + } + return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 2b4f20a57c..a78a7e9fa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -26,6 +26,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; @@ -104,21 +105,21 @@ public class LoadAction extends RestBaseController { return redirectToHttps(request); } - try { - executeCheckPassword(request, response); - } catch (UnauthorizedException unauthorizedException) { - if (LOG.isDebugEnabled()) { - LOG.debug("Check password failed, going to check auth token, request: {}", request.toString()); + String authToken = request.getHeader("token"); + // if auth token is not null, check it first + if (!Strings.isNullOrEmpty(authToken)) { + if (!checkClusterToken(authToken)) { + throw new UnauthorizedException("Invalid token: " + authToken); } - - if (!checkClusterToken(request)) { - throw unauthorizedException; - } else { - return executeWithClusterToken(request, db, table, true); + return executeWithClusterToken(request, db, table, true); + } else { + try { + executeCheckPassword(request, response); + return executeWithoutPassword(request, response, db, table, true, groupCommit); + } finally { + ConnectContext.remove(); } } - - return executeWithoutPassword(request, response, db, table, true, groupCommit); } @RequestMapping(path = "/api/_http_stream", @@ -363,18 +364,8 @@ public class LoadAction extends RestBaseController { // AuditlogPlugin should be re-disigned carefully, and blow method focuses on // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario - private boolean checkClusterToken(HttpServletRequest request) { - if (LOG.isDebugEnabled()) { - LOG.debug("Checking cluser token, request {}", request.toString()); - } - - String authToken = request.getHeader("token"); - - if (Strings.isNullOrEmpty(authToken)) { - return false; - } - - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken); + private boolean checkClusterToken(String token) { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); } // NOTE: This function can only be used for AuditlogPlugin stream load for now. @@ -388,6 +379,9 @@ public class LoadAction extends RestBaseController { ctx.setEnv(Env.getCurrentEnv()); ctx.setThreadLocalInfo(); ctx.setRemoteIP(request.getRemoteAddr()); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); String dbName = db; String tableName = table; @@ -444,8 +438,10 @@ public class LoadAction extends RestBaseController { return redirectView; } catch (Exception e) { - LOG.warn("Failed to execute stream load with cluster token, {}", e); + LOG.warn("Failed to execute stream load with cluster token, {}", e.getMessage(), e); return new RestBaseResult(e.getMessage()); + } finally { + ConnectContext.remove(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 58d789f970..488e73f3ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -28,9 +28,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.EventType; -import org.apache.doris.plugin.StreamLoadAuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent.EventType; +import org.apache.doris.plugin.audit.StreamLoadAuditEvent; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 1939e86f85..f3d2480351 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -41,8 +41,8 @@ import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.LoadAuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.LoadAuditEvent; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java index 722a9ff7cc..9d9b0cb455 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java @@ -36,25 +36,25 @@ public enum Dialect { */ PRESTO("presto"), /** - * Spark sql parser dialect + * Spark3 sql parser dialect */ - SPARK_SQL("spark_sql"), + SPARK("spark"), + /** + * Spark2 sql parser dialect + */ + SPARK2("spark2"), + /** + * Flink sql parser dialect + */ + FLINK("flink"), /** * Hive parser dialect */ HIVE("hive"), - /** - * Alibaba max compute parser dialect - */ - MAX_COMPUTE("max_compute"), - /** - * Mysql parser dialect - */ - MYSQL("mysql"), /** * Postgresql parser dialect */ - POSTGRESQL("postgresql"), + POSTGRES("postgres"), /** * Sqlserver parser dialect */ @@ -64,13 +64,9 @@ public enum Dialect { */ CLICKHOUSE("clickhouse"), /** - * Sap hana parser dialect + * oracle parser dialect */ - SAP_HANA("sap_hana"), - /** - * OceanBase parser dialect - */ - OCEANBASE("oceanbase"); + ORACLE("oracle"); public static final int MAX_DIALECT_SIZE = Dialect.values().length; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java index d9c9ec8469..55962a3dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java @@ -17,6 +17,8 @@ package org.apache.doris.plugin; +import org.apache.doris.plugin.audit.AuditEvent; + /** * Audit plugin interface describe. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java index 9a38bce017..7fddf54e1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java @@ -27,7 +27,9 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.nereids.parser.Dialect; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginLoader.PluginStatus; -import org.apache.doris.qe.AuditLogBuilder; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; +import org.apache.doris.plugin.audit.AuditLogBuilder; +import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -104,12 +106,24 @@ public class PluginMgr implements Writable { } private void initBuiltinPlugins() { - // AuditLog + // AuditLog: log audit log to file AuditLogBuilder auditLogBuilder = new AuditLogBuilder(); if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) { LOG.warn("failed to register audit log builder"); } + // AuditLoader: log audit log to internal table + AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin(); + if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) { + LOG.warn("failed to register audit log builder"); + } + + // sql dialect converter + HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin(); + if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) { + LOG.warn("failed to register http dialect converter plugin"); + } + // other builtin plugins } @@ -217,11 +231,17 @@ public class PluginMgr implements Writable { } PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin); - PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader); + try { + loader.install(); + } catch (Exception e) { + LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e); + return false; + } // add dialect plugin if (plugin instanceof DialectConverterPlugin) { addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo); } + PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader); return checkLoader == null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java index 732d33c5e1..5c122c9896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; import java.lang.annotation.Retention; @@ -56,6 +56,8 @@ public class AuditEvent { public String clientIp = ""; @AuditField(value = "User") public String user = ""; + @AuditField(value = "Ctl") + public String ctl = ""; @AuditField(value = "Db") public String db = ""; @AuditField(value = "State") @@ -133,6 +135,11 @@ public class AuditEvent { return this; } + public AuditEventBuilder setCtl(String ctl) { + auditEvent.ctl = ctl; + return this; + } + public AuditEventBuilder setDb(String db) { auditEvent.db = db; return this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java new file mode 100644 index 0000000000..533f50f062 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.plugin.audit; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.DigitalVersion; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.plugin.AuditPlugin; +import org.apache.doris.plugin.Plugin; +import org.apache.doris.plugin.PluginContext; +import org.apache.doris.plugin.PluginException; +import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.qe.GlobalVariable; + +import com.google.common.collect.Queues; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/* + * This plugin will load audit log to specified doris table at specified interval + */ +public class AuditLoaderPlugin extends Plugin implements AuditPlugin { + private static final Logger LOG = LogManager.getLogger(AuditLoaderPlugin.class); + + public static final String AUDIT_LOG_TABLE = "audit_log"; + + private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()); + + private StringBuilder auditLogBuffer = new StringBuilder(); + private int auditLogNum = 0; + private long lastLoadTimeAuditLog = 0; + // sometimes the audit log may fail to load to doris, count it to observe. + private long discardLogNum = 0; + + private BlockingQueue auditEventQueue; + private AuditStreamLoader streamLoader; + private Thread loadThread; + + private volatile boolean isClosed = false; + private volatile boolean isInit = false; + + private final PluginInfo pluginInfo; + + public AuditLoaderPlugin() { + pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLoader", PluginType.AUDIT, + "builtin audit loader, to load audit log to internal table", DigitalVersion.fromString("2.1.0"), + DigitalVersion.fromString("1.8.31"), AuditLoaderPlugin.class.getName(), null, null); + } + + public PluginInfo getPluginInfo() { + return pluginInfo; + } + + @Override + public void init(PluginInfo info, PluginContext ctx) throws PluginException { + super.init(info, ctx); + + synchronized (this) { + if (isInit) { + return; + } + this.lastLoadTimeAuditLog = System.currentTimeMillis(); + // make capacity large enough to avoid blocking. + // and it will not be too large because the audit log will flush if num in queue is larger than + // GlobalVariable.audit_plugin_max_batch_bytes. + this.auditEventQueue = Queues.newLinkedBlockingDeque(100000); + this.streamLoader = new AuditStreamLoader(); + this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); + this.loadThread.start(); + + isInit = true; + } + } + + @Override + public void close() throws IOException { + super.close(); + isClosed = true; + if (loadThread != null) { + try { + loadThread.join(); + } catch (InterruptedException e) { + LOG.debug("encounter exception when closing the audit loader", e); + } + } + } + + public boolean eventFilter(AuditEvent.EventType type) { + return type == AuditEvent.EventType.AFTER_QUERY; + } + + public void exec(AuditEvent event) { + if (!GlobalVariable.enableAuditLoader) { + LOG.debug("builtin audit loader is disabled, discard current audit event"); + return; + } + try { + auditEventQueue.add(event); + } catch (Exception e) { + // In order to ensure that the system can run normally, here we directly + // discard the current audit_event. If this problem occurs frequently, + // improvement can be considered. + ++discardLogNum; + LOG.debug("encounter exception when putting current audit batch, discard current audit event." + + " total discard num: {}", discardLogNum, e); + } + } + + private void assembleAudit(AuditEvent event) { + fillLogBuffer(event, auditLogBuffer); + ++auditLogNum; + } + + private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { + logBuffer.append(event.queryId).append("\t"); + logBuffer.append(TimeUtils.longToTimeString(event.timestamp)).append("\t"); + logBuffer.append(event.clientIp).append("\t"); + logBuffer.append(event.user).append("\t"); + logBuffer.append(event.ctl).append("\t"); + logBuffer.append(event.db).append("\t"); + logBuffer.append(event.state).append("\t"); + logBuffer.append(event.errorCode).append("\t"); + logBuffer.append(event.errorMessage).append("\t"); + logBuffer.append(event.queryTime).append("\t"); + logBuffer.append(event.scanBytes).append("\t"); + logBuffer.append(event.scanRows).append("\t"); + logBuffer.append(event.returnRows).append("\t"); + logBuffer.append(event.stmtId).append("\t"); + logBuffer.append(event.isQuery ? 1 : 0).append("\t"); + logBuffer.append(event.feIp).append("\t"); + logBuffer.append(event.cpuTimeMs).append("\t"); + logBuffer.append(event.sqlHash).append("\t"); + logBuffer.append(event.sqlDigest).append("\t"); + logBuffer.append(event.peakMemoryBytes).append("\t"); + // trim the query to avoid too long + // use `getBytes().length` to get real byte length + String stmt = truncateByBytes(event.stmt).replace("\n", " ") + .replace("\t", " ") + .replace("\r", " "); + LOG.debug("receive audit event with stmt: {}", stmt); + logBuffer.append(stmt).append("\n"); + } + + private String truncateByBytes(String str) { + int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength, str.getBytes().length); + if (maxLen >= str.getBytes().length) { + return str; + } + Charset utf8Charset = Charset.forName("UTF-8"); + CharsetDecoder decoder = utf8Charset.newDecoder(); + byte[] sb = str.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen); + CharBuffer charBuffer = CharBuffer.allocate(maxLen); + decoder.onMalformedInput(CodingErrorAction.IGNORE); + decoder.decode(buffer, charBuffer, true); + decoder.flush(charBuffer); + return new String(charBuffer.array(), 0, charBuffer.position()); + } + + private void loadIfNecessary(AuditStreamLoader loader) { + long currentTime = System.currentTimeMillis(); + + if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes + || currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) { + // begin to load + try { + String token = ""; + try { + // Acquire token from master + token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + } catch (Exception e) { + LOG.warn("Failed to get auth token: {}", e); + discardLogNum += auditLogNum; + return; + } + AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token); + LOG.debug("audit loader response: {}", response); + } catch (Exception e) { + LOG.debug("encounter exception when putting current audit batch, discard current batch", e); + discardLogNum += auditLogNum; + } finally { + // make a new string builder to receive following events. + resetBatch(currentTime); + if (discardLogNum > 0) { + LOG.info("num of total discarded audit logs: {}", discardLogNum); + } + } + } + + return; + } + + private void resetBatch(long currentTime) { + this.auditLogBuffer = new StringBuilder(); + this.lastLoadTimeAuditLog = currentTime; + this.auditLogNum = 0; + } + + private class LoadWorker implements Runnable { + private AuditStreamLoader loader; + + public LoadWorker(AuditStreamLoader loader) { + this.loader = loader; + } + + public void run() { + while (!isClosed) { + try { + AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); + if (event != null) { + assembleAudit(event); + // process all audit logs + loadIfNecessary(loader); + } + } catch (InterruptedException ie) { + LOG.debug("encounter exception when loading current audit batch", ie); + } catch (Exception e) { + LOG.error("run audit logger error:", e); + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 1258d0b337..f06dfb6eef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -15,19 +15,18 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.qe; +package org.apache.doris.plugin.audit; import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; import org.apache.doris.common.util.DigitalVersion; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.AuditField; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.audit.AuditEvent.AuditField; +import org.apache.doris.plugin.audit.AuditEvent.EventType; import com.google.common.collect.Maps; import com.google.common.collect.Sets; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java new file mode 100644 index 0000000000..0717547827 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.plugin.audit; + +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Calendar; +import java.util.stream.Collectors; + +public class AuditStreamLoader { + private static final Logger LOG = LogManager.getLogger(AuditStreamLoader.class); + private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; + private String hostPort; + private String db; + private String auditLogTbl; + private String auditLogLoadUrlStr; + private String feIdentity; + + public AuditStreamLoader() { + this.hostPort = "127.0.0.1:" + Config.http_port; + this.db = FeConstants.INTERNAL_DB_NAME; + this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE; + this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl); + // currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label + this.feIdentity = hostPort.replaceAll("\\.", "_"); + } + + private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException { + URL url = new URL(urlStr); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setInstanceFollowRedirects(false); + conn.setRequestMethod("PUT"); + conn.setRequestProperty("token", clusterToken); + conn.setRequestProperty("Authorization", "Basic "); + conn.addRequestProperty("Expect", "100-continue"); + conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); + conn.addRequestProperty("label", label); + conn.addRequestProperty("max_filter_ratio", "1.0"); + conn.addRequestProperty("columns", + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + Collectors.joining(","))); + conn.setDoOutput(true); + conn.setDoInput(true); + return conn; + } + + private String toCurl(HttpURLConnection conn) { + StringBuilder sb = new StringBuilder("curl -v "); + sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n "); + sb.append("-H \"").append("Authorization\":").append("\"Basic ").append("\" \\\n "); + sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n "); + sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n "); + sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n "); + sb.append("-H \"").append("columns\":") + .append("\"" + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + Collectors.joining(",")) + "\" \\\n "); + sb.append("\"").append(conn.getURL()).append("\""); + return sb.toString(); + } + + private String getContent(HttpURLConnection conn) { + BufferedReader br = null; + StringBuilder response = new StringBuilder(); + String line; + try { + if (100 <= conn.getResponseCode() && conn.getResponseCode() <= 399) { + br = new BufferedReader(new InputStreamReader(conn.getInputStream())); + } else { + br = new BufferedReader(new InputStreamReader(conn.getErrorStream())); + } + while ((line = br.readLine()) != null) { + response.append(line); + } + } catch (IOException e) { + LOG.warn("get content error,", e); + } + + return response.toString(); + } + + public LoadResponse loadBatch(StringBuilder sb, String clusterToken) { + Calendar calendar = Calendar.getInstance(); + String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s", + calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), + calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), + feIdentity); + + HttpURLConnection feConn = null; + HttpURLConnection beConn = null; + try { + // build request and send to fe + label = "audit" + label; + feConn = getConnection(auditLogLoadUrlStr, label, clusterToken); + int status = feConn.getResponseCode(); + // fe send back http response code TEMPORARY_REDIRECT 307 and new be location + if (status != 307) { + throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status + + ", response: " + getContent(feConn) + ", request is: " + toCurl(feConn)); + } + String location = feConn.getHeaderField("Location"); + if (location == null) { + throw new Exception("redirect location is null"); + } + // build request and send to new be location + beConn = getConnection(location, label, clusterToken); + // send data to be + try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) { + bos.write(sb.toString().getBytes()); + } + + // get respond + status = beConn.getResponseCode(); + String respMsg = beConn.getResponseMessage(); + String response = getContent(beConn); + + LOG.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}", + label, status, respMsg, response); + + return new LoadResponse(status, respMsg, response); + + } catch (Exception e) { + e.printStackTrace(); + String err = "failed to load audit via AuditLoader plugin with label: " + label; + LOG.warn(err, e); + return new LoadResponse(-1, e.getMessage(), err); + } finally { + if (feConn != null) { + feConn.disconnect(); + } + if (beConn != null) { + beConn.disconnect(); + } + } + } + + public static class LoadResponse { + public int status; + public String respMsg; + public String respContent; + + public LoadResponse(int status, String respMsg, String respContent) { + this.status = status; + this.respMsg = respMsg; + this.respContent = respContent; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("status: ").append(status); + sb.append(", resp msg: ").append(respMsg); + sb.append(", resp content: ").append(respContent); + return sb.toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java index 704ec3ad03..eb3e098bf4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; public class LoadAuditEvent extends AuditEvent { diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java index 04b15b9264..8733a59656 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; public class StreamLoadAuditEvent extends AuditEvent { diff --git a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java similarity index 56% rename from fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java index a21e501839..29dca027c6 100644 --- a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java @@ -15,32 +15,27 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin.dialect; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.common.util.DigitalVersion; import org.apache.doris.nereids.parser.Dialect; import org.apache.doris.plugin.DialectConverterPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginContext; import org.apache.doris.plugin.PluginException; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.SessionVariable; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import java.io.IOException; -import java.io.InputStream; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.function.Function; -import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -72,57 +67,29 @@ import javax.annotation.Nullable; public class HttpDialectConverterPlugin extends Plugin implements DialectConverterPlugin { private volatile boolean isInit = false; - private volatile boolean isClosed = false; - private volatile String targetURL = null; - private volatile ImmutableSet acceptDialects = null; + private volatile ImmutableSet acceptDialects; + private final PluginInfo pluginInfo; + + public HttpDialectConverterPlugin() { + pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "SqlDialectConverter", PluginType.DIALECT, + "builtin sql dialect converter", DigitalVersion.fromString("2.1.0"), + DigitalVersion.fromString("1.8.31"), HttpDialectConverterPlugin.class.getName(), null, null); + acceptDialects = ImmutableSet.copyOf(Arrays.asList(Dialect.PRESTO, Dialect.TRINO, Dialect.HIVE, + Dialect.SPARK, Dialect.POSTGRES, Dialect.CLICKHOUSE)); + } + + public PluginInfo getPluginInfo() { + return pluginInfo; + } @Override public void init(PluginInfo info, PluginContext ctx) throws PluginException { super.init(info, ctx); - - synchronized (this) { - if (isInit) { - return; - } - loadConfig(ctx, info.getProperties()); - isInit = true; - } - } - - private void loadConfig(PluginContext ctx, Map pluginInfoProperties) throws PluginException { - Path pluginPath = FileSystems.getDefault().getPath(ctx.getPluginPath()); - if (!Files.exists(pluginPath)) { - throw new PluginException("plugin path does not exist: " + pluginPath); - } - - Path confFile = pluginPath.resolve("plugin.conf"); - if (!Files.exists(confFile)) { - throw new PluginException("plugin conf file does not exist: " + confFile); - } - - final Properties props = new Properties(); - try (InputStream stream = Files.newInputStream(confFile)) { - props.load(stream); - } catch (IOException e) { - throw new PluginException(e.getMessage()); - } - - for (Map.Entry entry : pluginInfoProperties.entrySet()) { - props.setProperty(entry.getKey(), entry.getValue()); - } - - final Map properties = props.stringPropertyNames().stream() - .collect(Collectors.toMap(Function.identity(), props::getProperty)); - targetURL = properties.get("target_url"); - String acceptDialectsStr = Objects.requireNonNull(properties.get("accept_dialects")); - acceptDialects = ImmutableSet.copyOf(Arrays.stream(acceptDialectsStr.split(",")) - .map(Dialect::getByName).collect(Collectors.toSet())); } @Override public void close() throws IOException { super.close(); - isClosed = true; } @Override @@ -132,8 +99,11 @@ public class HttpDialectConverterPlugin extends Plugin implements DialectConvert @Override public @Nullable String convertSql(String originSql, SessionVariable sessionVariable) { - Preconditions.checkNotNull(targetURL); - return HttpDialectUtils.convertSql(targetURL, originSql); + String targetURL = GlobalVariable.sqlConverterServiceUrl; + if (Strings.isNullOrEmpty(targetURL)) { + return null; + } + return HttpDialectUtils.convertSql(targetURL, originSql, sessionVariable.getSqlDialect()); } // no need to override parseSqlWithDialect, just return null diff --git a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java similarity index 98% rename from fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 73c2f47067..39c6417988 100644 --- a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin.dialect; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -38,8 +38,8 @@ import java.nio.charset.StandardCharsets; public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - public static String convertSql(String targetURL, String originStmt) { - ConvertRequest convertRequest = new ConvertRequest(originStmt, "presto"); + public static String convertSql(String targetURL, String originStmt, String dialect) { + ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect); HttpURLConnection connection = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index c116e7c16a..f9ab35f9c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -17,11 +17,11 @@ package org.apache.doris.qe; -import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.audit.AuditEvent; import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 83cd1d401f..7b6e86ca3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -24,8 +24,11 @@ import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.plugin.AuditEvent.EventType; +import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder; +import org.apache.doris.plugin.audit.AuditEvent.EventType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.FrontendOptions; @@ -39,8 +42,17 @@ public class AuditLogHelper { // slow query long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); + CatalogIf catalog = ctx.getCurrentCatalog(); - ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) + AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + auditEventBuilder.reset(); + auditEventBuilder + .setTimestamp(ctx.getStartTime()) + .setClientIp(ctx.getClientIP()) + .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) + .setSqlHash(ctx.getSqlHash()) + .setEventType(EventType.AFTER_QUERY) + .setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName()) .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) .setState(ctx.getState().toString()) .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) @@ -73,10 +85,10 @@ public class AuditLogHelper { if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); + auditEventBuilder.setSqlDigest(sqlDigest); } } - ctx.getAuditEventBuilder().setIsQuery(true); + auditEventBuilder.setIsQuery(true); if (ctx.getQueryDetail() != null) { ctx.getQueryDetail().setEventTime(endTime); ctx.getQueryDetail().setEndTime(endTime); @@ -90,35 +102,35 @@ public class AuditLogHelper { ctx.setQueryDetail(null); } } else { - ctx.getAuditEventBuilder().setIsQuery(false); + auditEventBuilder.setIsQuery(false); } - ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); + auditEventBuilder.setIsNereids(ctx.getState().isNereids); - ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); + auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress()); // We put origin query stmt at the end of audit log, for parsing the log more convenient. if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { - ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); + auditEventBuilder.setStmt(parsedStmt.toSql()); } else { if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. int length = Math.min(1024, origStmt.length()); - ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); + auditEventBuilder.setStmt(origStmt.substring(0, length)); } else { - ctx.getAuditEventBuilder().setStmt(origStmt); + auditEventBuilder.setStmt(origStmt); } } if (!Env.getCurrentEnv().isMaster()) { if (ctx.executor.isForwardToMaster()) { - ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); + auditEventBuilder.setState(ctx.executor.getProxyStatus()); int proxyStatusCode = ctx.executor.getProxyStatusCode(); if (proxyStatusCode != 0) { - ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode); - ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg()); + auditEventBuilder.setErrorCode(proxyStatusCode); + auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg()); } } } - Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build()); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 8a1a16999b..cb021ce83e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -48,7 +48,7 @@ import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.literal.Literal; -import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; +import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.statistics.ColumnStatistic; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index e4d8f8273f..b297d192c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -28,7 +28,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -181,12 +180,6 @@ public abstract class ConnectProcessor { String convertedStmt = convertOriginStmt(originStmt); String sqlHash = DigestUtils.md5Hex(convertedStmt); ctx.setSqlHash(sqlHash); - ctx.getAuditEventBuilder().reset(); - ctx.getAuditEventBuilder() - .setTimestamp(System.currentTimeMillis()) - .setClientIp(ctx.getClientIP()) - .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) - .setSqlHash(ctx.getSqlHash()); List stmts = null; Exception nereidsParseException = null; @@ -291,7 +284,7 @@ public abstract class ConnectProcessor { private String convertOriginStmt(String originStmt) { String convertedStmt = originStmt; @Nullable Dialect sqlDialect = Dialect.getByName(ctx.getSessionVariable().getSqlDialect()); - if (sqlDialect != null) { + if (sqlDialect != null && sqlDialect != Dialect.DORIS) { PluginMgr pluginMgr = Env.getCurrentEnv().getPluginMgr(); List plugins = pluginMgr.getActiveDialectPluginList(sqlDialect); for (DialectConverterPlugin plugin : plugins) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java index 419bb1377a..a8374a7b44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java @@ -50,6 +50,12 @@ public final class GlobalVariable { public static final long VALIDATE_PASSWORD_POLICY_DISABLED = 0; public static final long VALIDATE_PASSWORD_POLICY_STRONG = 2; + public static final String SQL_CONVERTER_SERVICE_URL = "sql_converter_service_url"; + public static final String ENABLE_AUDIT_PLUGIN = "enable_audit_plugin"; + public static final String AUDIT_PLUGIN_MAX_BATCH_BYTES = "audit_plugin_max_batch_bytes"; + public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec"; + public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length"; + @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY) public static String versionComment = "Doris version " + Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH; @@ -104,7 +110,22 @@ public final class GlobalVariable { @VariableMgr.VarAttr(name = SHOW_FULL_DBNAME_IN_INFO_SCHEMA_DB, flag = VariableMgr.GLOBAL) public static boolean showFullDbNameInInfoSchemaDb = false; - // Don't allow to create instance. + @VariableMgr.VarAttr(name = SQL_CONVERTER_SERVICE_URL, flag = VariableMgr.GLOBAL) + public static String sqlConverterServiceUrl = ""; + + @VariableMgr.VarAttr(name = ENABLE_AUDIT_PLUGIN, flag = VariableMgr.GLOBAL) + public static boolean enableAuditLoader = false; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_BYTES, flag = VariableMgr.GLOBAL) + public static long auditPluginMaxBatchBytes = 50 * 1024 * 1024; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC, flag = VariableMgr.GLOBAL) + public static long auditPluginMaxBatchInternalSec = 60; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = VariableMgr.GLOBAL) + public static int auditPluginMaxSqlLength = 4096; + + // Don't allow creating instance. private GlobalVariable() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 085d844e61..1f8cef9271 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -20,7 +20,7 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.util.Daemon; -import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java index 2f0a720ca7..b717b2bbf3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java @@ -61,7 +61,7 @@ public class DialectPluginTest extends TestWithFeService { @Test public void testSparkPlugin() { - ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK_SQL.getDialectName()); + ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK.getDialectName()); NereidsParser parser = new NereidsParser(); List stmts = parser.parseSQL(TEST_SQL, ConnectContext.get().getSessionVariable()); Assertions.assertEquals(1, stmts.size()); diff --git a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java similarity index 90% rename from fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java rename to fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index 532d660f43..ac40c35376 100644 --- a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin; + +import org.apache.doris.plugin.dialect.HttpDialectUtils; import org.junit.After; import org.junit.Assert; @@ -52,20 +54,20 @@ public class HttpDialectUtilsTest { String expectedSql = "select * from t1 where `k1` = 1"; String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert"; - String res = HttpDialectUtils.convertSql(targetURL, originSql); + String res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); // test presto server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(expectedSql, res); // test response version error server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); // 7. test response code error server.setResponse( "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 400, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); } diff --git a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java similarity index 98% rename from fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java rename to fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java index 10ae33e435..dd6ad991e0 100644 --- a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java index 05a6b34b97..27fc36b9c5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java @@ -32,7 +32,7 @@ public class TestDialectPlugin1 extends Plugin implements DialectConverterPlugin @Override public ImmutableSet acceptDialects() { - return ImmutableSet.of(Dialect.SPARK_SQL); + return ImmutableSet.of(Dialect.SPARK); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java index 465bbef3d6..26c51fb66d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java @@ -19,9 +19,10 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.common.util.DigitalVersion; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent.EventType; +import org.apache.doris.plugin.audit.AuditLogBuilder; import org.apache.doris.utframe.UtFrameUtils; import org.junit.AfterClass; diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 4c5586ed88..ed4b7efc65 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -219,7 +219,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { } catch (Exception e) { LOG.error("Failed to get auth token: {}", e); } - } + } DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog, token); LOG.debug("audit loader response: {}", response); } catch (Exception e) { diff --git a/fe_plugins/http-dialect-converter/pom.xml b/fe_plugins/http-dialect-converter/pom.xml deleted file mode 100644 index 2486ec1b24..0000000000 --- a/fe_plugins/http-dialect-converter/pom.xml +++ /dev/null @@ -1,119 +0,0 @@ - - - - - org.apache.doris - fe-plugins - 1.0-SNAPSHOT - ../pom.xml - - 4.0.0 - http-dialect-converter - jar - - - org.apache.doris - fe-core - ${doris.version} - provided - - - org.apache.doris - fe-common - ${doris.version} - provided - - - org.projectlombok - lombok - ${lombok.version} - provided - - - - org.apache.logging.log4j - log4j-api - - - - org.apache.logging.log4j - log4j-core - - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - log4j - log4j - - - - org.junit.jupiter - junit-jupiter-engine - test - - - - org.junit.vintage - junit-vintage-engine - test - - - - org.junit.jupiter - junit-jupiter-params - test - - - - org.jmockit - jmockit - test - - - - presto-converter - - - maven-assembly-plugin - 2.4.1 - - false - - src/main/assembly/zip.xml - - - - - make-assembly - package - - single - - - - - - - diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf deleted file mode 100755 index cb29828039..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -### plugin configuration -# Replace the target url, set your sql converter service url here -target_url=http://127.0.0.1:8080/api/v1/sql/convert -# Replace the dialects if you need, use comma to split the value, for instance: presto,trino,hive -accept_dialects=presto \ No newline at end of file diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties deleted file mode 100755 index d06bf1c6ac..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name=HttpDialectConverter -type=DIALECT -description=SQL dialect converter plugin using http protocol. -version=1.0.0 -java.version=1.8.0 -classname=org.apache.doris.plugin.dialect.http.HttpDialectConverterPlugin diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml b/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml deleted file mode 100644 index 515e68751b..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - plugin - - zip - - false - - - target - - *.jar - - / - - - src/main/assembly - - plugin.properties - plugin.conf - - / - - - diff --git a/fe_plugins/pom.xml b/fe_plugins/pom.xml index 9ace25e271..ef2ade6f67 100644 --- a/fe_plugins/pom.xml +++ b/fe_plugins/pom.xml @@ -65,7 +65,6 @@ under the License. auditdemo auditloader - http-dialect-converter trino-converter sparksql-converter