[feature](agg) Support spill to disk in aggregation (#18051)

This commit is contained in:
Jerry Hu
2023-04-20 18:59:08 +08:00
committed by GitHub
parent 668c681fbc
commit c4e469c82c
14 changed files with 683 additions and 339 deletions

View File

@ -277,6 +277,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String GROUP_CONCAT_MAX_LEN = "group_concat_max_len";
public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold";
public static final String EXTERNAL_AGG_BYTES_THRESHOLD = "external_agg_bytes_threshold";
public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits";
public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt";
public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold";
@ -761,6 +763,18 @@ public class SessionVariable implements Serializable, Writable {
checker = "checkExternalSortBytesThreshold", fuzzy = true)
public long externalSortBytesThreshold = 0;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD,
checker = "checkExternalAggBytesThreshold", fuzzy = true)
public long externalAggBytesThreshold = 0;
public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 8;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
checker = "checkExternalAggPartitionBits", fuzzy = true)
public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks.
// Whether enable two phase read optimization
// 1. read related rowids along with necessary column data
// 2. spawn fetch RPC to other nodes to get related data by sorted rowids
@ -826,15 +840,22 @@ public class SessionVariable implements Serializable, Writable {
switch (randomInt) {
case 0:
this.externalSortBytesThreshold = 0;
this.externalAggBytesThreshold = 0;
break;
case 1:
this.externalSortBytesThreshold = 1;
this.externalAggBytesThreshold = 1;
this.externalAggPartitionBits = 6;
break;
case 2:
this.externalSortBytesThreshold = 1024 * 1024;
this.externalAggBytesThreshold = 1024 * 1024;
this.externalAggPartitionBits = 8;
break;
default:
this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024;
this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024;
this.externalAggPartitionBits = 4;
break;
}
// pull_request_id default value is 0
@ -1601,6 +1622,24 @@ public class SessionVariable implements Serializable, Writable {
}
}
public void checkExternalAggBytesThreshold(String externalAggBytesThreshold) {
long value = Long.valueOf(externalAggBytesThreshold);
if (value > 0 && value < MIN_EXTERNAL_AGG_BYTES_THRESHOLD) {
LOG.warn("external agg bytes threshold: {}, min: {}", value, MIN_EXTERNAL_AGG_BYTES_THRESHOLD);
throw new UnsupportedOperationException("minimum value is " + MIN_EXTERNAL_AGG_BYTES_THRESHOLD);
}
}
public void checkExternalAggPartitionBits(String externalAggPartitionBits) {
int value = Integer.valueOf(externalAggPartitionBits);
if (value < MIN_EXTERNAL_AGG_PARTITION_BITS || value > MAX_EXTERNAL_AGG_PARTITION_BITS) {
LOG.warn("external agg bytes threshold: {}, min: {}, max: {}",
value, MIN_EXTERNAL_AGG_PARTITION_BITS, MAX_EXTERNAL_AGG_PARTITION_BITS);
throw new UnsupportedOperationException("min value is " + MIN_EXTERNAL_AGG_PARTITION_BITS + " max value is "
+ MAX_EXTERNAL_AGG_PARTITION_BITS);
}
}
public boolean isEnableFileCache() {
return enableFileCache;
}
@ -1690,6 +1729,10 @@ public class SessionVariable implements Serializable, Writable {
tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
tResult.setExternalAggBytesThreshold(externalAggBytesThreshold);
tResult.setExternalAggPartitionBits(externalAggPartitionBits);
tResult.setEnableFileCache(enableFileCache);
if (dryRunQuery) {