diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index a2237b4cc4..9fa9eea9f6 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -28,6 +28,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -162,9 +163,15 @@ public class MaxComputeColumnValue implements ColumnValue {
     @Override
     public LocalDateTime getDateTime() {
         skippedIfNull();
-        DateMilliVector datetimeCol = (DateMilliVector) column;
-        LocalDateTime v = datetimeCol.getObject(idx++);
-        return v == null ? LocalDateTime.MIN : v;
+        LocalDateTime result;
+        if (column instanceof DateMilliVector) {
+            DateMilliVector datetimeCol = (DateMilliVector) column;
+            result = datetimeCol.getObject(idx++);
+        } else {
+            TimeStampNanoVector datetimeCol = (TimeStampNanoVector) column;
+            result = datetimeCol.getObject(idx++);
+        }
+        return result == null ? LocalDateTime.MIN : result;
     }
 
     @Override
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 95664c93c3..8f9b903afd 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -22,14 +22,13 @@ import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 
 import com.aliyun.odps.Column;
-import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsType;
-import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.ArrowRecordReader;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.TypeInfoFactory;
 import com.google.common.base.Strings;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.log4j.Logger;
@@ -41,17 +40,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * MaxComputeJ JniScanner. BE will read data from the scanner object.
  */
 public class MaxComputeJniScanner extends JniScanner {
-    private Odps odps;
-    private TableTunnel tunnel;
-    private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
-    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
-    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
     private static final String REGION = "region";
     private static final String PROJECT = "project";
     private static final String TABLE = "table";
@@ -60,39 +55,28 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final String START_OFFSET = "start_offset";
     private static final String SPLIT_SIZE = "split_size";
     private static final String PUBLIC_ACCESS = "public_access";
+    private static final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
+    private final String region;
     private final String project;
     private final String table;
+    private final MaxComputeTableScan curTableScan;
     private MaxComputeColumnValue columnValue;
     private long remainBatchRows = 0;
     private long totalRows = 0;
-    private TableTunnel.DownloadSession session;
+    private RootAllocator arrowAllocator;
     private ArrowRecordReader curReader;
-    private List<Column> columns;
-    private Map<String, Integer> readColumnsId;
+    private List<Column> readColumns;
+    private Map<String, Integer> readColumnsToId;
     private long startOffset = -1L;
     private long splitSize = -1L;
 
     public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
-        String region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
+        region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
         project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
         table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
-        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
-                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
-            startOffset = Long.parseLong(params.get(START_OFFSET));
-            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
-        }
-        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
-        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
-        odps = new Odps(new AliyunAccount(accessKey, secretKey));
-        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
-        odps.setDefaultProject(project);
-        tunnel = new TableTunnel(odps);
-        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
-        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
-        if (enablePublicAccess) {
-            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
-        }
-        tunnel.setEndpoint(tunnelUrl);
+        tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
+        curTableScan = tableScans.get(tableUniqKey());
+
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         ColumnType[] columnTypes = new ColumnType[types.length];
@@ -110,44 +94,60 @@ public class MaxComputeJniScanner extends JniScanner {
         initTableInfo(columnTypes, requiredFields, predicates, batchSize);
     }
 
+    private MaxComputeTableScan newTableScan(Map<String, String> params) {
+        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
+                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
+            startOffset = Long.parseLong(params.get(START_OFFSET));
+            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
+        }
+        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
+        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
+        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
+        return new MaxComputeTableScan(region, project, table, accessKey, secretKey, enablePublicAccess);
+    }
+
+    public String tableUniqKey() {
+        return region + "#" + project + "." + table;
+    }
+
     @Override
     protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
                                  int batchSize) {
         super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
-        columns = new ArrayList<>();
-        readColumnsId = new HashMap<>();
+        readColumns = new ArrayList<>();
+        readColumnsToId = new HashMap<>();
         for (int i = 0; i < fields.length; i++) {
             if (!Strings.isNullOrEmpty(fields[i])) {
-                columns.add(createOdpsColumn(i, types[i]));
-                readColumnsId.put(fields[i], i);
+                readColumns.add(createOdpsColumn(i, types[i]));
+                readColumnsToId.put(fields[i], i);
             }
         }
         // reorder columns
-        List<Column> columnList = odps.tables().get(table).getSchema().getColumns();
+        List<Column> columnList = curTableScan.getSchema().getColumns();
         Map<String, Integer> columnRank = new HashMap<>();
         for (int i = 0; i < columnList.size(); i++) {
             columnRank.put(columnList.get(i).getName(), i);
         }
         // Downloading columns data from Max compute only supports the order of table metadata.
         // We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode.
-        columns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
+        readColumns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
     }
 
     @Override
     public void open() throws IOException {
-        if (columns.isEmpty()) {
+        if (readColumns.isEmpty()) {
             return;
         }
         try {
-            session = tunnel.createDownloadSession(project, table);
-            if (splitSize > 0) {
-                totalRows = Math.min(splitSize, session.getRecordCount());
-            } else {
-                totalRows = session.getRecordCount();
-            }
+            TableTunnel.DownloadSession session = curTableScan.getSession();
             long start = startOffset == -1L ? 0 : startOffset;
-            curReader = session.openArrowRecordReader(start, totalRows, columns);
+            long recordCount = session.getRecordCount();
+            totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;
+
+            arrowAllocator = new RootAllocator(Long.MAX_VALUE);
+            curReader = session.openArrowRecordReader(start, totalRows, readColumns, arrowAllocator);
         } catch (Exception e) {
+            close();
             throw new IOException(e);
         }
         remainBatchRows = totalRows;
@@ -206,12 +206,19 @@ public class MaxComputeJniScanner extends JniScanner {
 
     @Override
     public void close() throws IOException {
+        String tableName = tableUniqKey();
+        MaxComputeTableScan scan = tableScans.get(tableName);
+        if (scan != null && scan.endOfScan()) {
+            tableScans.remove(tableName);
+        }
         remainBatchRows = 0;
         totalRows = 0;
         startOffset = -1;
         splitSize = -1;
         if (curReader != null) {
+            arrowAllocator.close();
             curReader.close();
+            curReader = null;
         }
     }
@@ -227,6 +234,7 @@ public class MaxComputeJniScanner extends JniScanner {
             return 0;
         }
         remainBatchRows -= realRows;
+        curTableScan.increaseReadRows(realRows);
         return realRows;
     }
@@ -234,17 +242,20 @@ public class MaxComputeJniScanner extends JniScanner {
         VectorSchemaRoot batch;
         int curReadRows = 0;
         while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
-            List<FieldVector> fieldVectors = batch.getFieldVectors();
-            int batchRows = 0;
-            for (FieldVector column : fieldVectors) {
-                columnValue.reset(column);
-                // LOG.warn("MCJNI read getClass: " + column.getClass());
-                batchRows = column.getValueCount();
-                for (int j = 0; j < batchRows; j++) {
-                    appendData(readColumnsId.get(column.getName()), columnValue);
+            try {
+                List<FieldVector> fieldVectors = batch.getFieldVectors();
+                int batchRows = 0;
+                for (FieldVector column : fieldVectors) {
+                    columnValue.reset(column);
+                    batchRows = column.getValueCount();
+                    for (int j = 0; j < batchRows; j++) {
+                        appendData(readColumnsToId.get(column.getName()), columnValue);
+                    }
                 }
+                curReadRows += batchRows;
+            } finally {
+                batch.close();
             }
-            curReadRows += batchRows;
         }
         return curReadRows;
     }
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
new file mode 100644
index 0000000000..5102330a4d
--- /dev/null
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+
+import java.io.IOException;
+
+/**
+ * MaxCompute table scan. Holds the ODPS client and the tunnel download session shared by all splits of one table.
+ */
+public class MaxComputeTableScan {
+    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
+    private final Odps odps;
+    private final TableTunnel tunnel;
+    private final String project;
+    private final String table;
+    private volatile TableTunnel.DownloadSession tableSession;
+    private volatile long readRows = 0;
+
+    public MaxComputeTableScan(String region, String project, String table,
+                               String accessKey, String secretKey, boolean enablePublicAccess) {
+        this.project = project;
+        this.table = table;
+        odps = new Odps(new AliyunAccount(accessKey, secretKey));
+        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
+        odps.setDefaultProject(this.project);
+        tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+    }
+
+    public TableSchema getSchema() {
+        return odps.tables().get(table).getSchema();
+    }
+
+    public synchronized TableTunnel.DownloadSession getSession() throws IOException {
+        if (tableSession == null) {
+            try {
+                tableSession = tunnel.createDownloadSession(project, table);
+            } catch (TunnelException e) {
+                throw new IOException(e);
+            }
+        }
+        return tableSession;
+    }
+
+    public synchronized void increaseReadRows(long rows) {
+        // multi-thread writing must be synchronized
+        readRows += rows;
+    }
+
+    public boolean endOfScan() {
+        return readRows >= tableSession.getRecordCount();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index cff2b99223..9b820fa185 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -98,9 +98,6 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
     }
 
     private static Map<String, String> getValidParams(Map<String, String> params) throws AnalysisException {
-        if (!params.containsKey(S3_URI)) {
-            throw new AnalysisException("Missing required property: " + S3_URI);
-        }
         Map<String, String> validParams = new HashMap<>();
         for (Map.Entry<String, String> entry : params.entrySet()) {
             String key = entry.getKey();
@@ -113,6 +110,9 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
             }
             validParams.put(lowerKey, entry.getValue());
         }
+        if (!validParams.containsKey(S3_URI)) {
+            throw new AnalysisException("Missing required property: " + S3_URI);
+        }
         return S3Properties.requiredS3TVFProperties(validParams);
     }
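
Taken together, the MaxComputeJniScanner changes above replace per-scanner Odps/TableTunnel state with a process-wide cache: each split of the same table resolves the shared MaxComputeTableScan through the static tableScans map keyed by tableUniqKey() (region + "#" + project + "." + table), reports the rows it consumed via increaseReadRows(), and close() evicts the entry once endOfScan() shows that every row of the table has been read. The sketch below is a minimal, self-contained model of that cache-and-evict protocol only; SharedScan, the fixed row counts, and the thread pool are hypothetical stand-ins for the ODPS tunnel session and the BE scan threads, not the real MaxCompute API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of the shared-scan cache: one entry per table, reused by all splits.
public class SharedScanCacheSketch {
    // Hypothetical stand-in for MaxComputeTableScan: counts rows read across splits.
    static final class SharedScan {
        private final long totalRows;
        private long readRows = 0;

        SharedScan(long totalRows) {
            this.totalRows = totalRows;
        }

        // Splits run on different threads, so the counter update is synchronized.
        synchronized void increaseReadRows(long rows) {
            readRows += rows;
        }

        synchronized boolean endOfScan() {
            return readRows >= totalRows;
        }
    }

    // Mirrors the static tableScans map in MaxComputeJniScanner.
    private static final Map<String, SharedScan> tableScans = new ConcurrentHashMap<>();

    // Mirrors tableUniqKey(): region + "#" + project + "." + table.
    static String tableUniqKey(String region, String project, String table) {
        return region + "#" + project + "." + table;
    }

    // One "split": reuse the cached scan, report the rows it read, then try to evict the entry.
    static void runSplit(String key, long splitRows, long tableRows) {
        SharedScan scan = tableScans.computeIfAbsent(key, k -> new SharedScan(tableRows));
        scan.increaseReadRows(splitRows);   // in the patch this happens in getNext()
        if (scan.endOfScan()) {
            tableScans.remove(key);         // in the patch close() drops the shared entry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String key = tableUniqKey("cn-beijing", "demo_project", "demo_table");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> runSplit(key, 250, 1000));   // 4 splits x 250 rows = 1000 rows
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("entry evicted: " + !tableScans.containsKey(key));   // expect: true
    }
}

The model uses computeIfAbsent where the patch uses putIfAbsent followed by get; both leave exactly one shared instance per table, and in the patch newTableScan(params) still runs for every split because it also records that split's start_offset and split_size on the scanner.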