From 7c493b08c5db6cb45949d14bd90044b3f3a1742a Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 12 Jan 2024 20:40:45 +0800 Subject: [PATCH] [refactor](dialect) make http sql converter plugin and audit loader as builtin plugin (#29692) Followup #28890 Make HttpSqlConverterPlugin and AuditLoader as Doris' builtin plugin. To make it simple for user to support sql dialect and using audit loader. HttpSqlConverterPlugin By default, there is nothing changed. There is a new global variable sql_converter_service, default is empty, if set, the HttpSqlConverterPlugin will be enabled set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert" AuditLoader By default, there is nothing changed. There is a new global variable enable_audit_plugin, default is false, if set to true, the audit loader plugin will be enable. Doris will create audit_log in __internal_schema when startup If enable_audit_plugin is true, the audit load will be inserted into audit_log table. 3 other global variables related to this plugin: audit_plugin_max_batch_interval_sec: The max interval for audit loader to insert a batch of audit log. audit_plugin_max_batch_bytes: The max batch size for audit loader to insert a batch of audit log. audit_plugin_max_sql_length: The max length of statement in audit log --- .../java/org/apache/doris/common/Config.java | 7 - .../apache/doris/analysis/InlineViewRef.java | 2 +- .../catalog/InternalSchemaInitializer.java | 82 +++++- .../apache/doris/httpv2/rest/LoadAction.java | 46 ++-- .../doris/load/StreamLoadRecordMgr.java | 6 +- .../apache/doris/load/loadv2/BulkLoadJob.java | 4 +- .../apache/doris/nereids/parser/Dialect.java | 30 +-- .../org/apache/doris/plugin/AuditPlugin.java | 2 + .../org/apache/doris/plugin/PluginMgr.java | 26 +- .../doris/plugin/{ => audit}/AuditEvent.java | 9 +- .../doris/plugin/audit/AuditLoaderPlugin.java | 252 ++++++++++++++++++ .../{qe => plugin/audit}/AuditLogBuilder.java | 7 +- .../doris/plugin/audit/AuditStreamLoader.java | 182 +++++++++++++ .../plugin/{ => audit}/LoadAuditEvent.java | 2 +- .../{ => audit}/StreamLoadAuditEvent.java | 2 +- .../dialect}/HttpDialectConverterPlugin.java | 80 ++---- .../plugin/dialect}/HttpDialectUtils.java | 6 +- .../apache/doris/qe/AuditEventProcessor.java | 2 +- .../org/apache/doris/qe/AuditLogHelper.java | 40 ++- .../org/apache/doris/qe/ConnectContext.java | 2 +- .../org/apache/doris/qe/ConnectProcessor.java | 9 +- .../org/apache/doris/qe/GlobalVariable.java | 23 +- .../WorkloadRuntimeStatusMgr.java | 2 +- .../doris/plugin/DialectPluginTest.java | 2 +- .../doris/plugin}/HttpDialectUtilsTest.java | 12 +- .../doris/plugin}/SimpleHttpServer.java | 2 +- .../doris/plugin/TestDialectPlugin1.java | 2 +- .../doris/qe/AuditEventProcessorTest.java | 5 +- .../doris/plugin/audit/AuditLoaderPlugin.java | 2 +- fe_plugins/http-dialect-converter/pom.xml | 119 --------- .../src/main/assembly/plugin.conf | 22 -- .../src/main/assembly/plugin.properties | 23 -- .../src/main/assembly/zip.xml | 43 --- fe_plugins/pom.xml | 1 - 34 files changed, 686 insertions(+), 370 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/plugin/{ => audit}/AuditEvent.java (97%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java rename fe/fe-core/src/main/java/org/apache/doris/{qe => plugin/audit}/AuditLogBuilder.java (97%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java rename fe/fe-core/src/main/java/org/apache/doris/plugin/{ => audit}/LoadAuditEvent.java (99%) rename fe/fe-core/src/main/java/org/apache/doris/plugin/{ => audit}/StreamLoadAuditEvent.java (99%) rename {fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http => fe/fe-core/src/main/java/org/apache/doris/plugin/dialect}/HttpDialectConverterPlugin.java (56%) rename {fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http => fe/fe-core/src/main/java/org/apache/doris/plugin/dialect}/HttpDialectUtils.java (98%) rename {fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http => fe/fe-core/src/test/java/org/apache/doris/plugin}/HttpDialectUtilsTest.java (90%) rename {fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http => fe/fe-core/src/test/java/org/apache/doris/plugin}/SimpleHttpServer.java (98%) delete mode 100644 fe_plugins/http-dialect-converter/pom.xml delete mode 100755 fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf delete mode 100755 fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties delete mode 100644 fe_plugins/http-dialect-converter/src/main/assembly/zip.xml diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2494465e05..1ddd9ebe53 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2398,13 +2398,6 @@ public class Config extends ConfigBase { "Whether to enable the function of getting log files through http interface"}) public static boolean enable_get_log_file_api = false; - // This config is deprecated and has not taken effect anymore, - // please use dialect plugin: fe_plugins/http-dialect-converter for instead - @Deprecated - @ConfField(description = {"用于SQL方言转换的服务地址。", - "The service address for SQL dialect conversion."}) - public static String sql_convertor_service = ""; - @ConfField(mutable = true) public static boolean enable_profile_when_analyze = false; @ConfField(mutable = true) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java index fb5b8072ed..178b0d0e14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java @@ -200,7 +200,7 @@ public class InlineViewRef extends TableRef { if (view == null && !hasExplicitAlias()) { String dialect = ConnectContext.get().getSessionVariable().getSqlDialect(); Dialect sqlDialect = Dialect.getByName(dialect); - if (Dialect.SPARK_SQL != sqlDialect) { + if (Dialect.SPARK != sqlDialect) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS); } hasExplicitAlias = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index c7247a443b..d53520b133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -24,6 +24,8 @@ import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TypeDef; import org.apache.doris.common.Config; @@ -33,6 +35,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.StatisticsUtil; @@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); + public static final List AUDIT_TABLE_COLUMNS; + + static { + AUDIT_TABLE_COLUMNS = new ArrayList<>(); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true)); + } + public void run() { if (!FeConstants.enableInternalSchemaDb) { return; @@ -83,6 +113,7 @@ public class InternalSchemaInitializer extends Thread { Database database = op.get(); modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME); modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME); + modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE); } public void modifyTblReplicaCount(Database database, String tblName) { @@ -103,8 +134,8 @@ public class InternalSchemaInitializer extends Thread { >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) { return; } + colStatsTbl.writeLock(); try { - colStatsTbl.writeLock(); Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props); } finally { colStatsTbl.writeUnlock(); @@ -123,8 +154,11 @@ public class InternalSchemaInitializer extends Thread { } private void createTbl() throws UserException { + // statistics Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt()); Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt()); + // audit table + Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt()); } @VisibleForTesting @@ -212,7 +246,40 @@ public class InternalSchemaInitializer extends Thread { return createTableStmt; } + private CreateTableStmt buildAuditTblStmt() throws UserException { + TableName tableName = new TableName("", + FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE); + + String engineName = "olap"; + ArrayList dupKeys = Lists.newArrayList("query_id", "time", "client_ip"); + KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys); + // partition + PartitionDesc partitionDesc = new RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList()); + // distribution + int bucketNum = 2; + DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, Lists.newArrayList("query_id")); + Map properties = new HashMap() { + { + put("dynamic_partition.time_unit", "DAY"); + put("dynamic_partition.start", "-30"); + put("dynamic_partition.end", "3"); + put("dynamic_partition.prefix", "p"); + put("dynamic_partition.buckets", String.valueOf(bucketNum)); + put("dynamic_partition.enable", "true"); + put("replication_num", String.valueOf(Math.max(1, + Config.min_replication_num_per_tablet))); + } + }; + CreateTableStmt createTableStmt = new CreateTableStmt(true, false, + tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc, + properties, null, "Doris internal audit table, DO NOT MODIFY IT", null); + StatisticsUtil.analyze(createTableStmt); + return createTableStmt; + } + + private boolean created() { + // 1. check database exist Optional optionalDatabase = Env.getCurrentEnv().getInternalCatalog() .getDb(FeConstants.INTERNAL_DB_NAME); @@ -225,6 +292,7 @@ public class InternalSchemaInitializer extends Thread { return false; } + // 2. check statistic tables Table statsTbl = optionalStatsTbl.get(); Optional optionalColumn = statsTbl.fullSchema.stream().filter(c -> c.getName().equals("count")).findFirst(); @@ -238,7 +306,17 @@ public class InternalSchemaInitializer extends Thread { } return false; } - return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent(); + optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME); + if (!optionalStatsTbl.isPresent()) { + return false; + } + + // 3. check audit table + optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE); + if (!optionalStatsTbl.isPresent()) { + return false; + } + return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 2b4f20a57c..a78a7e9fa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -26,6 +26,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; @@ -104,21 +105,21 @@ public class LoadAction extends RestBaseController { return redirectToHttps(request); } - try { - executeCheckPassword(request, response); - } catch (UnauthorizedException unauthorizedException) { - if (LOG.isDebugEnabled()) { - LOG.debug("Check password failed, going to check auth token, request: {}", request.toString()); + String authToken = request.getHeader("token"); + // if auth token is not null, check it first + if (!Strings.isNullOrEmpty(authToken)) { + if (!checkClusterToken(authToken)) { + throw new UnauthorizedException("Invalid token: " + authToken); } - - if (!checkClusterToken(request)) { - throw unauthorizedException; - } else { - return executeWithClusterToken(request, db, table, true); + return executeWithClusterToken(request, db, table, true); + } else { + try { + executeCheckPassword(request, response); + return executeWithoutPassword(request, response, db, table, true, groupCommit); + } finally { + ConnectContext.remove(); } } - - return executeWithoutPassword(request, response, db, table, true, groupCommit); } @RequestMapping(path = "/api/_http_stream", @@ -363,18 +364,8 @@ public class LoadAction extends RestBaseController { // AuditlogPlugin should be re-disigned carefully, and blow method focuses on // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario - private boolean checkClusterToken(HttpServletRequest request) { - if (LOG.isDebugEnabled()) { - LOG.debug("Checking cluser token, request {}", request.toString()); - } - - String authToken = request.getHeader("token"); - - if (Strings.isNullOrEmpty(authToken)) { - return false; - } - - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken); + private boolean checkClusterToken(String token) { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); } // NOTE: This function can only be used for AuditlogPlugin stream load for now. @@ -388,6 +379,9 @@ public class LoadAction extends RestBaseController { ctx.setEnv(Env.getCurrentEnv()); ctx.setThreadLocalInfo(); ctx.setRemoteIP(request.getRemoteAddr()); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); String dbName = db; String tableName = table; @@ -444,8 +438,10 @@ public class LoadAction extends RestBaseController { return redirectView; } catch (Exception e) { - LOG.warn("Failed to execute stream load with cluster token, {}", e); + LOG.warn("Failed to execute stream load with cluster token, {}", e.getMessage(), e); return new RestBaseResult(e.getMessage()); + } finally { + ConnectContext.remove(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 58d789f970..488e73f3ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -28,9 +28,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.EventType; -import org.apache.doris.plugin.StreamLoadAuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent.EventType; +import org.apache.doris.plugin.audit.StreamLoadAuditEvent; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 1939e86f85..f3d2480351 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -41,8 +41,8 @@ import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.LoadAuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.LoadAuditEvent; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java index 722a9ff7cc..9d9b0cb455 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/Dialect.java @@ -36,25 +36,25 @@ public enum Dialect { */ PRESTO("presto"), /** - * Spark sql parser dialect + * Spark3 sql parser dialect */ - SPARK_SQL("spark_sql"), + SPARK("spark"), + /** + * Spark2 sql parser dialect + */ + SPARK2("spark2"), + /** + * Flink sql parser dialect + */ + FLINK("flink"), /** * Hive parser dialect */ HIVE("hive"), - /** - * Alibaba max compute parser dialect - */ - MAX_COMPUTE("max_compute"), - /** - * Mysql parser dialect - */ - MYSQL("mysql"), /** * Postgresql parser dialect */ - POSTGRESQL("postgresql"), + POSTGRES("postgres"), /** * Sqlserver parser dialect */ @@ -64,13 +64,9 @@ public enum Dialect { */ CLICKHOUSE("clickhouse"), /** - * Sap hana parser dialect + * oracle parser dialect */ - SAP_HANA("sap_hana"), - /** - * OceanBase parser dialect - */ - OCEANBASE("oceanbase"); + ORACLE("oracle"); public static final int MAX_DIALECT_SIZE = Dialect.values().length; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java index d9c9ec8469..55962a3dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditPlugin.java @@ -17,6 +17,8 @@ package org.apache.doris.plugin; +import org.apache.doris.plugin.audit.AuditEvent; + /** * Audit plugin interface describe. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java index 9a38bce017..7fddf54e1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/PluginMgr.java @@ -27,7 +27,9 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.nereids.parser.Dialect; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginLoader.PluginStatus; -import org.apache.doris.qe.AuditLogBuilder; +import org.apache.doris.plugin.audit.AuditLoaderPlugin; +import org.apache.doris.plugin.audit.AuditLogBuilder; +import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -104,12 +106,24 @@ public class PluginMgr implements Writable { } private void initBuiltinPlugins() { - // AuditLog + // AuditLog: log audit log to file AuditLogBuilder auditLogBuilder = new AuditLogBuilder(); if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) { LOG.warn("failed to register audit log builder"); } + // AuditLoader: log audit log to internal table + AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin(); + if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) { + LOG.warn("failed to register audit log builder"); + } + + // sql dialect converter + HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin(); + if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) { + LOG.warn("failed to register http dialect converter plugin"); + } + // other builtin plugins } @@ -217,11 +231,17 @@ public class PluginMgr implements Writable { } PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin); - PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader); + try { + loader.install(); + } catch (Exception e) { + LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e); + return false; + } // add dialect plugin if (plugin instanceof DialectConverterPlugin) { addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo); } + PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader); return checkLoader == null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java index 732d33c5e1..5c122c9896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; import java.lang.annotation.Retention; @@ -56,6 +56,8 @@ public class AuditEvent { public String clientIp = ""; @AuditField(value = "User") public String user = ""; + @AuditField(value = "Ctl") + public String ctl = ""; @AuditField(value = "Db") public String db = ""; @AuditField(value = "State") @@ -133,6 +135,11 @@ public class AuditEvent { return this; } + public AuditEventBuilder setCtl(String ctl) { + auditEvent.ctl = ctl; + return this; + } + public AuditEventBuilder setDb(String db) { auditEvent.db = db; return this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java new file mode 100644 index 0000000000..533f50f062 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.plugin.audit; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.DigitalVersion; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.plugin.AuditPlugin; +import org.apache.doris.plugin.Plugin; +import org.apache.doris.plugin.PluginContext; +import org.apache.doris.plugin.PluginException; +import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.qe.GlobalVariable; + +import com.google.common.collect.Queues; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/* + * This plugin will load audit log to specified doris table at specified interval + */ +public class AuditLoaderPlugin extends Plugin implements AuditPlugin { + private static final Logger LOG = LogManager.getLogger(AuditLoaderPlugin.class); + + public static final String AUDIT_LOG_TABLE = "audit_log"; + + private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()); + + private StringBuilder auditLogBuffer = new StringBuilder(); + private int auditLogNum = 0; + private long lastLoadTimeAuditLog = 0; + // sometimes the audit log may fail to load to doris, count it to observe. + private long discardLogNum = 0; + + private BlockingQueue auditEventQueue; + private AuditStreamLoader streamLoader; + private Thread loadThread; + + private volatile boolean isClosed = false; + private volatile boolean isInit = false; + + private final PluginInfo pluginInfo; + + public AuditLoaderPlugin() { + pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLoader", PluginType.AUDIT, + "builtin audit loader, to load audit log to internal table", DigitalVersion.fromString("2.1.0"), + DigitalVersion.fromString("1.8.31"), AuditLoaderPlugin.class.getName(), null, null); + } + + public PluginInfo getPluginInfo() { + return pluginInfo; + } + + @Override + public void init(PluginInfo info, PluginContext ctx) throws PluginException { + super.init(info, ctx); + + synchronized (this) { + if (isInit) { + return; + } + this.lastLoadTimeAuditLog = System.currentTimeMillis(); + // make capacity large enough to avoid blocking. + // and it will not be too large because the audit log will flush if num in queue is larger than + // GlobalVariable.audit_plugin_max_batch_bytes. + this.auditEventQueue = Queues.newLinkedBlockingDeque(100000); + this.streamLoader = new AuditStreamLoader(); + this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); + this.loadThread.start(); + + isInit = true; + } + } + + @Override + public void close() throws IOException { + super.close(); + isClosed = true; + if (loadThread != null) { + try { + loadThread.join(); + } catch (InterruptedException e) { + LOG.debug("encounter exception when closing the audit loader", e); + } + } + } + + public boolean eventFilter(AuditEvent.EventType type) { + return type == AuditEvent.EventType.AFTER_QUERY; + } + + public void exec(AuditEvent event) { + if (!GlobalVariable.enableAuditLoader) { + LOG.debug("builtin audit loader is disabled, discard current audit event"); + return; + } + try { + auditEventQueue.add(event); + } catch (Exception e) { + // In order to ensure that the system can run normally, here we directly + // discard the current audit_event. If this problem occurs frequently, + // improvement can be considered. + ++discardLogNum; + LOG.debug("encounter exception when putting current audit batch, discard current audit event." + + " total discard num: {}", discardLogNum, e); + } + } + + private void assembleAudit(AuditEvent event) { + fillLogBuffer(event, auditLogBuffer); + ++auditLogNum; + } + + private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { + logBuffer.append(event.queryId).append("\t"); + logBuffer.append(TimeUtils.longToTimeString(event.timestamp)).append("\t"); + logBuffer.append(event.clientIp).append("\t"); + logBuffer.append(event.user).append("\t"); + logBuffer.append(event.ctl).append("\t"); + logBuffer.append(event.db).append("\t"); + logBuffer.append(event.state).append("\t"); + logBuffer.append(event.errorCode).append("\t"); + logBuffer.append(event.errorMessage).append("\t"); + logBuffer.append(event.queryTime).append("\t"); + logBuffer.append(event.scanBytes).append("\t"); + logBuffer.append(event.scanRows).append("\t"); + logBuffer.append(event.returnRows).append("\t"); + logBuffer.append(event.stmtId).append("\t"); + logBuffer.append(event.isQuery ? 1 : 0).append("\t"); + logBuffer.append(event.feIp).append("\t"); + logBuffer.append(event.cpuTimeMs).append("\t"); + logBuffer.append(event.sqlHash).append("\t"); + logBuffer.append(event.sqlDigest).append("\t"); + logBuffer.append(event.peakMemoryBytes).append("\t"); + // trim the query to avoid too long + // use `getBytes().length` to get real byte length + String stmt = truncateByBytes(event.stmt).replace("\n", " ") + .replace("\t", " ") + .replace("\r", " "); + LOG.debug("receive audit event with stmt: {}", stmt); + logBuffer.append(stmt).append("\n"); + } + + private String truncateByBytes(String str) { + int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength, str.getBytes().length); + if (maxLen >= str.getBytes().length) { + return str; + } + Charset utf8Charset = Charset.forName("UTF-8"); + CharsetDecoder decoder = utf8Charset.newDecoder(); + byte[] sb = str.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen); + CharBuffer charBuffer = CharBuffer.allocate(maxLen); + decoder.onMalformedInput(CodingErrorAction.IGNORE); + decoder.decode(buffer, charBuffer, true); + decoder.flush(charBuffer); + return new String(charBuffer.array(), 0, charBuffer.position()); + } + + private void loadIfNecessary(AuditStreamLoader loader) { + long currentTime = System.currentTimeMillis(); + + if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes + || currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) { + // begin to load + try { + String token = ""; + try { + // Acquire token from master + token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + } catch (Exception e) { + LOG.warn("Failed to get auth token: {}", e); + discardLogNum += auditLogNum; + return; + } + AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token); + LOG.debug("audit loader response: {}", response); + } catch (Exception e) { + LOG.debug("encounter exception when putting current audit batch, discard current batch", e); + discardLogNum += auditLogNum; + } finally { + // make a new string builder to receive following events. + resetBatch(currentTime); + if (discardLogNum > 0) { + LOG.info("num of total discarded audit logs: {}", discardLogNum); + } + } + } + + return; + } + + private void resetBatch(long currentTime) { + this.auditLogBuffer = new StringBuilder(); + this.lastLoadTimeAuditLog = currentTime; + this.auditLogNum = 0; + } + + private class LoadWorker implements Runnable { + private AuditStreamLoader loader; + + public LoadWorker(AuditStreamLoader loader) { + this.loader = loader; + } + + public void run() { + while (!isClosed) { + try { + AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); + if (event != null) { + assembleAudit(event); + // process all audit logs + loadIfNecessary(loader); + } + } catch (InterruptedException ie) { + LOG.debug("encounter exception when loading current audit batch", ie); + } catch (Exception e) { + LOG.error("run audit logger error:", e); + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java index 1258d0b337..f06dfb6eef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java @@ -15,19 +15,18 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.qe; +package org.apache.doris.plugin.audit; import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; import org.apache.doris.common.util.DigitalVersion; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.AuditField; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.audit.AuditEvent.AuditField; +import org.apache.doris.plugin.audit.AuditEvent.EventType; import com.google.common.collect.Maps; import com.google.common.collect.Sets; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java new file mode 100644 index 0000000000..0717547827 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.plugin.audit; + +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Calendar; +import java.util.stream.Collectors; + +public class AuditStreamLoader { + private static final Logger LOG = LogManager.getLogger(AuditStreamLoader.class); + private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; + private String hostPort; + private String db; + private String auditLogTbl; + private String auditLogLoadUrlStr; + private String feIdentity; + + public AuditStreamLoader() { + this.hostPort = "127.0.0.1:" + Config.http_port; + this.db = FeConstants.INTERNAL_DB_NAME; + this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE; + this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl); + // currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label + this.feIdentity = hostPort.replaceAll("\\.", "_"); + } + + private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException { + URL url = new URL(urlStr); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setInstanceFollowRedirects(false); + conn.setRequestMethod("PUT"); + conn.setRequestProperty("token", clusterToken); + conn.setRequestProperty("Authorization", "Basic "); + conn.addRequestProperty("Expect", "100-continue"); + conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); + conn.addRequestProperty("label", label); + conn.addRequestProperty("max_filter_ratio", "1.0"); + conn.addRequestProperty("columns", + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + Collectors.joining(","))); + conn.setDoOutput(true); + conn.setDoInput(true); + return conn; + } + + private String toCurl(HttpURLConnection conn) { + StringBuilder sb = new StringBuilder("curl -v "); + sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n "); + sb.append("-H \"").append("Authorization\":").append("\"Basic ").append("\" \\\n "); + sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n "); + sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n "); + sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n "); + sb.append("-H \"").append("columns\":") + .append("\"" + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect( + Collectors.joining(",")) + "\" \\\n "); + sb.append("\"").append(conn.getURL()).append("\""); + return sb.toString(); + } + + private String getContent(HttpURLConnection conn) { + BufferedReader br = null; + StringBuilder response = new StringBuilder(); + String line; + try { + if (100 <= conn.getResponseCode() && conn.getResponseCode() <= 399) { + br = new BufferedReader(new InputStreamReader(conn.getInputStream())); + } else { + br = new BufferedReader(new InputStreamReader(conn.getErrorStream())); + } + while ((line = br.readLine()) != null) { + response.append(line); + } + } catch (IOException e) { + LOG.warn("get content error,", e); + } + + return response.toString(); + } + + public LoadResponse loadBatch(StringBuilder sb, String clusterToken) { + Calendar calendar = Calendar.getInstance(); + String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s", + calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), + calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), + feIdentity); + + HttpURLConnection feConn = null; + HttpURLConnection beConn = null; + try { + // build request and send to fe + label = "audit" + label; + feConn = getConnection(auditLogLoadUrlStr, label, clusterToken); + int status = feConn.getResponseCode(); + // fe send back http response code TEMPORARY_REDIRECT 307 and new be location + if (status != 307) { + throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status + + ", response: " + getContent(feConn) + ", request is: " + toCurl(feConn)); + } + String location = feConn.getHeaderField("Location"); + if (location == null) { + throw new Exception("redirect location is null"); + } + // build request and send to new be location + beConn = getConnection(location, label, clusterToken); + // send data to be + try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) { + bos.write(sb.toString().getBytes()); + } + + // get respond + status = beConn.getResponseCode(); + String respMsg = beConn.getResponseMessage(); + String response = getContent(beConn); + + LOG.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}", + label, status, respMsg, response); + + return new LoadResponse(status, respMsg, response); + + } catch (Exception e) { + e.printStackTrace(); + String err = "failed to load audit via AuditLoader plugin with label: " + label; + LOG.warn(err, e); + return new LoadResponse(-1, e.getMessage(), err); + } finally { + if (feConn != null) { + feConn.disconnect(); + } + if (beConn != null) { + beConn.disconnect(); + } + } + } + + public static class LoadResponse { + public int status; + public String respMsg; + public String respContent; + + public LoadResponse(int status, String respMsg, String respContent) { + this.status = status; + this.respMsg = respMsg; + this.respContent = respContent; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("status: ").append(status); + sb.append(", resp msg: ").append(respMsg); + sb.append(", resp content: ").append(respContent); + return sb.toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java index 704ec3ad03..eb3e098bf4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/LoadAuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; public class LoadAuditEvent extends AuditEvent { diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java index 04b15b9264..8733a59656 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/StreamLoadAuditEvent.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin; +package org.apache.doris.plugin.audit; public class StreamLoadAuditEvent extends AuditEvent { diff --git a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java similarity index 56% rename from fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java index a21e501839..29dca027c6 100644 --- a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectConverterPlugin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectConverterPlugin.java @@ -15,32 +15,27 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin.dialect; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.common.util.DigitalVersion; import org.apache.doris.nereids.parser.Dialect; import org.apache.doris.plugin.DialectConverterPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginContext; import org.apache.doris.plugin.PluginException; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.PluginInfo.PluginType; +import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.SessionVariable; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import java.io.IOException; -import java.io.InputStream; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.function.Function; -import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -72,57 +67,29 @@ import javax.annotation.Nullable; public class HttpDialectConverterPlugin extends Plugin implements DialectConverterPlugin { private volatile boolean isInit = false; - private volatile boolean isClosed = false; - private volatile String targetURL = null; - private volatile ImmutableSet acceptDialects = null; + private volatile ImmutableSet acceptDialects; + private final PluginInfo pluginInfo; + + public HttpDialectConverterPlugin() { + pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "SqlDialectConverter", PluginType.DIALECT, + "builtin sql dialect converter", DigitalVersion.fromString("2.1.0"), + DigitalVersion.fromString("1.8.31"), HttpDialectConverterPlugin.class.getName(), null, null); + acceptDialects = ImmutableSet.copyOf(Arrays.asList(Dialect.PRESTO, Dialect.TRINO, Dialect.HIVE, + Dialect.SPARK, Dialect.POSTGRES, Dialect.CLICKHOUSE)); + } + + public PluginInfo getPluginInfo() { + return pluginInfo; + } @Override public void init(PluginInfo info, PluginContext ctx) throws PluginException { super.init(info, ctx); - - synchronized (this) { - if (isInit) { - return; - } - loadConfig(ctx, info.getProperties()); - isInit = true; - } - } - - private void loadConfig(PluginContext ctx, Map pluginInfoProperties) throws PluginException { - Path pluginPath = FileSystems.getDefault().getPath(ctx.getPluginPath()); - if (!Files.exists(pluginPath)) { - throw new PluginException("plugin path does not exist: " + pluginPath); - } - - Path confFile = pluginPath.resolve("plugin.conf"); - if (!Files.exists(confFile)) { - throw new PluginException("plugin conf file does not exist: " + confFile); - } - - final Properties props = new Properties(); - try (InputStream stream = Files.newInputStream(confFile)) { - props.load(stream); - } catch (IOException e) { - throw new PluginException(e.getMessage()); - } - - for (Map.Entry entry : pluginInfoProperties.entrySet()) { - props.setProperty(entry.getKey(), entry.getValue()); - } - - final Map properties = props.stringPropertyNames().stream() - .collect(Collectors.toMap(Function.identity(), props::getProperty)); - targetURL = properties.get("target_url"); - String acceptDialectsStr = Objects.requireNonNull(properties.get("accept_dialects")); - acceptDialects = ImmutableSet.copyOf(Arrays.stream(acceptDialectsStr.split(",")) - .map(Dialect::getByName).collect(Collectors.toSet())); } @Override public void close() throws IOException { super.close(); - isClosed = true; } @Override @@ -132,8 +99,11 @@ public class HttpDialectConverterPlugin extends Plugin implements DialectConvert @Override public @Nullable String convertSql(String originSql, SessionVariable sessionVariable) { - Preconditions.checkNotNull(targetURL); - return HttpDialectUtils.convertSql(targetURL, originSql); + String targetURL = GlobalVariable.sqlConverterServiceUrl; + if (Strings.isNullOrEmpty(targetURL)) { + return null; + } + return HttpDialectUtils.convertSql(targetURL, originSql, sessionVariable.getSqlDialect()); } // no need to override parseSqlWithDialect, just return null diff --git a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java similarity index 98% rename from fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java rename to fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java index 73c2f47067..39c6417988 100644 --- a/fe_plugins/http-dialect-converter/src/main/java/org/apache/doris/plugin/dialect/http/HttpDialectUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/dialect/HttpDialectUtils.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin.dialect; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -38,8 +38,8 @@ import java.nio.charset.StandardCharsets; public class HttpDialectUtils { private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class); - public static String convertSql(String targetURL, String originStmt) { - ConvertRequest convertRequest = new ConvertRequest(originStmt, "presto"); + public static String convertSql(String targetURL, String originStmt, String dialect) { + ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect); HttpURLConnection connection = null; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index c116e7c16a..f9ab35f9c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -17,11 +17,11 @@ package org.apache.doris.qe; -import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginInfo.PluginType; import org.apache.doris.plugin.PluginMgr; +import org.apache.doris.plugin.audit.AuditEvent; import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 83cd1d401f..7b6e86ca3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -24,8 +24,11 @@ import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.plugin.AuditEvent.EventType; +import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder; +import org.apache.doris.plugin.audit.AuditEvent.EventType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.FrontendOptions; @@ -39,8 +42,17 @@ public class AuditLogHelper { // slow query long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); + CatalogIf catalog = ctx.getCurrentCatalog(); - ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) + AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + auditEventBuilder.reset(); + auditEventBuilder + .setTimestamp(ctx.getStartTime()) + .setClientIp(ctx.getClientIP()) + .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) + .setSqlHash(ctx.getSqlHash()) + .setEventType(EventType.AFTER_QUERY) + .setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName()) .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) .setState(ctx.getState().toString()) .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) @@ -73,10 +85,10 @@ public class AuditLogHelper { if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); + auditEventBuilder.setSqlDigest(sqlDigest); } } - ctx.getAuditEventBuilder().setIsQuery(true); + auditEventBuilder.setIsQuery(true); if (ctx.getQueryDetail() != null) { ctx.getQueryDetail().setEventTime(endTime); ctx.getQueryDetail().setEndTime(endTime); @@ -90,35 +102,35 @@ public class AuditLogHelper { ctx.setQueryDetail(null); } } else { - ctx.getAuditEventBuilder().setIsQuery(false); + auditEventBuilder.setIsQuery(false); } - ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); + auditEventBuilder.setIsNereids(ctx.getState().isNereids); - ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); + auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress()); // We put origin query stmt at the end of audit log, for parsing the log more convenient. if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { - ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); + auditEventBuilder.setStmt(parsedStmt.toSql()); } else { if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. int length = Math.min(1024, origStmt.length()); - ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); + auditEventBuilder.setStmt(origStmt.substring(0, length)); } else { - ctx.getAuditEventBuilder().setStmt(origStmt); + auditEventBuilder.setStmt(origStmt); } } if (!Env.getCurrentEnv().isMaster()) { if (ctx.executor.isForwardToMaster()) { - ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); + auditEventBuilder.setState(ctx.executor.getProxyStatus()); int proxyStatusCode = ctx.executor.getProxyStatusCode(); if (proxyStatusCode != 0) { - ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode); - ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg()); + auditEventBuilder.setErrorCode(proxyStatusCode); + auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg()); } } } - Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build()); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 8a1a16999b..cb021ce83e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -48,7 +48,7 @@ import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.literal.Literal; -import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; +import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.statistics.ColumnStatistic; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index e4d8f8273f..b297d192c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -28,7 +28,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -181,12 +180,6 @@ public abstract class ConnectProcessor { String convertedStmt = convertOriginStmt(originStmt); String sqlHash = DigestUtils.md5Hex(convertedStmt); ctx.setSqlHash(sqlHash); - ctx.getAuditEventBuilder().reset(); - ctx.getAuditEventBuilder() - .setTimestamp(System.currentTimeMillis()) - .setClientIp(ctx.getClientIP()) - .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) - .setSqlHash(ctx.getSqlHash()); List stmts = null; Exception nereidsParseException = null; @@ -291,7 +284,7 @@ public abstract class ConnectProcessor { private String convertOriginStmt(String originStmt) { String convertedStmt = originStmt; @Nullable Dialect sqlDialect = Dialect.getByName(ctx.getSessionVariable().getSqlDialect()); - if (sqlDialect != null) { + if (sqlDialect != null && sqlDialect != Dialect.DORIS) { PluginMgr pluginMgr = Env.getCurrentEnv().getPluginMgr(); List plugins = pluginMgr.getActiveDialectPluginList(sqlDialect); for (DialectConverterPlugin plugin : plugins) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java index 419bb1377a..a8374a7b44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/GlobalVariable.java @@ -50,6 +50,12 @@ public final class GlobalVariable { public static final long VALIDATE_PASSWORD_POLICY_DISABLED = 0; public static final long VALIDATE_PASSWORD_POLICY_STRONG = 2; + public static final String SQL_CONVERTER_SERVICE_URL = "sql_converter_service_url"; + public static final String ENABLE_AUDIT_PLUGIN = "enable_audit_plugin"; + public static final String AUDIT_PLUGIN_MAX_BATCH_BYTES = "audit_plugin_max_batch_bytes"; + public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec"; + public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length"; + @VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY) public static String versionComment = "Doris version " + Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH; @@ -104,7 +110,22 @@ public final class GlobalVariable { @VariableMgr.VarAttr(name = SHOW_FULL_DBNAME_IN_INFO_SCHEMA_DB, flag = VariableMgr.GLOBAL) public static boolean showFullDbNameInInfoSchemaDb = false; - // Don't allow to create instance. + @VariableMgr.VarAttr(name = SQL_CONVERTER_SERVICE_URL, flag = VariableMgr.GLOBAL) + public static String sqlConverterServiceUrl = ""; + + @VariableMgr.VarAttr(name = ENABLE_AUDIT_PLUGIN, flag = VariableMgr.GLOBAL) + public static boolean enableAuditLoader = false; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_BYTES, flag = VariableMgr.GLOBAL) + public static long auditPluginMaxBatchBytes = 50 * 1024 * 1024; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC, flag = VariableMgr.GLOBAL) + public static long auditPluginMaxBatchInternalSec = 60; + + @VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = VariableMgr.GLOBAL) + public static int auditPluginMaxSqlLength = 4096; + + // Don't allow creating instance. private GlobalVariable() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 085d844e61..1f8cef9271 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -20,7 +20,7 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.util.Daemon; -import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent; import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java index 2f0a720ca7..b717b2bbf3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/DialectPluginTest.java @@ -61,7 +61,7 @@ public class DialectPluginTest extends TestWithFeService { @Test public void testSparkPlugin() { - ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK_SQL.getDialectName()); + ConnectContext.get().getSessionVariable().setSqlDialect(Dialect.SPARK.getDialectName()); NereidsParser parser = new NereidsParser(); List stmts = parser.parseSQL(TEST_SQL, ConnectContext.get().getSessionVariable()); Assertions.assertEquals(1, stmts.size()); diff --git a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java similarity index 90% rename from fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java rename to fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java index 532d660f43..ac40c35376 100644 --- a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/HttpDialectUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/HttpDialectUtilsTest.java @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin; + +import org.apache.doris.plugin.dialect.HttpDialectUtils; import org.junit.After; import org.junit.Assert; @@ -52,20 +54,20 @@ public class HttpDialectUtilsTest { String expectedSql = "select * from t1 where `k1` = 1"; String targetURL = "http://127.0.0.1:" + port + "/api/v1/convert"; - String res = HttpDialectUtils.convertSql(targetURL, originSql); + String res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); // test presto server.setResponse("{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(expectedSql, res); // test response version error server.setResponse("{\"version\": \"v2\", \"data\": \"" + expectedSql + "\", \"code\": 0, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); // 7. test response code error server.setResponse( "{\"version\": \"v1\", \"data\": \"" + expectedSql + "\", \"code\": 400, \"message\": \"\"}"); - res = HttpDialectUtils.convertSql(targetURL, originSql); + res = HttpDialectUtils.convertSql(targetURL, originSql, "presto"); Assert.assertEquals(originSql, res); } diff --git a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java similarity index 98% rename from fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java rename to fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java index 10ae33e435..dd6ad991e0 100644 --- a/fe_plugins/http-dialect-converter/src/test/java/org/apache/doris/plugin/dialect/http/SimpleHttpServer.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/SimpleHttpServer.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.plugin.dialect.http; +package org.apache.doris.plugin; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java index 05a6b34b97..27fc36b9c5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/TestDialectPlugin1.java @@ -32,7 +32,7 @@ public class TestDialectPlugin1 extends Plugin implements DialectConverterPlugin @Override public ImmutableSet acceptDialects() { - return ImmutableSet.of(Dialect.SPARK_SQL); + return ImmutableSet.of(Dialect.SPARK); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java index 465bbef3d6..26c51fb66d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/AuditEventProcessorTest.java @@ -19,9 +19,10 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.common.util.DigitalVersion; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.audit.AuditEvent.EventType; +import org.apache.doris.plugin.audit.AuditLogBuilder; import org.apache.doris.utframe.UtFrameUtils; import org.junit.AfterClass; diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 4c5586ed88..ed4b7efc65 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -219,7 +219,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { } catch (Exception e) { LOG.error("Failed to get auth token: {}", e); } - } + } DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog, token); LOG.debug("audit loader response: {}", response); } catch (Exception e) { diff --git a/fe_plugins/http-dialect-converter/pom.xml b/fe_plugins/http-dialect-converter/pom.xml deleted file mode 100644 index 2486ec1b24..0000000000 --- a/fe_plugins/http-dialect-converter/pom.xml +++ /dev/null @@ -1,119 +0,0 @@ - - - - - org.apache.doris - fe-plugins - 1.0-SNAPSHOT - ../pom.xml - - 4.0.0 - http-dialect-converter - jar - - - org.apache.doris - fe-core - ${doris.version} - provided - - - org.apache.doris - fe-common - ${doris.version} - provided - - - org.projectlombok - lombok - ${lombok.version} - provided - - - - org.apache.logging.log4j - log4j-api - - - - org.apache.logging.log4j - log4j-core - - - - org.apache.logging.log4j - log4j-slf4j-impl - - - - log4j - log4j - - - - org.junit.jupiter - junit-jupiter-engine - test - - - - org.junit.vintage - junit-vintage-engine - test - - - - org.junit.jupiter - junit-jupiter-params - test - - - - org.jmockit - jmockit - test - - - - presto-converter - - - maven-assembly-plugin - 2.4.1 - - false - - src/main/assembly/zip.xml - - - - - make-assembly - package - - single - - - - - - - diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf deleted file mode 100755 index cb29828039..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.conf +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -### plugin configuration -# Replace the target url, set your sql converter service url here -target_url=http://127.0.0.1:8080/api/v1/sql/convert -# Replace the dialects if you need, use comma to split the value, for instance: presto,trino,hive -accept_dialects=presto \ No newline at end of file diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties b/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties deleted file mode 100755 index d06bf1c6ac..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/plugin.properties +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name=HttpDialectConverter -type=DIALECT -description=SQL dialect converter plugin using http protocol. -version=1.0.0 -java.version=1.8.0 -classname=org.apache.doris.plugin.dialect.http.HttpDialectConverterPlugin diff --git a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml b/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml deleted file mode 100644 index 515e68751b..0000000000 --- a/fe_plugins/http-dialect-converter/src/main/assembly/zip.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - plugin - - zip - - false - - - target - - *.jar - - / - - - src/main/assembly - - plugin.properties - plugin.conf - - / - - - diff --git a/fe_plugins/pom.xml b/fe_plugins/pom.xml index 9ace25e271..ef2ade6f67 100644 --- a/fe_plugins/pom.xml +++ b/fe_plugins/pom.xml @@ -65,7 +65,6 @@ under the License. auditdemo auditloader - http-dialect-converter trino-converter sparksql-converter