[fix](audit) add workload_group to audit log table (#30470)

1. Missing workload_group column in audit table
2. Extract the definition of internal schema's tables into a new class
3. Fix bug that audit loader has no authorization to load data to audit_table, introduced from #29790
4. Fix bug that audit_log can not be modified to 3 replica because it is partitioned table
This commit is contained in:
Mingyu Chen
2024-01-30 17:12:09 +08:00
committed by yiguolei
parent 7838ba6d4e
commit b983cbd02d
13 changed files with 277 additions and 90 deletions

View File

@ -98,7 +98,7 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("broker");
}
// if auto bucket auto bucket enable, rewrite distribution bucket num &&
// if auto bucket enable, rewrite distribution bucket num &&
// set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
Map<String, String> properties) throws AnalysisException {

View File

@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.common.UserException;
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
import org.apache.doris.statistics.StatisticConstants;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
public class InternalSchema {
// Do not use the original schema directly, because it may be modified by create table operation.
public static final List<ColumnDef> COL_STATS_SCHEMA;
public static final List<ColumnDef> HISTO_STATS_SCHEMA;
public static final List<ColumnDef> AUDIT_SCHEMA;
static {
// column statistics table
COL_STATS_SCHEMA = new ArrayList<>();
COL_STATS_SCHEMA.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
COL_STATS_SCHEMA.add(new ColumnDef("part_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN), true));
COL_STATS_SCHEMA.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT), true));
COL_STATS_SCHEMA.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT), true));
COL_STATS_SCHEMA.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true));
COL_STATS_SCHEMA.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
COL_STATS_SCHEMA.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
COL_STATS_SCHEMA.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
COL_STATS_SCHEMA.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME)));
// histogram_statistics table
HISTO_STATS_SCHEMA = new ArrayList<>();
HISTO_STATS_SCHEMA.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
HISTO_STATS_SCHEMA.add(new ColumnDef("sample_rate", TypeDef.create(PrimitiveType.DOUBLE)));
HISTO_STATS_SCHEMA.add(new ColumnDef("buckets", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
HISTO_STATS_SCHEMA.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME)));
// audit table
AUDIT_SCHEMA = new ArrayList<>();
AUDIT_SCHEMA.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true));
AUDIT_SCHEMA.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true));
AUDIT_SCHEMA.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("user", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("db", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("state", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true));
AUDIT_SCHEMA.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true));
AUDIT_SCHEMA.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_SCHEMA.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("workload_group", TypeDef.create(PrimitiveType.STRING), true));
AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
}
// Get copied schema for statistic table
// Do not use the original schema directly, because it may be modified by create table operation.
public static List<ColumnDef> getCopiedSchema(String tblName) throws UserException {
List<ColumnDef> schema;
if (tblName.equals(StatisticConstants.STATISTIC_TBL_NAME)) {
schema = COL_STATS_SCHEMA;
} else if (tblName.equals(StatisticConstants.HISTOGRAM_TBL_NAME)) {
schema = HISTO_STATS_SCHEMA;
} else if (tblName.equals(AuditLoaderPlugin.AUDIT_LOG_TABLE)) {
schema = AUDIT_SCHEMA;
} else {
throw new UserException("Unknown internal table name: " + tblName);
}
List<ColumnDef> copiedSchema = Lists.newArrayList();
for (ColumnDef columnDef : schema) {
copiedSchema.add(new ColumnDef(columnDef.getName(), columnDef.getTypeDef(), columnDef.isAllowNull()));
}
return copiedSchema;
}
}

View File

