[feature](local-tvf) support local tvf on shared storage (#33050)
Previously, local tvf can only query data on one BE node.
But if the storage is shared(eg, NAS), it can be executed on multi nodes.
This PR mainly changes:
1. Add a new property `"shared_stoage" = "false/true"`
Default is false, if set to true, "backend_id" is optional. If "backend_id" is set,
it still be executed on that BE, if not set, "shared_stoage" must be "true"
and it will be executed on multi nodes.
Doc: https://github.com/apache/doris-website/pull/494
This commit is contained in:
@ -73,11 +73,15 @@ public class TVFScanNode extends FileQueryScanNode {
|
||||
if (tableValuedFunction instanceof LocalTableValuedFunction) {
|
||||
// For local tvf, the backend was specified by backendId
|
||||
Long backendId = ((LocalTableValuedFunction) tableValuedFunction).getBackendId();
|
||||
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
if (backend == null) {
|
||||
throw new UserException("Backend " + backendId + " does not exist");
|
||||
if (backendId != -1) {
|
||||
// User has specified the backend, only use that backend
|
||||
// Otherwise, use all backends for shared storage.
|
||||
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
if (backend == null) {
|
||||
throw new UserException("Backend " + backendId + " does not exist");
|
||||
}
|
||||
preferLocations.add(backend.getHost());
|
||||
}
|
||||
preferLocations.add(backend.getHost());
|
||||
}
|
||||
backendPolicy.init(preferLocations);
|
||||
numNodes = backendPolicy.numBackends();
|
||||
|
||||
@ -36,6 +36,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class BeSelectionPolicy {
|
||||
private static final Logger LOG = LogManager.getLogger(BeSelectionPolicy.class);
|
||||
|
||||
public boolean needScheduleAvailable = false;
|
||||
public boolean needQueryAvailable = false;
|
||||
public boolean needLoadAvailable = false;
|
||||
|
||||
@ -30,10 +30,13 @@ import org.apache.doris.thrift.TBrokerFileStatus;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -47,13 +50,19 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
public static final String NAME = "local";
|
||||
public static final String PROP_FILE_PATH = "file_path";
|
||||
public static final String PROP_BACKEND_ID = "backend_id";
|
||||
public static final String PROP_SHARED_STORAGE = "shared_storage";
|
||||
|
||||
private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
|
||||
.add(PROP_FILE_PATH)
|
||||
.add(PROP_BACKEND_ID)
|
||||
.build();
|
||||
|
||||
// This backend is user specified backend for listing files, fetching file schema and executing query.
|
||||
private long backendId;
|
||||
// This backend if for listing files and fetching file schema.
|
||||
// If "backendId" is set, "backendIdForRequest" will be set to "backendId",
|
||||
// otherwise, "backendIdForRequest" will be set to one of the available backends.
|
||||
private long backendIdForRequest = -1;
|
||||
private boolean sharedStorage = false;
|
||||
|
||||
public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException {
|
||||
// 1. analyze common properties
|
||||
@ -66,14 +75,35 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
}
|
||||
}
|
||||
filePath = otherProps.get(PROP_FILE_PATH);
|
||||
backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID));
|
||||
backendId = Long.parseLong(otherProps.getOrDefault(PROP_BACKEND_ID, "-1"));
|
||||
sharedStorage = Boolean.parseBoolean(otherProps.getOrDefault(PROP_SHARED_STORAGE, "false"));
|
||||
|
||||
// If not shared storage, backend_id is required
|
||||
// If is shared storage, backend_id can still be set, so that we can query data on single BE node.
|
||||
// Otherwise, if shared storage is true, we may use multi BE nodes.
|
||||
if (backendId == -1 && !sharedStorage) {
|
||||
throw new AnalysisException("'backend_id' is required when 'shared_storage' is false.");
|
||||
}
|
||||
|
||||
// 3. parse file
|
||||
getFileListFromBackend();
|
||||
}
|
||||
|
||||
private void getFileListFromBackend() throws AnalysisException {
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
Backend be = null;
|
||||
if (backendId != -1) {
|
||||
be = Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
backendIdForRequest = backendId;
|
||||
} else {
|
||||
Preconditions.checkState(sharedStorage);
|
||||
List<Long> beIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (beIds.isEmpty()) {
|
||||
throw new AnalysisException("No available backend");
|
||||
}
|
||||
Collections.shuffle(beIds);
|
||||
be = Env.getCurrentSystemInfo().getBackend(beIds.get(0));
|
||||
backendIdForRequest = be.getId();
|
||||
}
|
||||
if (be == null) {
|
||||
throw new AnalysisException("backend not found with backend_id = " + backendId);
|
||||
}
|
||||
@ -125,6 +155,6 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
|
||||
@Override
|
||||
protected Backend getBackend() {
|
||||
return Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
return Env.getCurrentSystemInfo().getBackend(backendIdForRequest);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user