[Improvement](scan) add a config for scan queue memory limit (#19439)
This commit is contained in:
@ -160,6 +160,10 @@ public class SetVar {
|
||||
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
}
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
|
||||
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
}
|
||||
if (getVariable().equalsIgnoreCase("is_report_success")) {
|
||||
variable = SessionVariable.ENABLE_PROFILE;
|
||||
}
|
||||
|
||||
@ -62,6 +62,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final Logger LOG = LogManager.getLogger(SessionVariable.class);
|
||||
|
||||
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
|
||||
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
|
||||
public static final String QUERY_TIMEOUT = "query_timeout";
|
||||
public static final String INSERT_TIMEOUT = "insert_timeout";
|
||||
public static final String ENABLE_PROFILE = "enable_profile";
|
||||
@ -336,6 +337,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
|
||||
public long maxExecMemByte = 2147483648L;
|
||||
|
||||
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
|
||||
public long maxScanQueueMemByte = 2147483648L / 20;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
|
||||
public boolean enableSpilling = false;
|
||||
|
||||
@ -1012,6 +1016,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return maxExecMemByte;
|
||||
}
|
||||
|
||||
public long getMaxScanQueueExecMemByte() {
|
||||
return maxScanQueueMemByte;
|
||||
}
|
||||
|
||||
public int getQueryTimeoutS() {
|
||||
return queryTimeoutS;
|
||||
}
|
||||
@ -1159,6 +1167,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxScanQueueMemByte(long scanQueueMemByte) {
|
||||
this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 20);
|
||||
}
|
||||
|
||||
public boolean isSqlQuoteShowCreate() {
|
||||
return sqlQuoteShowCreate;
|
||||
}
|
||||
@ -1743,6 +1755,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public TQueryOptions toThrift() {
|
||||
TQueryOptions tResult = new TQueryOptions();
|
||||
tResult.setMemLimit(maxExecMemByte);
|
||||
tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20));
|
||||
|
||||
// TODO chenhao, reservation will be calculated by cost
|
||||
tResult.setMinReservation(0);
|
||||
@ -1996,6 +2009,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public TQueryOptions getQueryOptionVariables() {
|
||||
TQueryOptions queryOptions = new TQueryOptions();
|
||||
queryOptions.setMemLimit(maxExecMemByte);
|
||||
queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20));
|
||||
queryOptions.setQueryTimeout(queryTimeoutS);
|
||||
queryOptions.setInsertTimeout(insertTimeoutS);
|
||||
return queryOptions;
|
||||
|
||||
Reference in New Issue
Block a user