[enhancement](load) Change transaction limit from global level to db level (#15830)

Add transaction size quota for database

Co-authored-by: wuhangze <wuhangze@jd.com>
This commit is contained in:
Henry2SS
2023-02-08 18:04:26 +08:00
committed by GitHub
parent f71fc3291f
commit bb334de00f
14 changed files with 150 additions and 13 deletions

View File

@ -2593,3 +2593,25 @@ MasterOnly:true
Maximum number of error tablet showed in broker load.
#### `default_db_max_running_txn_num`
Default:-1
IsMutable:true
MasterOnly:true
Used to set the default database transaction quota size.
The default value setting to -1 means using `max_running_txn_num_per_db` instead of `default_db_max_running_txn_num`.
To set the quota size of a single database, you can use:
```
Set the database transaction quota
ALTER DATABASE db_name SET TRANSACTION QUOTA quota;
View configuration
show data (Detail:HELP SHOW DATA)
```

View File

@ -2592,3 +2592,25 @@ SmallFileMgr 中存储的最大文件数
是否为 Master FE 节点独有的配置项:true
broker load job 保存的失败tablet 信息的最大数量
#### `default_db_max_running_txn_num`
默认值:-1
是否可以动态配置:true
是否为 Master FE 节点独有的配置项:true
用于设置默认数据库事务配额大小。
默认值设置为 -1 意味着使用 `max_running_txn_num_per_db` 而不是 `default_db_max_running_txn_num`。
设置单个数据库的配额大小可以使用:
```
设置数据库事务量配额
ALTER DATABASE db_name SET TRANSACTION QUOTA quota;
查看配置
show data (其他用法:HELP SHOW DATA)
```

View File

@ -1985,5 +1985,11 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static int pull_request_id = 0;
/**
* Used to set default db transaction quota num.
*/
@ConfField(mutable = true, masterOnly = true)
public static long default_db_max_running_txn_num = -1;
}

View File

@ -1271,6 +1271,10 @@ alter_stmt ::=
{:
RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.REPLICA, String.valueOf(number));
:}
| KW_ALTER KW_DATABASE ident:dbName KW_SET KW_TRANSACTION KW_QUOTA INTEGER_LITERAL:number
{:
RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.TRANSACTION, String.valueOf(number));
:}
| KW_ALTER KW_DATABASE ident:dbName KW_RENAME ident:newDbName
{:
RESULT = new AlterDatabaseRename(dbName, newDbName);

View File

@ -37,7 +37,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
public enum QuotaType {
NONE,
DATA,
REPLICA
REPLICA,
TRANSACTION
}
public AlterDatabaseQuotaStmt(String dbName, QuotaType quotaType, String quotaValue) {
@ -75,6 +76,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
quota = ParseUtil.analyzeDataVolumn(quotaValue);
} else if (quotaType == QuotaType.REPLICA) {
quota = ParseUtil.analyzeReplicaNumber(quotaValue);
} else if (quotaType == QuotaType.TRANSACTION) {
quota = ParseUtil.analyzeTransactionNumber(quotaValue);
}
}
@ -82,7 +85,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
@Override
public String toSql() {
return "ALTER DATABASE " + dbName + " SET "
+ (quotaType == QuotaType.DATA ? "DATA" : "REPLICA")
+ quotaType.name()
+ " QUOTA " + quotaValue;
}
}

View File

@ -212,6 +212,12 @@ public class ShowDataStmt extends ShowStmt {
+ leftPair.second;
List<String> leftRow = Arrays.asList("Left", readableLeft, String.valueOf(replicaCountLeft));
totalRows.add(leftRow);
// txn quota
long txnQuota = db.getTransactionQuotaSize();
List<String> transactionQuotaList = Arrays.asList("Transaction Quota",
String.valueOf(txnQuota), String.valueOf(txnQuota));
totalRows.add(transactionQuotaList);
} finally {
db.readUnlock();
}

View File

