[Fix](func numbers) Remove backend_nums argument of numbers function (#25200)

This commit is contained in:
zhiqqqq
2023-10-10 07:25:58 -05:00
committed by GitHub
parent fc1bad9a6b
commit 62a6b132be
10 changed files with 46 additions and 44 deletions

View File

@ -107,4 +107,14 @@ public class DataGenScanNode extends ExternalScanNode {
public boolean needToCheckColumnPriv() {
return false;
}
// Currently DataGenScanNode is only used by DataGenTableValuedFunction, which is
// inherited by NumbersTableValuedFunction.
// NumbersTableValuedFunction is not a complete implementation for now, since its
// function signature do not support us to split total numbers, so it can not be executed
// by multi-processes or multi-threads. So we assign instance number to 1.
@Override
public int getNumInstances() {
return 1;
}
}

View File

@ -703,8 +703,9 @@ public class Coordinator implements CoordInterface {
// else use exec_plan_fragments directly.
// we choose #fragments >=2 because in some cases
// we need ensure that A fragment is already prepared to receive data before B fragment sends data.
// For example: select * from numbers("10","w") will generate ExchangeNode and TableValuedFunctionScanNode,
// we should ensure TableValuedFunctionScanNode does not send data until ExchangeNode is ready to receive.
// For example: select * from numbers("number"="10") will generate ExchangeNode and
// TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does
// not send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() >= 2;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
@ -825,8 +826,9 @@ public class Coordinator implements CoordInterface {
// else use exec_plan_fragments directly.
// we choose #fragments >=2 because in some cases
// we need ensure that A fragment is already prepared to receive data before B fragment sends data.
// For example: select * from numbers("10","w") will generate ExchangeNode and TableValuedFunctionScanNode,
// we should ensure TableValuedFunctionScanNode does not send data until ExchangeNode is ready to receive.
// For example: select * from numbers("number"="10") will generate ExchangeNode and
// TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
// send data until ExchangeNode is ready to receive.
boolean twoPhaseExecution = fragments.size() >= 2;
for (PlanFragment fragment : fragments) {
FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
@ -2315,6 +2317,11 @@ public class Coordinator implements CoordInterface {
FragmentScanRangeAssignment assignment,
Map<TNetworkAddress, Long> assignedBytesPerHost,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
// Type of locations is List, it could have elements that have same "location"
// and we do have this situation for some scan node.
// The duplicate "location" will NOT be filtered by FragmentScanRangeAssignment,
// since FragmentScanRangeAssignment use List<TScanRangeParams> as its value type,
// duplicate "locations" will be converted to list.
for (TScanRangeLocations scanRangeLocations : locations) {
Reference<Long> backendIdRef = new Reference<Long>();
TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,

View File

@ -41,20 +41,16 @@ import java.util.Map;
// have a single column number
/**
* The Implement of table valued function——numbers("number" = "N", "backend_num" = "M").
* The Implement of table valued function——numbers("number" = "N").
*/
public class NumbersTableValuedFunction extends DataGenTableValuedFunction {
public static final String NAME = "numbers";
public static final String NUMBER = "number";
public static final String BACKEND_NUM = "backend_num";
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(NUMBER)
.add(BACKEND_NUM)
.build();
// The total numbers will be generated.
private long totalNumbers;
// The total backends will server it.
private int tabletsNum;
/**
* Constructor.
@ -70,11 +66,6 @@ public class NumbersTableValuedFunction extends DataGenTableValuedFunction {
validParams.put(key.toLowerCase(), params.get(key));
}
try {
tabletsNum = Integer.parseInt(validParams.getOrDefault(BACKEND_NUM, "1"));
} catch (NumberFormatException e) {
throw new AnalysisException("can not parse `backend_num` param to natural number");
}
String numberStr = validParams.get(NUMBER);
if (!Strings.isNullOrEmpty(numberStr)) {
try {
@ -92,10 +83,6 @@ public class NumbersTableValuedFunction extends DataGenTableValuedFunction {
return totalNumbers;
}
public int getTabletsNum() {
return tabletsNum;
}
@Override
public TDataGenFunctionName getDataGenFunctionName() {
return TDataGenFunctionName.NUMBERS;
@ -124,17 +111,16 @@ public class NumbersTableValuedFunction extends DataGenTableValuedFunction {
if (backendList.isEmpty()) {
throw new AnalysisException("No Alive backends");
}
Collections.shuffle(backendList);
List<TableValuedFunctionTask> res = Lists.newArrayList();
for (int i = 0; i < tabletsNum; ++i) {
TScanRange scanRange = new TScanRange();
TDataGenScanRange dataGenScanRange = new TDataGenScanRange();
TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange();
tvfNumbersScanRange.setTotalNumbers(totalNumbers);
dataGenScanRange.setNumbersParams(tvfNumbersScanRange);
scanRange.setDataGenScanRange(dataGenScanRange);
res.add(new TableValuedFunctionTask(backendList.get(i % backendList.size()), scanRange));
}
TScanRange scanRange = new TScanRange();
TDataGenScanRange dataGenScanRange = new TDataGenScanRange();
TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange();
tvfNumbersScanRange.setTotalNumbers(totalNumbers);
dataGenScanRange.setNumbersParams(tvfNumbersScanRange);
scanRange.setDataGenScanRange(dataGenScanRange);
res.add(new TableValuedFunctionTask(backendList.get(0), scanRange));
return res;
}
}