[fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010)

pick #35972 and #34536
This commit is contained in:
Gabriel
2024-06-07 15:40:55 +08:00
committed by GitHub
parent 4030164270
commit a518915626
13 changed files with 249 additions and 108 deletions

View File

@ -76,6 +76,7 @@ public class SessionVariable implements Serializable, Writable {
public static final Logger LOG = LogManager.getLogger(SessionVariable.class);
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio";
@ -615,6 +616,9 @@ public class SessionVariable implements Serializable, Writable {
})
public int numScannerThreads = 0;
@VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT)
public int localExchangeFreeBlocksLimit = 4;
@VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = {
"ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
"The max multiple of increasing the concurrency of scanners adaptively, "
@ -3157,6 +3161,7 @@ public class SessionVariable implements Serializable, Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);