@ -17,17 +17,18 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.ModifyPartitionClause;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@ -56,33 +57,6 @@ public class InternalSchemaInitializer extends Thread {
private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);
public static final List<ColumnDef> AUDIT_TABLE_COLUMNS;
static {
AUDIT_TABLE_COLUMNS = new ArrayList<>();
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_id", TypeDef.createVarchar(48), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("time", TypeDef.create(PrimitiveType.DATETIME), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("client_ip", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("user", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("catalog", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("db", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("state", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_code", TypeDef.create(PrimitiveType.INT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("error_message", TypeDef.create(PrimitiveType.STRING), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("query_time", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("frontend_ip", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("cpu_time_ms", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("sql_digest", TypeDef.createVarchar(128), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("peak_memory_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_TABLE_COLUMNS.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), true));
}
public void run() {
if (!FeConstants.enableInternalSchemaDb) {
return;
@ -97,7 +71,7 @@ public class InternalSchemaInitializer extends Thread {
}
Thread.currentThread()
.join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L);
createDB();
createDb();
createTbl();
} catch (Throwable e) {
LOG.warn("Statistics storage initiated failed, will try again later", e);
@ -116,29 +90,51 @@ public class InternalSchemaInitializer extends Thread {
modifyTblReplicaCount(database, AuditLoaderPlugin.AUDIT_LOG_TABLE);
}
public void modifyTblReplicaCount(Database database, String tblName) {
@VisibleForTesting
public static void modifyTblReplicaCount(Database database, String tblName) {
if (!(Config.min_replication_num_per_tablet < StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM
&& Config.max_replication_num_per_tablet >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM)) {
return;
}
while (true) {
if (Env.getCurrentSystemInfo().aliveBECount() >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
int backendNum = Env.getCurrentSystemInfo().getBackendNumFromDiffHosts(true);
if (FeConstants.runningUnitTest) {
backendNum = Env.getCurrentSystemInfo().getAllBackendIds().size();
}
if (backendNum >= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
try {
Map<String, String> props = new HashMap<>();
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: "
+ StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
TableIf colStatsTbl = StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
OlapTable tbl = (OlapTable) StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
StatisticConstants.DB_NAME, tblName);
OlapTable olapTable = (OlapTable) colStatsTbl;
if (olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
>= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
return;
}
colStatsTbl.writeLock();
tbl.writeLock();
try {
Env.getCurrentEnv().modifyTableReplicaAllocation(database, (OlapTable) colStatsTbl, props);
if (tbl.getTableProperty().getReplicaAllocation().getTotalReplicaNum()
>= StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM) {
return;
}
if (!tbl.isPartitionedTable()) {
Map<String, String> props = new HashMap<>();
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, "tag.location.default: "
+ StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
Env.getCurrentEnv().modifyTableReplicaAllocation(database, tbl, props);
} else {
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
StatisticConstants.DB_NAME, tbl.getName());
// 1. modify table's default replica num
Map<String, String> props = new HashMap<>();
props.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
"" + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
Env.getCurrentEnv().modifyTableDefaultReplicaAllocation(database, tbl, props);
// 2. modify each partition's replica num
List<AlterClause> clauses = Lists.newArrayList();
props.clear();
props.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM,
"" + StatisticConstants.STATISTIC_INTERNAL_TABLE_REPLICA_NUM);
clauses.add(ModifyPartitionClause.createStarClause(props, false));
AlterTableStmt alter = new AlterTableStmt(tableName, clauses);
Env.getCurrentEnv().alterTable(alter);
}
} finally {
colStatsTbl.writeUnlock();
tbl.writeUnlock();
}
break;
} catch (Throwable t) {
@ -153,7 +149,8 @@ public class InternalSchemaInitializer extends Thread {
}
}
private void createTbl() throws UserException {
@VisibleForTesting
public static void createTbl() throws UserException {
// statistics
Env.getCurrentEnv().getInternalCatalog().createTable(buildStatisticsTblStmt());
Env.getCurrentEnv().getInternalCatalog().createTable(buildHistogramTblStmt());
@ -162,7 +159,7 @@ public class InternalSchemaInitializer extends Thread {
}
@VisibleForTesting
public static void createDB() {
public static void createDb() {
CreateDbStmt createDbStmt = new CreateDbStmt(true, FeConstants.INTERNAL_DB_NAME,
null);
try {
@ -173,27 +170,9 @@ public class InternalSchemaInitializer extends Thread {
}
}
@VisibleForTesting
public CreateTableStmt buildStatisticsTblStmt() throws UserException {
private static CreateTableStmt buildStatisticsTblStmt() throws UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME, StatisticConstants.STATISTIC_TBL_NAME);
List<ColumnDef> columnDefs = new ArrayList<>();
columnDefs.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN)));
columnDefs.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
ColumnDef partId = new ColumnDef("part_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN));
partId.setAllowNull(true);
columnDefs.add(partId);
columnDefs.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT), true));
columnDefs.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT), true));
columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true));
columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT), true));
columnDefs.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME)));
String engineName = "olap";
ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
"db_id", "tbl_id", "idx_id", "col_id", "part_id");
@ -207,26 +186,16 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
tableName, columnDefs, engineName, keysDesc, null, distributionDesc,
tableName, InternalSchema.getCopiedSchema(StatisticConstants.STATISTIC_TBL_NAME),
engineName, keysDesc, null, distributionDesc,
properties, null, "Doris internal statistics table, DO NOT MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
}
@VisibleForTesting
public CreateTableStmt buildHistogramTblStmt() throws UserException {
private static CreateTableStmt buildHistogramTblStmt() throws UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME, StatisticConstants.HISTOGRAM_TBL_NAME);
List<ColumnDef> columnDefs = new ArrayList<>();
columnDefs.add(new ColumnDef("id", TypeDef.createVarchar(StatisticConstants.ID_LEN)));
columnDefs.add(new ColumnDef("catalog_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("db_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("tbl_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("idx_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("col_id", TypeDef.createVarchar(StatisticConstants.MAX_NAME_LEN)));
columnDefs.add(new ColumnDef("sample_rate", TypeDef.create(PrimitiveType.DOUBLE)));
columnDefs.add(new ColumnDef("buckets", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
columnDefs.add(new ColumnDef("update_time", TypeDef.create(PrimitiveType.DATETIME)));
String engineName = "olap";
ArrayList<String> uniqueKeys = Lists.newArrayList("id", "catalog_id",
"db_id", "tbl_id", "idx_id", "col_id");
@ -240,13 +209,14 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
tableName, columnDefs, engineName, keysDesc, null, distributionDesc,
tableName, InternalSchema.getCopiedSchema(StatisticConstants.HISTOGRAM_TBL_NAME),
engineName, keysDesc, null, distributionDesc,
properties, null, "Doris internal statistics table, DO NOT MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;
}
private CreateTableStmt buildAuditTblStmt() throws UserException {
private static CreateTableStmt buildAuditTblStmt() throws UserException {
TableName tableName = new TableName("",
FeConstants.INTERNAL_DB_NAME, AuditLoaderPlugin.AUDIT_LOG_TABLE);
@ -271,7 +241,8 @@ public class InternalSchemaInitializer extends Thread {
}
};
CreateTableStmt createTableStmt = new CreateTableStmt(true, false,
tableName, AUDIT_TABLE_COLUMNS, engineName, keysDesc, partitionDesc, distributionDesc,
tableName, InternalSchema.getCopiedSchema(AuditLoaderPlugin.AUDIT_LOG_TABLE),
engineName, keysDesc, partitionDesc, distributionDesc,
properties, null, "Doris internal audit table, DO NOT MODIFY IT", null);
StatisticsUtil.analyze(createTableStmt);
return createTableStmt;

View File

@ -161,6 +161,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
logBuffer.append(event.sqlHash).append("\t");
logBuffer.append(event.sqlDigest).append("\t");
logBuffer.append(event.peakMemoryBytes).append("\t");
logBuffer.append(event.workloadGroup).append("\t");
// trim the query to avoid too long
// use `getBytes().length` to get real byte length
String stmt = truncateByBytes(event.stmt).replace("\n", " ")

View File

@ -17,7 +17,7 @@
package org.apache.doris.plugin.audit;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.catalog.InternalSchema;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@ -48,7 +48,7 @@ public class AuditStreamLoader {
this.auditLogTbl = AuditLoaderPlugin.AUDIT_LOG_TABLE;
this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl);
// currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label
this.feIdentity = hostPort.replaceAll("\\.", "_");
this.feIdentity = hostPort.replaceAll("\\.", "_").replaceAll(":", "_");
}
private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException {
@ -63,7 +63,7 @@ public class AuditStreamLoader {
conn.addRequestProperty("label", label);
conn.addRequestProperty("max_filter_ratio", "1.0");
conn.addRequestProperty("columns",
InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect(
InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect(
Collectors.joining(",")));
conn.setDoOutput(true);
conn.setDoInput(true);
@ -78,7 +78,7 @@ public class AuditStreamLoader {
sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n ");
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n ");
sb.append("-H \"").append("columns\":")
.append("\"" + InternalSchemaInitializer.AUDIT_TABLE_COLUMNS.stream().map(c -> c.getName()).collect(
.append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c -> c.getName()).collect(
Collectors.joining(",")) + "\" \\\n ");
sb.append("\"").append(conn.getURL()).append("\"");
return sb.toString();

View File

@ -1090,6 +1090,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
} else {
checkToken(request.getToken());
}
// check label
@ -1111,9 +1113,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp);
if (request.isSetToken()) {
txnCoord.isFromInternal = true;
}
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());

View File

@ -416,6 +416,19 @@ public class SystemInfoService {
return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList());
}
// return num of backends that from different hosts
public int getBackendNumFromDiffHosts(boolean aliveOnly) {
Set<String> hosts = Sets.newHashSet();
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
for (Backend backend : idToBackend.values()) {
if (aliveOnly && !backend.isAlive()) {
continue;
}
hosts.add(backend.getHost());
}
return hosts.size();
}
class BeIdComparator implements Comparator<Backend> {
public int compare(Backend a, Backend b) {
return (int) (a.getId() - b.getId());

View File

@ -321,7 +321,9 @@ public class DatabaseTransactionMgr {
throws DuplicatedRequestException, LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, QuotaExceedException, MetaNotFoundException {
Database db = env.getInternalCatalog().getDbOrMetaException(dbId);
InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get());
if (!coordinator.isFromInternal) {
InternalDatabaseUtil.checkDatabase(db.getFullName(), ConnectContext.get());
}
checkDatabaseDataQuota();
Preconditions.checkNotNull(coordinator);
Preconditions.checkNotNull(label);

View File

@ -164,6 +164,9 @@ public class TransactionState implements Writable {
public TxnSourceType sourceType;
@SerializedName(value = "ip")
public String ip;
// True if this txn if created by system(such as writing data to audit table)
@SerializedName(value = "ii")
public boolean isFromInternal = false;
public TxnCoordinator() {
}