@ -71,6 +71,8 @@ import java.util.stream.Collectors;
public class Database extends MetaObject implements Writable, DatabaseIf<Table> {
private static final Logger LOG = LogManager.getLogger(Database.class);
private static final String TRANSACTION_QUOTA_SIZE = "transactionQuotaSize";
private long id;
private volatile String fullQualifiedName;
private String clusterName;
@ -91,6 +93,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
private volatile long replicaQuotaSize;
private volatile long transactionQuotaSize;
private volatile boolean isDropped;
public enum DbState {
@ -118,6 +122,9 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
this.lowerCaseToTableName = Maps.newConcurrentMap();
this.dataQuotaBytes = Config.default_db_data_quota_bytes;
this.replicaQuotaSize = Config.default_db_replica_quota_size;
this.transactionQuotaSize = Config.default_db_max_running_txn_num == -1L
? Config.max_running_txn_num_per_db
: Config.default_db_max_running_txn_num;
this.dbState = DbState.NORMAL;
this.attachDbName = "";
this.clusterName = "";
@ -213,6 +220,19 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
this.replicaQuotaSize = newQuota;
}
public void setTransactionQuotaSize(long newQuota) {
writeLock();
try {
Preconditions.checkArgument(newQuota >= 0L);
LOG.info("database[{}] try to set transaction quota from {} to {}",
fullQualifiedName, transactionQuotaSize, newQuota);
this.transactionQuotaSize = newQuota;
this.dbProperties.put(TRANSACTION_QUOTA_SIZE, String.valueOf(transactionQuotaSize));
} finally {
writeUnlock();
}
}
public long getDataQuota() {
return dataQuotaBytes;
}
@ -221,6 +241,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
return replicaQuotaSize;
}
public long getTransactionQuotaSize() {
return transactionQuotaSize;
}
public DatabaseProperty getDbProperties() {
return dbProperties;
}
@ -603,6 +627,13 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) {
dbProperties = DatabaseProperty.read(in);
String txnQuotaStr = dbProperties.getOrDefault(TRANSACTION_QUOTA_SIZE,
String.valueOf(Config.max_running_txn_num_per_db));
transactionQuotaSize = Long.parseLong(txnQuotaStr);
} else {
transactionQuotaSize = Config.default_db_max_running_txn_num == -1L
? Config.max_running_txn_num_per_db
: Config.default_db_max_running_txn_num;
}
}

View File

@ -43,6 +43,7 @@ public class DbsProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("DbId").add("DbName").add("TableNum").add("Size").add("Quota")
.add("LastConsistencyCheckTime").add("ReplicaCount").add("ReplicaQuota")
.add("TransactionQuota")
.build();
private Env env;
@ -114,11 +115,13 @@ public class DbsProcDir implements ProcDirInterface {
((Database) db).getLastCheckTime()) : FeConstants.null_string;
long replicaCount = (db instanceof Database) ? ((Database) db).getReplicaCountWithLock() : 0;
long replicaQuota = (db instanceof Database) ? ((Database) db).getReplicaQuota() : 0;
long transactionQuota = (db instanceof Database) ? ((Database) db).getTransactionQuotaSize() : 0;
dbInfo.add(readableUsedQuota);
dbInfo.add(readableQuota);
dbInfo.add(lastCheckTime);
dbInfo.add(replicaCount);
dbInfo.add(replicaQuota);
dbInfo.add(transactionQuota);
} finally {
db.readUnlock();

View File

@ -83,4 +83,17 @@ public class ParseUtil {
return replicaNumber;
}
public static long analyzeTransactionNumber(String transactionNumberStr) throws AnalysisException {
long transactionNumber = 0;
try {
transactionNumber = Long.parseLong(transactionNumberStr);
} catch (NumberFormatException nfe) {
throw new AnalysisException("invalid data volumn:" + transactionNumberStr);
}
if (transactionNumber <= 0L) {
throw new AnalysisException("Transaction quota size must larger than 0");
}
return transactionNumber;
}
}

View File

