[fix](multi-catalog)add the max compute fe ut and fix download expired (#27007)

1. add the max compute fe ut and fix download expired
2. solve memery leak when allocator close
3. add correct partition rows
This commit is contained in:
slothever
2023-11-20 10:42:07 +08:00
committed by GitHub
parent 8c9a22cc4f
commit add6bdb240
5 changed files with 219 additions and 11 deletions

View File

@ -98,6 +98,15 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>${fe_ut_parallel}</forkCount>
<reuseForks>false</reuseForks>
<useFile>false</useFile>
<argLine>-javaagent:${settings.localRepository}/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar</argLine>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -26,6 +26,7 @@ import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import com.google.common.base.Strings;
@ -60,27 +61,30 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
private PartitionSpec partitionSpec;
private Set<String> partitionColumns;
private final MaxComputeTableScan curTableScan;
private MaxComputeTableScan curTableScan;
private MaxComputeColumnValue columnValue;
private long remainBatchRows = 0;
private long totalRows = 0;
private RootAllocator arrowAllocator;
private ArrowRecordReader curReader;
private List<Column> readColumns;
private Map<String, Integer> readColumnsToId;
private long startOffset = -1L;
private int retryCount = 2;
private long splitSize = -1L;
private final Map<String, String> refreshParams;
public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
refreshParams = params;
tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
curTableScan = tableScans.get(tableUniqKey());
String partitionSpec = params.get(PARTITION_SPEC);
@ -104,6 +108,11 @@ public class MaxComputeJniScanner extends JniScanner {
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
}
public void refreshTableScan() {
curTableScan = newTableScan(refreshParams);
tableScans.put(tableUniqKey(), curTableScan);
}
private MaxComputeTableScan newTableScan(Map<String, String> params) {
if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
&& !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
@ -132,6 +141,10 @@ public class MaxComputeJniScanner extends JniScanner {
readColumnsToId.put(fields[i], i);
}
}
}
@Override
public void open() throws IOException {
// reorder columns
List<Column> columnList = curTableScan.getSchema().getColumns();
columnList.addAll(curTableScan.getSchema().getPartitionColumns());
@ -142,10 +155,6 @@ public class MaxComputeJniScanner extends JniScanner {
// Downloading columns data from Max compute only supports the order of table metadata.
// We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode.
readColumns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
}
@Override
public void open() throws IOException {
if (readColumns.isEmpty()) {
return;
}
@ -159,19 +168,27 @@ public class MaxComputeJniScanner extends JniScanner {
long start = startOffset == -1L ? 0 : startOffset;
long recordCount = session.getRecordCount();
totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;
arrowAllocator = new RootAllocator(Long.MAX_VALUE);
partitionColumns = session.getSchema().getPartitionColumns().stream()
.map(Column::getName)
.collect(Collectors.toSet());
List<Column> maxComputeColumns = new ArrayList<>(readColumns);
maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
remainBatchRows = totalRows;
} catch (TunnelException e) {
if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) {
retryCount--;
// try to refresh table scan and re-open odps
refreshTableScan();
open();
} else {
retryCount = 2;
throw new IOException(e);
}
} catch (Exception e) {
close();
throw new IOException(e);
}
remainBatchRows = totalRows;
}
private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
@ -204,9 +221,11 @@ public class MaxComputeJniScanner extends JniScanner {
case DOUBLE:
odpsType = TypeInfoFactory.DOUBLE;
break;
case DATETIME:
case DATETIMEV2:
odpsType = TypeInfoFactory.DATETIME;
break;
case DATE:
case DATEV2:
odpsType = TypeInfoFactory.DATE;
break;
@ -237,7 +256,7 @@ public class MaxComputeJniScanner extends JniScanner {
startOffset = -1;
splitSize = -1;
if (curReader != null) {
arrowAllocator.close();
arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
curReader.close();
curReader = null;
}
@ -279,7 +298,9 @@ public class MaxComputeJniScanner extends JniScanner {
Integer readColumnId = readColumnsToId.get(partitionColumn);
if (readColumnId != null && partitionValue != null) {
MaxComputePartitionValue value = new MaxComputePartitionValue(partitionValue);
appendData(readColumnId, value);
for (int i = 0; i < batchRows; i++) {
appendData(readColumnId, value);
}
}
}
}

View File

@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.maxcompute;
import com.aliyun.odps.Column;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.TypeInfoFactory;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.arrow.memory.BufferAllocator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MaxComputeJniScannerTest {
@Mocked
private TableTunnel.DownloadSession session;
private Map<String, String> paramsMc = new HashMap<String, String>() {
{
put("region", "cn-beijing");
put("project", "test_pj");
put("table", "test_tb");
put("access_key", "ak");
put("secret_key", "sk");
put("start_offset", "0");
put("split_size", "128");
put("partition_spec", "p1=2022-06");
put("required_fields", "boolean,tinyint,smallint,int,bigint,float,double,"
+ "date,timestamp,char,varchar,string,decimalv2,decimal64,"
+ "decimal18,timestamp4");
put("columns_types", "boolean#tinyint#smallint#int#bigint#float#double#"
+ "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)#"
+ "decimal(18,5)#timestamp(4)");
}
};
private MaxComputeJniScanner scanner = new MaxComputeJniScanner(32, paramsMc);
@BeforeEach
public void init() {
new MockUp<MaxComputeJniScanner>(MaxComputeJniScanner.class) {
@Mock
public TableSchema getSchema() {
return getTestSchema();
}
};
new MockUp<MaxComputeTableScan>(MaxComputeTableScan.class) {
@Mock
public TableSchema getSchema() {
return getTestSchema();
}
@Mock
public TableTunnel.DownloadSession openDownLoadSession() throws IOException {
return session;
}
@Mock
public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec partitionSpec) throws IOException {
return session;
}
};
new MockUp<TableTunnel.DownloadSession>(TableTunnel.DownloadSession.class) {
@Mock
public TableSchema getSchema() {
return getTestSchema();
}
@Mock
public long getRecordCount() {
return 10;
}
@Mock
public ArrowRecordReader openArrowRecordReader(long start, long count, List<Column> columns,
BufferAllocator allocator)
throws TunnelException, IOException {
return null;
}
};
}
private TableSchema getTestSchema() {
TableSchema schema = new TableSchema();
schema.addColumn(new Column("boolean", TypeInfoFactory.BOOLEAN));
schema.addColumn(new Column("bigint", TypeInfoFactory.BIGINT));
schema.addPartitionColumn(new Column("date", TypeInfoFactory.DATE));
schema.addPartitionColumn(new Column("tinyint", TypeInfoFactory.TINYINT));
schema.addPartitionColumn(new Column("smallint", TypeInfoFactory.SMALLINT));
schema.addPartitionColumn(new Column("int", TypeInfoFactory.INT));
schema.addPartitionColumn(new Column("timestamp", TypeInfoFactory.TIMESTAMP));
schema.addPartitionColumn(new Column("char", TypeInfoFactory.getCharTypeInfo(10)));
schema.addPartitionColumn(new Column("varchar", TypeInfoFactory.getVarcharTypeInfo(10)));
schema.addPartitionColumn(new Column("string", TypeInfoFactory.STRING));
schema.addPartitionColumn(new Column("float", TypeInfoFactory.FLOAT));
schema.addPartitionColumn(new Column("double", TypeInfoFactory.DOUBLE));
schema.addPartitionColumn(new Column("decimalv2",
TypeInfoFactory.getDecimalTypeInfo(12, 4)));
schema.addPartitionColumn(new Column("decimal64",
TypeInfoFactory.getDecimalTypeInfo(10, 3)));
schema.addPartitionColumn(new Column("decimal18",
TypeInfoFactory.getDecimalTypeInfo(18, 5)));
schema.addPartitionColumn(new Column("timestamp4", TypeInfoFactory.TIMESTAMP));
return schema;
}
@Test
public void testMaxComputeJniScanner() throws IOException {
scanner.open();
scanner.getNext();
scanner.close();
}
@Test
public void testMaxComputeJniScannerErr() {
try {
new MockUp<TableTunnel.DownloadSession>(TableTunnel.DownloadSession.class) {
@Mock
public ArrowRecordReader openArrowRecordReader(long start, long count, List<Column> columns,
BufferAllocator allocator)
throws TunnelException, IOException {
throw new TunnelException("TableModified");
}
};
scanner.open();
scanner.getNext();
scanner.close();
} catch (IOException e) {
Assertions.assertTrue(e.getCause() instanceof TunnelException);
Assertions.assertEquals(((TunnelException) e.getCause()).getErrorMsg(), "TableModified");
}
}
}

View File

@ -22,3 +22,6 @@ true 77 8920 182239402452
-- !q7 --
6223 maxam 2020-09-21
9601 qewtoll 2020-09-21
-- !replay_q6 --
9601 qewtoll 2020-09-21

View File

@ -51,10 +51,27 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot
qt_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """
qt_q7 """ select * from mc_parts where dt = '2020-09-21' or mc_bigint > 0 """
}
sql """ switch `${mc_catalog_name}`; """
sql """ use `${mc_db}`; """
q01()
q02()
q03()
// replay test
sql """drop catalog if exists ${mc_catalog_name};"""
sql """
create catalog if not exists ${mc_catalog_name} properties (
"type" = "max_compute",
"mc.region" = "cn-beijing",
"mc.default.project" = "${mc_db}",
"mc.access_key" = "${ak}",
"mc.secret_key" = "${sk}",
"mc.public_access" = "true"
);
"""
sql """ switch `${mc_catalog_name}`; """
sql """ use `${mc_db}`; """
qt_replay_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """
}
}