From add6bdb24013dc3dfcd418fca9db2cf058e81e23 Mon Sep 17 00:00:00 2001
From: slothever <18522955+wsjz@users.noreply.github.com>
Date: Mon, 20 Nov 2023 10:42:07 +0800
Subject: [PATCH] [fix](multi-catalog)add the max compute fe ut and fix
download expired (#27007)
1. add the max compute fe ut and fix download expired
2. solve memery leak when allocator close
3. add correct partition rows
---
.../max-compute-scanner/pom.xml | 9 +
.../maxcompute/MaxComputeJniScanner.java | 43 +++--
.../maxcompute/MaxComputeJniScannerTest.java | 158 ++++++++++++++++++
.../test_external_catalog_maxcompute.out | 3 +
.../test_external_catalog_maxcompute.groovy | 17 ++
5 files changed, 219 insertions(+), 11 deletions(-)
create mode 100644 fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
diff --git a/fe/be-java-extensions/max-compute-scanner/pom.xml b/fe/be-java-extensions/max-compute-scanner/pom.xml
index 8e057da807..9f9fa50e97 100644
--- a/fe/be-java-extensions/max-compute-scanner/pom.xml
+++ b/fe/be-java-extensions/max-compute-scanner/pom.xml
@@ -98,6 +98,15 @@ under the License.
+
+ maven-surefire-plugin
+
+ ${fe_ut_parallel}
+ false
+ false
+ -javaagent:${settings.localRepository}/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
+
+
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 5f4125ec4e..35a9bcc247 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -26,6 +26,7 @@ import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import com.google.common.base.Strings;
@@ -60,27 +61,30 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
+ private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
private final Map tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
private PartitionSpec partitionSpec;
private Set partitionColumns;
- private final MaxComputeTableScan curTableScan;
+ private MaxComputeTableScan curTableScan;
private MaxComputeColumnValue columnValue;
private long remainBatchRows = 0;
private long totalRows = 0;
- private RootAllocator arrowAllocator;
private ArrowRecordReader curReader;
private List readColumns;
private Map readColumnsToId;
private long startOffset = -1L;
+ private int retryCount = 2;
private long splitSize = -1L;
+ private final Map refreshParams;
public MaxComputeJniScanner(int batchSize, Map params) {
region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
+ refreshParams = params;
tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
curTableScan = tableScans.get(tableUniqKey());
String partitionSpec = params.get(PARTITION_SPEC);
@@ -104,6 +108,11 @@ public class MaxComputeJniScanner extends JniScanner {
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
}
+ public void refreshTableScan() {
+ curTableScan = newTableScan(refreshParams);
+ tableScans.put(tableUniqKey(), curTableScan);
+ }
+
private MaxComputeTableScan newTableScan(Map params) {
if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
&& !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
@@ -132,6 +141,10 @@ public class MaxComputeJniScanner extends JniScanner {
readColumnsToId.put(fields[i], i);
}
}
+ }
+
+ @Override
+ public void open() throws IOException {
// reorder columns
List columnList = curTableScan.getSchema().getColumns();
columnList.addAll(curTableScan.getSchema().getPartitionColumns());
@@ -142,10 +155,6 @@ public class MaxComputeJniScanner extends JniScanner {
// Downloading columns data from Max compute only supports the order of table metadata.
// We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode.
readColumns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
- }
-
- @Override
- public void open() throws IOException {
if (readColumns.isEmpty()) {
return;
}
@@ -159,19 +168,27 @@ public class MaxComputeJniScanner extends JniScanner {
long start = startOffset == -1L ? 0 : startOffset;
long recordCount = session.getRecordCount();
totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;
-
- arrowAllocator = new RootAllocator(Long.MAX_VALUE);
partitionColumns = session.getSchema().getPartitionColumns().stream()
.map(Column::getName)
.collect(Collectors.toSet());
List maxComputeColumns = new ArrayList<>(readColumns);
maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
+ remainBatchRows = totalRows;
+ } catch (TunnelException e) {
+ if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) {
+ retryCount--;
+ // try to refresh table scan and re-open odps
+ refreshTableScan();
+ open();
+ } else {
+ retryCount = 2;
+ throw new IOException(e);
+ }
} catch (Exception e) {
close();
throw new IOException(e);
}
- remainBatchRows = totalRows;
}
private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
@@ -204,9 +221,11 @@ public class MaxComputeJniScanner extends JniScanner {
case DOUBLE:
odpsType = TypeInfoFactory.DOUBLE;
break;
+ case DATETIME:
case DATETIMEV2:
odpsType = TypeInfoFactory.DATETIME;
break;
+ case DATE:
case DATEV2:
odpsType = TypeInfoFactory.DATE;
break;
@@ -237,7 +256,7 @@ public class MaxComputeJniScanner extends JniScanner {
startOffset = -1;
splitSize = -1;
if (curReader != null) {
- arrowAllocator.close();
+ arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
curReader.close();
curReader = null;
}
@@ -279,7 +298,9 @@ public class MaxComputeJniScanner extends JniScanner {
Integer readColumnId = readColumnsToId.get(partitionColumn);
if (readColumnId != null && partitionValue != null) {
MaxComputePartitionValue value = new MaxComputePartitionValue(partitionValue);
- appendData(readColumnId, value);
+ for (int i = 0; i < batchRows; i++) {
+ appendData(readColumnId, value);
+ }
}
}
}
diff --git a/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java b/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
new file mode 100644
index 0000000000..f14c0610d4
--- /dev/null
+++ b/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrowRecordReader;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.type.TypeInfoFactory;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.arrow.memory.BufferAllocator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MaxComputeJniScannerTest {
+
+ @Mocked
+ private TableTunnel.DownloadSession session;
+ private Map paramsMc = new HashMap() {
+ {
+ put("region", "cn-beijing");
+ put("project", "test_pj");
+ put("table", "test_tb");
+ put("access_key", "ak");
+ put("secret_key", "sk");
+ put("start_offset", "0");
+ put("split_size", "128");
+ put("partition_spec", "p1=2022-06");
+ put("required_fields", "boolean,tinyint,smallint,int,bigint,float,double,"
+ + "date,timestamp,char,varchar,string,decimalv2,decimal64,"
+ + "decimal18,timestamp4");
+ put("columns_types", "boolean#tinyint#smallint#int#bigint#float#double#"
+ + "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)#"
+ + "decimal(18,5)#timestamp(4)");
+ }
+ };
+ private MaxComputeJniScanner scanner = new MaxComputeJniScanner(32, paramsMc);
+
+ @BeforeEach
+ public void init() {
+ new MockUp(MaxComputeJniScanner.class) {
+ @Mock
+ public TableSchema getSchema() {
+ return getTestSchema();
+ }
+ };
+ new MockUp(MaxComputeTableScan.class) {
+ @Mock
+ public TableSchema getSchema() {
+ return getTestSchema();
+ }
+
+ @Mock
+ public TableTunnel.DownloadSession openDownLoadSession() throws IOException {
+ return session;
+ }
+
+ @Mock
+ public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec partitionSpec) throws IOException {
+ return session;
+ }
+ };
+ new MockUp(TableTunnel.DownloadSession.class) {
+ @Mock
+ public TableSchema getSchema() {
+ return getTestSchema();
+ }
+
+ @Mock
+ public long getRecordCount() {
+ return 10;
+ }
+
+ @Mock
+ public ArrowRecordReader openArrowRecordReader(long start, long count, List columns,
+ BufferAllocator allocator)
+ throws TunnelException, IOException {
+ return null;
+ }
+ };
+ }
+
+ private TableSchema getTestSchema() {
+ TableSchema schema = new TableSchema();
+ schema.addColumn(new Column("boolean", TypeInfoFactory.BOOLEAN));
+ schema.addColumn(new Column("bigint", TypeInfoFactory.BIGINT));
+ schema.addPartitionColumn(new Column("date", TypeInfoFactory.DATE));
+ schema.addPartitionColumn(new Column("tinyint", TypeInfoFactory.TINYINT));
+ schema.addPartitionColumn(new Column("smallint", TypeInfoFactory.SMALLINT));
+ schema.addPartitionColumn(new Column("int", TypeInfoFactory.INT));
+ schema.addPartitionColumn(new Column("timestamp", TypeInfoFactory.TIMESTAMP));
+ schema.addPartitionColumn(new Column("char", TypeInfoFactory.getCharTypeInfo(10)));
+ schema.addPartitionColumn(new Column("varchar", TypeInfoFactory.getVarcharTypeInfo(10)));
+ schema.addPartitionColumn(new Column("string", TypeInfoFactory.STRING));
+ schema.addPartitionColumn(new Column("float", TypeInfoFactory.FLOAT));
+ schema.addPartitionColumn(new Column("double", TypeInfoFactory.DOUBLE));
+ schema.addPartitionColumn(new Column("decimalv2",
+ TypeInfoFactory.getDecimalTypeInfo(12, 4)));
+ schema.addPartitionColumn(new Column("decimal64",
+ TypeInfoFactory.getDecimalTypeInfo(10, 3)));
+ schema.addPartitionColumn(new Column("decimal18",
+ TypeInfoFactory.getDecimalTypeInfo(18, 5)));
+ schema.addPartitionColumn(new Column("timestamp4", TypeInfoFactory.TIMESTAMP));
+ return schema;
+ }
+
+ @Test
+ public void testMaxComputeJniScanner() throws IOException {
+ scanner.open();
+ scanner.getNext();
+ scanner.close();
+ }
+
+ @Test
+ public void testMaxComputeJniScannerErr() {
+ try {
+ new MockUp(TableTunnel.DownloadSession.class) {
+ @Mock
+ public ArrowRecordReader openArrowRecordReader(long start, long count, List columns,
+ BufferAllocator allocator)
+ throws TunnelException, IOException {
+ throw new TunnelException("TableModified");
+ }
+ };
+ scanner.open();
+ scanner.getNext();
+ scanner.close();
+ } catch (IOException e) {
+ Assertions.assertTrue(e.getCause() instanceof TunnelException);
+ Assertions.assertEquals(((TunnelException) e.getCause()).getErrorMsg(), "TableModified");
+ }
+ }
+}
diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
index 5fc7ade489..6cd91cf2ee 100644
--- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
+++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -22,3 +22,6 @@ true 77 8920 182239402452
-- !q7 --
6223 maxam 2020-09-21
9601 qewtoll 2020-09-21
+
+-- !replay_q6 --
+9601 qewtoll 2020-09-21
diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
index 6b050e277a..23d0c0b252 100644
--- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
+++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -51,10 +51,27 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot
qt_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """
qt_q7 """ select * from mc_parts where dt = '2020-09-21' or mc_bigint > 0 """
}
+
sql """ switch `${mc_catalog_name}`; """
sql """ use `${mc_db}`; """
q01()
q02()
q03()
+
+ // replay test
+ sql """drop catalog if exists ${mc_catalog_name};"""
+ sql """
+ create catalog if not exists ${mc_catalog_name} properties (
+ "type" = "max_compute",
+ "mc.region" = "cn-beijing",
+ "mc.default.project" = "${mc_db}",
+ "mc.access_key" = "${ak}",
+ "mc.secret_key" = "${sk}",
+ "mc.public_access" = "true"
+ );
+ """
+ sql """ switch `${mc_catalog_name}`; """
+ sql """ use `${mc_db}`; """
+ qt_replay_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """
}
}