@ -775,6 +775,8 @@ public class InternalCatalog implements CatalogIf<Database> {
db.setDataQuota(stmt.getQuota());
} else if (quotaType == QuotaType.REPLICA) {
db.setReplicaQuota(stmt.getQuota());
} else if (quotaType == QuotaType.TRANSACTION) {
db.setTransactionQuotaSize(stmt.getQuota());
}
long quota = stmt.getQuota();
DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType);
@ -792,6 +794,8 @@ public class InternalCatalog implements CatalogIf<Database> {
db.setDataQuota(quota);
} else if (quotaType == QuotaType.REPLICA) {
db.setReplicaQuota(quota);
} else if (quotaType == QuotaType.TRANSACTION) {
db.setTransactionQuotaSize(quota);
}
} finally {
db.writeUnlock();

View File

@ -124,7 +124,8 @@ public class CanalSyncChannel extends SyncChannel {
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
long txnLimit = db.getTransactionQuotaSize();
if (databaseTransactionMgr.getRunningTxnNums() < txnLimit) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
@ -185,7 +186,7 @@ public class CanalSyncChannel extends SyncChannel {
} else {
String failMsg = "current running txns on db " + db.getId() + " is "
+ databaseTransactionMgr.getRunningTxnNums()
+ ", larger than limit " + Config.max_running_txn_num_per_db;
+ ", larger than limit " + txnLimit;
LOG.warn(failMsg);
throw new BeginTransactionException(failMsg);
}

View File

@ -31,7 +31,6 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -126,7 +125,6 @@ public class DatabaseTransactionMgr {
// it must exists in dbIdToTxnLabels, and vice versa
private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap();
// count the number of running txns of database, except for the routine load txn
private volatile int runningTxnNums = 0;
private volatile int runningTxnReplicaNums = 0;
@ -1523,7 +1521,7 @@ public class DatabaseTransactionMgr {
}
protected void checkRunningTxnExceedLimit(TransactionState.LoadJobSourceType sourceType)
throws BeginTransactionException {
throws BeginTransactionException, MetaNotFoundException {
switch (sourceType) {
case ROUTINE_LOAD_TASK:
// no need to check limit for routine load task:
@ -1532,9 +1530,10 @@ public class DatabaseTransactionMgr {
// load, and other txn may not be able to submitted.
break;
default:
if (runningTxnNums >= Config.max_running_txn_num_per_db) {
long txnQuota = env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
if (runningTxnNums >= txnQuota) {
throw new BeginTransactionException("current running txns on db " + dbId + " is "
+ runningTxnNums + ", larger than limit " + Config.max_running_txn_num_per_db);
+ runningTxnNums + ", larger than limit " + txnQuota);
}
break;
}

View File

@ -154,4 +154,27 @@ public class AlterDatabaseQuotaStmtTest {
Assert.fail("No exception throws.");
}
@Test
public void testNormalAlterDatabaseTransactionQuotaStmt() throws AnalysisException, UserException {
long quotaSize = 10;
AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, String.valueOf(quotaSize));
stmt.analyze(analyzer);
String expectedSql = "ALTER DATABASE testCluster:testDb SET TRANSACTION QUOTA 10";
Assert.assertEquals(expectedSql, stmt.toSql());
Assert.assertEquals(quotaSize, stmt.getQuota());
}
@Test(expected = AnalysisException.class)
public void testTransactionMinusQuota() throws AnalysisException, UserException {
AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, "-100");
stmt.analyze(analyzer);
Assert.fail("No exception throws.");
}
@Test(expected = AnalysisException.class)
public void testtransactionInvalidQuantity() throws AnalysisException, UserException {
AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, "invalid_100_quota");
stmt.analyze(analyzer);
Assert.fail("No exception throws.");
}
}

View File

@ -193,11 +193,11 @@ public class DbsProcDirTest {
Assert.assertTrue(result instanceof BaseProcResult);
Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum", "Size", "Quota",
"LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota"),
"LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota", "TransactionQuota"),
result.getColumnNames());
List<List<String>> rows = Lists.newArrayList();
rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824"));
rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824"));
rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824", "100"));
rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824", "100"));
Assert.assertEquals(rows, result.getRows());
}
@ -228,7 +228,7 @@ public class DbsProcDirTest {
dir = new DbsProcDir(env, catalog);
result = dir.fetchResult();
Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum", "Size", "Quota",
"LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota"),
"LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota", "TransactionQuota"),
result.getColumnNames());
List<List<String>> rows = Lists.newArrayList();
Assert.assertEquals(rows, result.getRows());