[refactor](dialect) make http sql converter plugin and audit loader as builtin plugin (#29692)
Followup #28890 Make HttpSqlConverterPlugin and AuditLoader as Doris' builtin plugin. To make it simple for user to support sql dialect and using audit loader. HttpSqlConverterPlugin By default, there is nothing changed. There is a new global variable sql_converter_service, default is empty, if set, the HttpSqlConverterPlugin will be enabled set global sql_converter_service = "http://127.0.0.1:5001/api/v1/convert" AuditLoader By default, there is nothing changed. There is a new global variable enable_audit_plugin, default is false, if set to true, the audit loader plugin will be enable. Doris will create audit_log in __internal_schema when startup If enable_audit_plugin is true, the audit load will be inserted into audit_log table. 3 other global variables related to this plugin: audit_plugin_max_batch_interval_sec: The max interval for audit loader to insert a batch of audit log. audit_plugin_max_batch_bytes: The max batch size for audit loader to insert a batch of audit log. audit_plugin_max_sql_length: The max length of statement in audit log
This commit is contained in:
@ -200,7 +200,7 @@ public class InlineViewRef extends TableRef {
|
||||
if (view == null && !hasExplicitAlias()) {
|
||||
String dialect = ConnectContext.get().getSessionVariable().getSqlDialect();
|
||||
Dialect sqlDialect = Dialect.getByName(dialect);
|
||||
if (Dialect.SPARK_SQL != sqlDialect) {
|
||||
if (Dialect.SPARK != sqlDialect) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_DERIVED_MUST_HAVE_ALIAS);
|
||||
}
|
||||
hasExplicitAlias = true;
|
||||
|
||||
@ -24,6 +24,8 @@ import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.RangePartitionDesc;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.analysis.TypeDef;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -33,6 +35,7 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -53,6 +56,33 @@ public class InternalSchemaInitializer extends Thread {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);
|
||||
|
||||
public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
|
||||
|
||||
static {
|
||||
AUDIT_TABLE_COLUMNS = new ArrayList<>();
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
|
||||
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
|
||||
}
|
||||
|
||||
public void run() {
|
||||
if (!FeConstants.enableInternalSchemaDb) {
|
||||
return;
|
||||
@ -83,6 +113,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
Database database = op.get();
|
||||
modifyTblReplicaCount(database, StatisticConstants.STATISTIC_TBL_NAME);
|
||||
modifyTblReplicaCount(database, StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
|
||||
}
|
||||
|
||||
public void modifyTblReplicaCount(Database database, String tblName) {
|
||||
@ -103,8 +134,8 @@ public class InternalSchemaInitializer extends Thread {
|
||||
>= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
|
||||
return;
|
||||
}
|
||||
colStatsTbl.writeLock();
|
||||
try {
|
||||
colStatsTbl.writeLock();
|
||||
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
|
||||
} finally {
|
||||
colStatsTbl.writeUnlock();
|
||||
@ -123,8 +154,11 @@ public class InternalSchemaInitializer extends Thread {
|
||||
}
|
||||
|
||||
private void createTbl() throws UserException {
|
||||
// statistics
|
||||
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
|
||||
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
|
||||
// audit table
|
||||
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -212,7 +246,40 @@ public class InternalSchemaInitializer extends Thread {
|
||||
return createTableStmt;
|
||||
}
|
||||
|
||||
private CreateTableStmt buildAuditTblStmt() throws UserException {
|
||||
TableName tableName = new TableName("",
|
||||
FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE);
|
||||
|
||||
String engineName = "olap";
|
||||
ArrayList<String> dupKeys = Lists.newArrayList("query_id", "time", "client_ip");
|
||||
KeysDesc keysDesc = new KeysDesc(KeysType.DUP_KEYS, dupKeys);
|
||||
// partition
|
||||
PartitionDesc partitionDesc = new RangePartitionDesc(Lists.newArrayList("time"), Lists.newArrayList());
|
||||
// distribution
|
||||
int bucketNum = 2;
|
||||
DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, Lists.newArrayList("query_id"));
|
||||
Map<String, String> properties = new HashMap<String, String>() {
|
||||
{
|
||||
put("dynamic_partition.time_unit", "DAY");
|
||||
put("dynamic_partition.start", "-30");
|
||||
put("dynamic_partition.end", "3");
|
||||
put("dynamic_partition.prefix", "p");
|
||||
put("dynamic_partition.buckets", String.valueOf(bucketNum));
|
||||
put("dynamic_partition.enable", "true");
|
||||
put("replication_num", String.valueOf(Math.max(1,
|
||||
Config.min_replication_num_per_tablet)));
|
||||
}
|
||||
};
|
||||
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
|
||||
tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc,
|
||||
properties, null, "Doris internal audit table, DO NOT MODIFY IT", null);
|
||||
StatisticsUtil.analyze(createTableStmt);
|
||||
return createTableStmt;
|
||||
}
|
||||
|
||||
|
||||
private boolean created() {
|
||||
// 1. check database exist
|
||||
Optional<Database> optionalDatabase =
|
||||
Env.getCurrentEnv().getInternalCatalog()
|
||||
.getDb(FeConstants.INTERNAL_DB_NAME);
|
||||
@ -225,6 +292,7 @@ public class InternalSchemaInitializer extends Thread {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 2. check statistic tables
|
||||
Table statsTbl = optionalStatsTbl.get();
|
||||
Optional<Column> optionalColumn =
|
||||
statsTbl.fullSchema.stream().filter(c -> c.getName().equals("count")).findFirst();
|
||||
@ -238,7 +306,17 @@ public class InternalSchemaInitializer extends Thread {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent();
|
||||
optionalStatsTbl = db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
if (!optionalStatsTbl.isPresent()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 3. check audit table
|
||||
optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE);
|
||||
if (!optionalStatsTbl.isPresent()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
|
||||
import org.apache.doris.httpv2.entity.RestBaseResult;
|
||||
import org.apache.doris.httpv2.exception.UnauthorizedException;
|
||||
import org.apache.doris.mysql.privilege.Auth;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -104,21 +105,21 @@ public class LoadAction extends RestBaseController {
|
||||
return redirectToHttps(request);
|
||||
}
|
||||
|
||||
try {
|
||||
executeCheckPassword(request, response);
|
||||
} catch (UnauthorizedException unauthorizedException) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
|
||||
String authToken = request.getHeader("token");
|
||||
// if auth token is not null, check it first
|
||||
if (!Strings.isNullOrEmpty(authToken)) {
|
||||
if (!checkClusterToken(authToken)) {
|
||||
throw new UnauthorizedException("Invalid token: " + authToken);
|
||||
}
|
||||
|
||||
if (!checkClusterToken(request)) {
|
||||
throw unauthorizedException;
|
||||
} else {
|
||||
return executeWithClusterToken(request, db, table, true);
|
||||
return executeWithClusterToken(request, db, table, true);
|
||||
} else {
|
||||
try {
|
||||
executeCheckPassword(request, response);
|
||||
return executeWithoutPassword(request, response, db, table, true, groupCommit);
|
||||
} finally {
|
||||
ConnectContext.remove();
|
||||
}
|
||||
}
|
||||
|
||||
return executeWithoutPassword(request, response, db, table, true, groupCommit);
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/api/_http_stream",
|
||||
@ -363,18 +364,8 @@ public class LoadAction extends RestBaseController {
|
||||
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
|
||||
// temporarily addressing the users' needs for audit logs.
|
||||
// So this function is not widely tested under general scenario
|
||||
private boolean checkClusterToken(HttpServletRequest request) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Checking cluser token, request {}", request.toString());
|
||||
}
|
||||
|
||||
String authToken = request.getHeader("token");
|
||||
|
||||
if (Strings.isNullOrEmpty(authToken)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
|
||||
private boolean checkClusterToken(String token) {
|
||||
return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
|
||||
}
|
||||
|
||||
// NOTE: This function can only be used for AuditlogPlugin stream load for now.
|
||||
@ -388,6 +379,9 @@ public class LoadAction extends RestBaseController {
|
||||
ctx.setEnv(Env.getCurrentEnv());
|
||||
ctx.setThreadLocalInfo();
|
||||
ctx.setRemoteIP(request.getRemoteAddr());
|
||||
// set user to ADMIN_USER, so that we can get the proper resource tag
|
||||
ctx.setQualifiedUser(Auth.ADMIN_USER);
|
||||
ctx.setThreadLocalInfo();
|
||||
|
||||
String dbName = db;
|
||||
String tableName = table;
|
||||
@ -444,8 +438,10 @@ public class LoadAction extends RestBaseController {
|
||||
|
||||
return redirectView;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to execute stream load with cluster token, {}", e);
|
||||
LOG.warn("Failed to execute stream load with cluster token, {}", e.getMessage(), e);
|
||||
return new RestBaseResult(e.getMessage());
|
||||
} finally {
|
||||
ConnectContext.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,9 +28,9 @@ import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.plugin.AuditEvent.EventType;
|
||||
import org.apache.doris.plugin.StreamLoadAuditEvent;
|
||||
import org.apache.doris.plugin.audit.AuditEvent;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.EventType;
|
||||
import org.apache.doris.plugin.audit.StreamLoadAuditEvent;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
|
||||
@ -41,8 +41,8 @@ import org.apache.doris.load.BrokerFileGroup;
|
||||
import org.apache.doris.load.BrokerFileGroupAggInfo;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.FailMsg;
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.plugin.LoadAuditEvent;
|
||||
import org.apache.doris.plugin.audit.AuditEvent;
|
||||
import org.apache.doris.plugin.audit.LoadAuditEvent;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
@ -36,25 +36,25 @@ public enum Dialect {
|
||||
*/
|
||||
PRESTO("presto"),
|
||||
/**
|
||||
* Spark sql parser dialect
|
||||
* Spark3 sql parser dialect
|
||||
*/
|
||||
SPARK_SQL("spark_sql"),
|
||||
SPARK("spark"),
|
||||
/**
|
||||
* Spark2 sql parser dialect
|
||||
*/
|
||||
SPARK2("spark2"),
|
||||
/**
|
||||
* Flink sql parser dialect
|
||||
*/
|
||||
FLINK("flink"),
|
||||
/**
|
||||
* Hive parser dialect
|
||||
*/
|
||||
HIVE("hive"),
|
||||
/**
|
||||
* Alibaba max compute parser dialect
|
||||
*/
|
||||
MAX_COMPUTE("max_compute"),
|
||||
/**
|
||||
* Mysql parser dialect
|
||||
*/
|
||||
MYSQL("mysql"),
|
||||
/**
|
||||
* Postgresql parser dialect
|
||||
*/
|
||||
POSTGRESQL("postgresql"),
|
||||
POSTGRES("postgres"),
|
||||
/**
|
||||
* Sqlserver parser dialect
|
||||
*/
|
||||
@ -64,13 +64,9 @@ public enum Dialect {
|
||||
*/
|
||||
CLICKHOUSE("clickhouse"),
|
||||
/**
|
||||
* Sap hana parser dialect
|
||||
* oracle parser dialect
|
||||
*/
|
||||
SAP_HANA("sap_hana"),
|
||||
/**
|
||||
* OceanBase parser dialect
|
||||
*/
|
||||
OCEANBASE("oceanbase");
|
||||
ORACLE("oracle");
|
||||
|
||||
public static final int MAX_DIALECT_SIZE = Dialect.values().length;
|
||||
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.doris.plugin;
|
||||
|
||||
import org.apache.doris.plugin.audit.AuditEvent;
|
||||
|
||||
/**
|
||||
* Audit plugin interface describe.
|
||||
*/
|
||||
|
||||
@ -27,7 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.nereids.parser.Dialect;
|
||||
import org.apache.doris.plugin.PluginInfo.PluginType;
|
||||
import org.apache.doris.plugin.PluginLoader.PluginStatus;
|
||||
import org.apache.doris.qe.AuditLogBuilder;
|
||||
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
|
||||
import org.apache.doris.plugin.audit.AuditLogBuilder;
|
||||
import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -104,12 +106,24 @@ public class PluginMgr implements Writable {
|
||||
}
|
||||
|
||||
private void initBuiltinPlugins() {
|
||||
// AuditLog
|
||||
// AuditLog: log audit log to file
|
||||
AuditLogBuilder auditLogBuilder = new AuditLogBuilder();
|
||||
if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) {
|
||||
LOG.warn("failed to register audit log builder");
|
||||
}
|
||||
|
||||
// AuditLoader: log audit log to internal table
|
||||
AuditLoaderPlugin auditLoaderPlugin = new AuditLoaderPlugin();
|
||||
if (!registerBuiltinPlugin(auditLoaderPlugin.getPluginInfo(), auditLoaderPlugin)) {
|
||||
LOG.warn("failed to register audit log builder");
|
||||
}
|
||||
|
||||
// sql dialect converter
|
||||
HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin();
|
||||
if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) {
|
||||
LOG.warn("failed to register http dialect converter plugin");
|
||||
}
|
||||
|
||||
// other builtin plugins
|
||||
}
|
||||
|
||||
@ -217,11 +231,17 @@ public class PluginMgr implements Writable {
|
||||
}
|
||||
|
||||
PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin);
|
||||
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
|
||||
try {
|
||||
loader.install();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e);
|
||||
return false;
|
||||
}
|
||||
// add dialect plugin
|
||||
if (plugin instanceof DialectConverterPlugin) {
|
||||
addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo);
|
||||
}
|
||||
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
|
||||
return checkLoader == null;
|
||||
}
|
||||
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin;
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
@ -56,6 +56,8 @@ public class AuditEvent {
|
||||
public String clientIp = "";
|
||||
@AuditField(value = "User")
|
||||
public String user = "";
|
||||
@AuditField(value = "Ctl")
|
||||
public String ctl = "";
|
||||
@AuditField(value = "Db")
|
||||
public String db = "";
|
||||
@AuditField(value = "State")
|
||||
@ -133,6 +135,11 @@ public class AuditEvent {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AuditEventBuilder setCtl(String ctl) {
|
||||
auditEvent.ctl = ctl;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AuditEventBuilder setDb(String db) {
|
||||
auditEvent.db = db;
|
||||
return this;
|
||||
@ -0,0 +1,252 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.util.DigitalVersion;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.plugin.AuditPlugin;
|
||||
import org.apache.doris.plugin.Plugin;
|
||||
import org.apache.doris.plugin.PluginContext;
|
||||
import org.apache.doris.plugin.PluginException;
|
||||
import org.apache.doris.plugin.PluginInfo;
|
||||
import org.apache.doris.plugin.PluginInfo.PluginType;
|
||||
import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.qe.GlobalVariable;
|
||||
|
||||
import com.google.common.collect.Queues;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.CharsetDecoder;
|
||||
import java.nio.charset.CodingErrorAction;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/*
|
||||
* This plugin will load audit log to specified doris table at specified interval
|
||||
*/
|
||||
public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
|
||||
private static final Logger LOG = LogManager.getLogger(AuditLoaderPlugin.class);
|
||||
|
||||
public static final String AUDIT_LOG_TABLE = "audit_log";
|
||||
|
||||
private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||
.withZone(ZoneId.systemDefault());
|
||||
|
||||
private StringBuilder auditLogBuffer = new StringBuilder();
|
||||
private int auditLogNum = 0;
|
||||
private long lastLoadTimeAuditLog = 0;
|
||||
// sometimes the audit log may fail to load to doris, count it to observe.
|
||||
private long discardLogNum = 0;
|
||||
|
||||
private BlockingQueue<AuditEvent> auditEventQueue;
|
||||
private AuditStreamLoader streamLoader;
|
||||
private Thread loadThread;
|
||||
|
||||
private volatile boolean isClosed = false;
|
||||
private volatile boolean isInit = false;
|
||||
|
||||
private final PluginInfo pluginInfo;
|
||||
|
||||
public AuditLoaderPlugin() {
|
||||
pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLoader", PluginType.AUDIT,
|
||||
"builtin audit loader, to load audit log to internal table", DigitalVersion.fromString("2.1.0"),
|
||||
DigitalVersion.fromString("1.8.31"), AuditLoaderPlugin.class.getName(), null, null);
|
||||
}
|
||||
|
||||
public PluginInfo getPluginInfo() {
|
||||
return pluginInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(PluginInfo info, PluginContext ctx) throws PluginException {
|
||||
super.init(info, ctx);
|
||||
|
||||
synchronized (this) {
|
||||
if (isInit) {
|
||||
return;
|
||||
}
|
||||
this.lastLoadTimeAuditLog = System.currentTimeMillis();
|
||||
// make capacity large enough to avoid blocking.
|
||||
// and it will not be too large because the audit log will flush if num in queue is larger than
|
||||
// GlobalVariable.audit_plugin_max_batch_bytes.
|
||||
this.auditEventQueue = Queues.newLinkedBlockingDeque(100000);
|
||||
this.streamLoader = new AuditStreamLoader();
|
||||
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
|
||||
this.loadThread.start();
|
||||
|
||||
isInit = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
isClosed = true;
|
||||
if (loadThread != null) {
|
||||
try {
|
||||
loadThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("encounter exception when closing the audit loader", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean eventFilter(AuditEvent.EventType type) {
|
||||
return type == AuditEvent.EventType.AFTER_QUERY;
|
||||
}
|
||||
|
||||
public void exec(AuditEvent event) {
|
||||
if (!GlobalVariable.enableAuditLoader) {
|
||||
LOG.debug("builtin audit loader is disabled, discard current audit event");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
auditEventQueue.add(event);
|
||||
} catch (Exception e) {
|
||||
// In order to ensure that the system can run normally, here we directly
|
||||
// discard the current audit_event. If this problem occurs frequently,
|
||||
// improvement can be considered.
|
||||
++discardLogNum;
|
||||
LOG.debug("encounter exception when putting current audit batch, discard current audit event."
|
||||
+ " total discard num: {}", discardLogNum, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void assembleAudit(AuditEvent event) {
|
||||
fillLogBuffer(event, auditLogBuffer);
|
||||
++auditLogNum;
|
||||
}
|
||||
|
||||
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
|
||||
logBuffer.append(event.queryId).append("\t");
|
||||
logBuffer.append(TimeUtils.longToTimeString(event.timestamp)).append("\t");
|
||||
logBuffer.append(event.clientIp).append("\t");
|
||||
logBuffer.append(event.user).append("\t");
|
||||
logBuffer.append(event.ctl).append("\t");
|
||||
logBuffer.append(event.db).append("\t");
|
||||
logBuffer.append(event.state).append("\t");
|
||||
logBuffer.append(event.errorCode).append("\t");
|
||||
logBuffer.append(event.errorMessage).append("\t");
|
||||
logBuffer.append(event.queryTime).append("\t");
|
||||
logBuffer.append(event.scanBytes).append("\t");
|
||||
logBuffer.append(event.scanRows).append("\t");
|
||||
logBuffer.append(event.returnRows).append("\t");
|
||||
logBuffer.append(event.stmtId).append("\t");
|
||||
logBuffer.append(event.isQuery ? 1 : 0).append("\t");
|
||||
logBuffer.append(event.feIp).append("\t");
|
||||
logBuffer.append(event.cpuTimeMs).append("\t");
|
||||
logBuffer.append(event.sqlHash).append("\t");
|
||||
logBuffer.append(event.sqlDigest).append("\t");
|
||||
logBuffer.append(event.peakMemoryBytes).append("\t");
|
||||
// trim the query to avoid too long
|
||||
// use `getBytes().length` to get real byte length
|
||||
String stmt = truncateByBytes(event.stmt).replace("\n", " ")
|
||||
.replace("\t", " ")
|
||||
.replace("\r", " ");
|
||||
LOG.debug("receive audit event with stmt: {}", stmt);
|
||||
logBuffer.append(stmt).append("\n");
|
||||
}
|
||||
|
||||
private String truncateByBytes(String str) {
|
||||
int maxLen = Math.min(GlobalVariable.auditPluginMaxSqlLength, str.getBytes().length);
|
||||
if (maxLen >= str.getBytes().length) {
|
||||
return str;
|
||||
}
|
||||
Charset utf8Charset = Charset.forName("UTF-8");
|
||||
CharsetDecoder decoder = utf8Charset.newDecoder();
|
||||
byte[] sb = str.getBytes();
|
||||
ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
|
||||
CharBuffer charBuffer = CharBuffer.allocate(maxLen);
|
||||
decoder.onMalformedInput(CodingErrorAction.IGNORE);
|
||||
decoder.decode(buffer, charBuffer, true);
|
||||
decoder.flush(charBuffer);
|
||||
return new String(charBuffer.array(), 0, charBuffer.position());
|
||||
}
|
||||
|
||||
private void loadIfNecessary(AuditStreamLoader loader) {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
|
||||
if (auditLogBuffer.length() >= GlobalVariable.auditPluginMaxBatchBytes
|
||||
|| currentTime - lastLoadTimeAuditLog >= GlobalVariable.auditPluginMaxBatchInternalSec * 1000) {
|
||||
// begin to load
|
||||
try {
|
||||
String token = "";
|
||||
try {
|
||||
// Acquire token from master
|
||||
token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to get auth token: {}", e);
|
||||
discardLogNum += auditLogNum;
|
||||
return;
|
||||
}
|
||||
AuditStreamLoader.LoadResponse response = loader.loadBatch(auditLogBuffer, token);
|
||||
LOG.debug("audit loader response: {}", response);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
|
||||
discardLogNum += auditLogNum;
|
||||
} finally {
|
||||
// make a new string builder to receive following events.
|
||||
resetBatch(currentTime);
|
||||
if (discardLogNum > 0) {
|
||||
LOG.info("num of total discarded audit logs: {}", discardLogNum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
private void resetBatch(long currentTime) {
|
||||
this.auditLogBuffer = new StringBuilder();
|
||||
this.lastLoadTimeAuditLog = currentTime;
|
||||
this.auditLogNum = 0;
|
||||
}
|
||||
|
||||
private class LoadWorker implements Runnable {
|
||||
private AuditStreamLoader loader;
|
||||
|
||||
public LoadWorker(AuditStreamLoader loader) {
|
||||
this.loader = loader;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (!isClosed) {
|
||||
try {
|
||||
AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
|
||||
if (event != null) {
|
||||
assembleAudit(event);
|
||||
// process all audit logs
|
||||
loadIfNecessary(loader);
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.debug("encounter exception when loading current audit batch", ie);
|
||||
} catch (Exception e) {
|
||||
LOG.error("run audit logger error:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -15,19 +15,18 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.qe;
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
import org.apache.doris.common.AuditLog;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.DigitalVersion;
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.plugin.AuditEvent.AuditField;
|
||||
import org.apache.doris.plugin.AuditEvent.EventType;
|
||||
import org.apache.doris.plugin.AuditPlugin;
|
||||
import org.apache.doris.plugin.Plugin;
|
||||
import org.apache.doris.plugin.PluginInfo;
|
||||
import org.apache.doris.plugin.PluginInfo.PluginType;
|
||||
import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.AuditField;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.EventType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -0,0 +1,182 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
import org.apache.doris.catalog.InternalSchemaInitializer;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.Calendar;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AuditStreamLoader {
|
||||
private static final Logger LOG = LogManager.getLogger(AuditStreamLoader.class);
|
||||
private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
|
||||
private String hostPort;
|
||||
private String db;
|
||||
private String auditLogTbl;
|
||||
private String auditLogLoadUrlStr;
|
||||
private String feIdentity;
|
||||
|
||||
public AuditStreamLoader() {
|
||||
this.hostPort = "127.0.0.1:" + Config.http_port;
|
||||
this.db = FeConstants.INTERNAL_DB_NAME;
|
||||
this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE;
|
||||
this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl);
|
||||
// currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label
|
||||
this.feIdentity = hostPort.replaceAll("\\.", "_");
|
||||
}
|
||||
|
||||
private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException {
|
||||
URL url = new URL(urlStr);
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setInstanceFollowRedirects(false);
|
||||
conn.setRequestMethod("PUT");
|
||||
conn.setRequestProperty("token", clusterToken);
|
||||
conn.setRequestProperty("Authorization", "Basic ");
|
||||
conn.addRequestProperty("Expect", "100-continue");
|
||||
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
|
||||
conn.addRequestProperty("label", label);
|
||||
conn.addRequestProperty("max_filter_ratio", "1.0");
|
||||
conn.addRequestProperty("columns",
|
||||
InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect(
|
||||
Collectors.joining(",")));
|
||||
conn.setDoOutput(true);
|
||||
conn.setDoInput(true);
|
||||
return conn;
|
||||
}
|
||||
|
||||
private String toCurl(HttpURLConnection conn) {
|
||||
StringBuilder sb = new StringBuilder("curl -v ");
|
||||
sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n ");
|
||||
sb.append("-H \"").append("Authorization\":").append("\"Basic ").append("\" \\\n ");
|
||||
sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n ");
|
||||
sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n ");
|
||||
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n ");
|
||||
sb.append("-H \"").append("columns\":")
|
||||
.append("\"" + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect(
|
||||
Collectors.joining(",")) + "\" \\\n ");
|
||||
sb.append("\"").append(conn.getURL()).append("\"");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private String getContent(HttpURLConnection conn) {
|
||||
BufferedReader br = null;
|
||||
StringBuilder response = new StringBuilder();
|
||||
String line;
|
||||
try {
|
||||
if (100 <= conn.getResponseCode() && conn.getResponseCode() <= 399) {
|
||||
br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
|
||||
} else {
|
||||
br = new BufferedReader(new InputStreamReader(conn.getErrorStream()));
|
||||
}
|
||||
while ((line = br.readLine()) != null) {
|
||||
response.append(line);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("get content error,", e);
|
||||
}
|
||||
|
||||
return response.toString();
|
||||
}
|
||||
|
||||
public LoadResponse loadBatch(StringBuilder sb, String clusterToken) {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
|
||||
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
|
||||
calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
|
||||
feIdentity);
|
||||
|
||||
HttpURLConnection feConn = null;
|
||||
HttpURLConnection beConn = null;
|
||||
try {
|
||||
// build request and send to fe
|
||||
label = "audit" + label;
|
||||
feConn = getConnection(auditLogLoadUrlStr, label, clusterToken);
|
||||
int status = feConn.getResponseCode();
|
||||
// fe send back http response code TEMPORARY_REDIRECT 307 and new be location
|
||||
if (status != 307) {
|
||||
throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status
|
||||
+ ", response: " + getContent(feConn) + ", request is: " + toCurl(feConn));
|
||||
}
|
||||
String location = feConn.getHeaderField("Location");
|
||||
if (location == null) {
|
||||
throw new Exception("redirect location is null");
|
||||
}
|
||||
// build request and send to new be location
|
||||
beConn = getConnection(location, label, clusterToken);
|
||||
// send data to be
|
||||
try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
|
||||
bos.write(sb.toString().getBytes());
|
||||
}
|
||||
|
||||
// get respond
|
||||
status = beConn.getResponseCode();
|
||||
String respMsg = beConn.getResponseMessage();
|
||||
String response = getContent(beConn);
|
||||
|
||||
LOG.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}",
|
||||
label, status, respMsg, response);
|
||||
|
||||
return new LoadResponse(status, respMsg, response);
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
String err = "failed to load audit via AuditLoader plugin with label: " + label;
|
||||
LOG.warn(err, e);
|
||||
return new LoadResponse(-1, e.getMessage(), err);
|
||||
} finally {
|
||||
if (feConn != null) {
|
||||
feConn.disconnect();
|
||||
}
|
||||
if (beConn != null) {
|
||||
beConn.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class LoadResponse {
|
||||
public int status;
|
||||
public String respMsg;
|
||||
public String respContent;
|
||||
|
||||
public LoadResponse(int status, String respMsg, String respContent) {
|
||||
this.status = status;
|
||||
this.respMsg = respMsg;
|
||||
this.respContent = respContent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("status: ").append(status);
|
||||
sb.append(", resp msg: ").append(respMsg);
|
||||
sb.append(", resp content: ").append(respContent);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -15,7 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin;
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
public class LoadAuditEvent extends AuditEvent {
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin;
|
||||
package org.apache.doris.plugin.audit;
|
||||
|
||||
public class StreamLoadAuditEvent extends AuditEvent {
|
||||
|
||||
@ -0,0 +1,114 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin.dialect;
|
||||
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.common.util.DigitalVersion;
|
||||
import org.apache.doris.nereids.parser.Dialect;
|
||||
import org.apache.doris.plugin.DialectConverterPlugin;
|
||||
import org.apache.doris.plugin.Plugin;
|
||||
import org.apache.doris.plugin.PluginContext;
|
||||
import org.apache.doris.plugin.PluginException;
|
||||
import org.apache.doris.plugin.PluginInfo;
|
||||
import org.apache.doris.plugin.PluginInfo.PluginType;
|
||||
import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.qe.GlobalVariable;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Currently, there are many frameworks and services that support SQL dialect conversion,
|
||||
* but they may not be implemented in Java.
|
||||
* Therefore, we can encapsulate these SQL dialect conversion frameworks or services into an HTTP service,
|
||||
* and combine them with this plugin to provide dialect conversion capabilities.
|
||||
* Note that the protocol request/response for the wrapped HTTP service must comply with the following rules:
|
||||
* <pre>
|
||||
* Request body:
|
||||
* {
|
||||
* "version": "v1",
|
||||
* "sql": "select * from t",
|
||||
* "from": "presto",
|
||||
* "to": "doris",
|
||||
* "source": "text",
|
||||
* "case_sensitive": "0"
|
||||
* }
|
||||
*
|
||||
* Response body:
|
||||
* {
|
||||
* "version": "v1",
|
||||
* "data": "select * from t",
|
||||
* "code": 0,
|
||||
* "message": ""
|
||||
* }
|
||||
* </pre>
|
||||
* */
|
||||
public class HttpDialectConverterPlugin extends Plugin implements DialectConverterPlugin {
|
||||
|
||||
private volatile boolean isInit = false;
|
||||
private volatile ImmutableSet<Dialect> acceptDialects;
|
||||
private final PluginInfo pluginInfo;
|
||||
|
||||
public HttpDialectConverterPlugin() {
|
||||
pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "SqlDialectConverter", PluginType.DIALECT,
|
||||
"builtin sql dialect converter", DigitalVersion.fromString("2.1.0"),
|
||||
DigitalVersion.fromString("1.8.31"), HttpDialectConverterPlugin.class.getName(), null, null);
|
||||
acceptDialects = ImmutableSet.copyOf(Arrays.asList(Dialect.PRESTO, Dialect.TRINO, Dialect.HIVE,
|
||||
Dialect.SPARK, Dialect.POSTGRES, Dialect.CLICKHOUSE));
|
||||
}
|
||||
|
||||
public PluginInfo getPluginInfo() {
|
||||
return pluginInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(PluginInfo info, PluginContext ctx) throws PluginException {
|
||||
super.init(info, ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ImmutableSet<Dialect> acceptDialects() {
|
||||
return acceptDialects;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable String convertSql(String originSql, SessionVariable sessionVariable) {
|
||||
String targetURL = GlobalVariable.sqlConverterServiceUrl;
|
||||
if (Strings.isNullOrEmpty(targetURL)) {
|
||||
return null;
|
||||
}
|
||||
return HttpDialectUtils.convertSql(targetURL, originSql, sessionVariable.getSqlDialect());
|
||||
}
|
||||
|
||||
// no need to override parseSqlWithDialect, just return null
|
||||
@Override
|
||||
public List<StatementBase> parseSqlWithDialect(String sql, SessionVariable sessionVariable) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,140 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.plugin.dialect;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import lombok.Data;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* This class is used to convert sql with different dialects using sql convertor service.
|
||||
* The sql convertor service is a http service which is used to convert sql.
|
||||
*/
|
||||
public class HttpDialectUtils {
|
||||
private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class);
|
||||
|
||||
public static String convertSql(String targetURL, String originStmt, String dialect) {
|
||||
ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect);
|
||||
|
||||
HttpURLConnection connection = null;
|
||||
try {
|
||||
URL url = new URL(targetURL);
|
||||
connection = (HttpURLConnection) url.openConnection();
|
||||
connection.setRequestMethod("POST");
|
||||
connection.setRequestProperty("Content-Type", "application/json");
|
||||
connection.setUseCaches(false);
|
||||
connection.setDoOutput(true);
|
||||
|
||||
String requestStr = convertRequest.toJson();
|
||||
try (OutputStream outputStream = connection.getOutputStream()) {
|
||||
outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
int responseCode = connection.getResponseCode();
|
||||
LOG.debug("POST Response Code: {}, post data: {}", responseCode, requestStr);
|
||||
|
||||
if (responseCode == HttpURLConnection.HTTP_OK) {
|
||||
try (InputStreamReader inputStreamReader
|
||||
= new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8);
|
||||
BufferedReader in = new BufferedReader(inputStreamReader)) {
|
||||
String inputLine;
|
||||
StringBuilder response = new StringBuilder();
|
||||
|
||||
while ((inputLine = in.readLine()) != null) {
|
||||
response.append(inputLine);
|
||||
}
|
||||
|
||||
Type type = new TypeToken<ConvertResponse>() {
|
||||
}.getType();
|
||||
ConvertResponse result = new Gson().fromJson(response.toString(), type);
|
||||
LOG.debug("convert response: {}", result);
|
||||
if (result.code == 0) {
|
||||
if (!"v1".equals(result.version)) {
|
||||
LOG.warn("failed to convert sql, response version is not v1: {}", result.version);
|
||||
return originStmt;
|
||||
}
|
||||
return result.data;
|
||||
} else {
|
||||
LOG.warn("failed to convert sql, response: {}", result);
|
||||
return originStmt;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.warn("failed to convert sql, response code: {}", responseCode);
|
||||
return originStmt;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to convert sql", e);
|
||||
return originStmt;
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
connection.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ConvertRequest {
|
||||
private String version; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String sql_query; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String from; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String to; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String source; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String case_sensitive; // CHECKSTYLE IGNORE THIS LINE
|
||||
|
||||
public ConvertRequest(String originStmt, String dialect) {
|
||||
this.version = "v1";
|
||||
this.sql_query = originStmt;
|
||||
this.from = dialect;
|
||||
this.to = "doris";
|
||||
this.source = "text";
|
||||
this.case_sensitive = "0";
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return new Gson().toJson(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ConvertResponse {
|
||||
private String version; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String data; // CHECKSTYLE IGNORE THIS LINE
|
||||
private int code; // CHECKSTYLE IGNORE THIS LINE
|
||||
private String message; // CHECKSTYLE IGNORE THIS LINE
|
||||
|
||||
public String toJson() {
|
||||
return new Gson().toJson(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toJson();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -17,11 +17,11 @@
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.plugin.AuditPlugin;
|
||||
import org.apache.doris.plugin.Plugin;
|
||||
import org.apache.doris.plugin.PluginInfo.PluginType;
|
||||
import org.apache.doris.plugin.PluginMgr;
|
||||
import org.apache.doris.plugin.audit.AuditEvent;
|
||||
|
||||
import com.google.common.collect.Queues;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
||||
@ -24,8 +24,11 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.plugin.AuditEvent.EventType;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.EventType;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
|
||||
@ -39,8 +42,17 @@ public class AuditLogHelper {
|
||||
// slow query
|
||||
long endTime = System.currentTimeMillis();
|
||||
long elapseMs = endTime - ctx.getStartTime();
|
||||
CatalogIf catalog = ctx.getCurrentCatalog();
|
||||
|
||||
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
|
||||
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
|
||||
auditEventBuilder.reset();
|
||||
auditEventBuilder
|
||||
.setTimestamp(ctx.getStartTime())
|
||||
.setClientIp(ctx.getClientIP())
|
||||
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
|
||||
.setSqlHash(ctx.getSqlHash())
|
||||
.setEventType(EventType.AFTER_QUERY)
|
||||
.setCtl(catalog == null ? InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
|
||||
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
|
||||
.setState(ctx.getState().toString())
|
||||
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
|
||||
@ -73,10 +85,10 @@ public class AuditLogHelper {
|
||||
|
||||
if (elapseMs > Config.qe_slow_log_ms) {
|
||||
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
|
||||
ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
|
||||
auditEventBuilder.setSqlDigest(sqlDigest);
|
||||
}
|
||||
}
|
||||
ctx.getAuditEventBuilder().setIsQuery(true);
|
||||
auditEventBuilder.setIsQuery(true);
|
||||
if (ctx.getQueryDetail() != null) {
|
||||
ctx.getQueryDetail().setEventTime(endTime);
|
||||
ctx.getQueryDetail().setEndTime(endTime);
|
||||
@ -90,35 +102,35 @@ public class AuditLogHelper {
|
||||
ctx.setQueryDetail(null);
|
||||
}
|
||||
} else {
|
||||
ctx.getAuditEventBuilder().setIsQuery(false);
|
||||
auditEventBuilder.setIsQuery(false);
|
||||
}
|
||||
ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
|
||||
auditEventBuilder.setIsNereids(ctx.getState().isNereids);
|
||||
|
||||
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
|
||||
auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress());
|
||||
|
||||
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
|
||||
if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) {
|
||||
ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
|
||||
auditEventBuilder.setStmt(parsedStmt.toSql());
|
||||
} else {
|
||||
if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()
|
||||
&& ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
|
||||
// INSERT INTO VALUES may be very long, so we only log at most 1K bytes.
|
||||
int length = Math.min(1024, origStmt.length());
|
||||
ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length));
|
||||
auditEventBuilder.setStmt(origStmt.substring(0, length));
|
||||
} else {
|
||||
ctx.getAuditEventBuilder().setStmt(origStmt);
|
||||
auditEventBuilder.setStmt(origStmt);
|
||||
}
|
||||
}
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
if (ctx.executor.isForwardToMaster()) {
|
||||
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
|
||||
auditEventBuilder.setState(ctx.executor.getProxyStatus());
|
||||
int proxyStatusCode = ctx.executor.getProxyStatusCode();
|
||||
if (proxyStatusCode != 0) {
|
||||
ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode);
|
||||
ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg());
|
||||
auditEventBuilder.setErrorCode(proxyStatusCode);
|
||||
auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg());
|
||||
}
|
||||
}
|
||||
}
|
||||
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
|
||||
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build());
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ import org.apache.doris.mysql.MysqlSslContext;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.stats.StatsErrorEstimator;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
|
||||
import org.apache.doris.plugin.audit.AuditEvent.AuditEventBuilder;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
|
||||
@ -28,7 +28,6 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -181,12 +180,6 @@ public abstract class ConnectProcessor {
|
||||
String convertedStmt = convertOriginStmt(originStmt);
|
||||
String sqlHash = DigestUtils.md5Hex(convertedStmt);
|
||||
ctx.setSqlHash(sqlHash);
|
||||
ctx.getAuditEventBuilder().reset();
|
||||
ctx.getAuditEventBuilder()
|
||||
.setTimestamp(System.currentTimeMillis())
|
||||
.setClientIp(ctx.getClientIP())
|
||||
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
|
||||
.setSqlHash(ctx.getSqlHash());
|
||||
|
||||
List<StatementBase> stmts = null;
|
||||
Exception nereidsParseException = null;
|
||||
@ -291,7 +284,7 @@ public abstract class ConnectProcessor {
|
||||
private String convertOriginStmt(String originStmt) {
|
||||
String convertedStmt = originStmt;
|
||||
@Nullable Dialect sqlDialect = Dialect.getByName(ctx.getSessionVariable().getSqlDialect());
|
||||
if (sqlDialect != null) {
|
||||
if (sqlDialect != null && sqlDialect != Dialect.DORIS) {
|
||||
PluginMgr pluginMgr = Env.getCurrentEnv().getPluginMgr();
|
||||
List<DialectConverterPlugin> plugins = pluginMgr.getActiveDialectPluginList(sqlDialect);
|
||||
for (DialectConverterPlugin plugin : plugins) {
|
||||
|
||||
@ -50,6 +50,12 @@ public final class GlobalVariable {
|
||||
public static final long VALIDATE_PASSWORD_POLICY_DISABLED = 0;
|
||||
public static final long VALIDATE_PASSWORD_POLICY_STRONG = 2;
|
||||
|
||||
public static final String SQL_CONVERTER_SERVICE_URL = "sql_converter_service_url";
|
||||
public static final String ENABLE_AUDIT_PLUGIN = "enable_audit_plugin";
|
||||
public static final String AUDIT_PLUGIN_MAX_BATCH_BYTES = "audit_plugin_max_batch_bytes";
|
||||
public static final String AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC = "audit_plugin_max_batch_interval_sec";
|
||||
public static final String AUDIT_PLUGIN_MAX_SQL_LENGTH = "audit_plugin_max_sql_length";
|
||||
|
||||
@VariableMgr.VarAttr(name = VERSION_COMMENT, flag = VariableMgr.READ_ONLY)
|
||||
public static String versionComment = "Doris version "
|
||||
+ Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH;
|
||||
@ -104,7 +110,22 @@ public final class GlobalVariable {
|
||||
@VariableMgr.VarAttr(name = SHOW_FULL_DBNAME_IN_INFO_SCHEMA_DB, flag = VariableMgr.GLOBAL)
|
||||
public static boolean showFullDbNameInInfoSchemaDb = false;
|
||||
|
||||
// Don't allow to create instance.
|
||||
@VariableMgr.VarAttr(name = SQL_CONVERTER_SERVICE_URL, flag = VariableMgr.GLOBAL)
|
||||
public static String sqlConverterServiceUrl = "";
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_AUDIT_PLUGIN, flag = VariableMgr.GLOBAL)
|
||||
public static boolean enableAuditLoader = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_BYTES, flag = VariableMgr.GLOBAL)
|
||||
public static long auditPluginMaxBatchBytes = 50 * 1024 * 1024;
|
||||
|
||||
@VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_BATCH_INTERVAL_SEC, flag = VariableMgr.GLOBAL)
|
||||
public static long auditPluginMaxBatchInternalSec = 60;
|
||||
|
||||
@VariableMgr.VarAttr(name = AUDIT_PLUGIN_MAX_SQL_LENGTH, flag = VariableMgr.GLOBAL)
|
||||
public static int auditPluginMaxSqlLength = 4096;
|
||||
|
||||
// Don't allow creating instance.
|
||||
private GlobalVariable() {
|
||||
}
|
||||
|
||||
|
||||
@ -20,7 +20,7 @@ package org.apache.doris.resource.workloadschedpolicy;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.plugin.audit.AuditEvent;
|
||||
import org.apache.doris.thrift.TQueryStatistics;
|
||||
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user