[feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)

For example
SELECT* FROM tbl FOR VERSION AS OF 10963874102873;
SELECT* FROM tbl FOR TIME AS OF '1986-10-26 01:21:00';
This commit is contained in:
slothever
2022-12-30 00:25:21 +08:00
committed by GitHub
parent 6c847daba0
commit 3ff01ca799
10 changed files with 215 additions and 5 deletions

View File

@ -457,6 +457,7 @@ terminal String
KW_NULL,
KW_NULLS,
KW_OBSERVER,
KW_OF,
KW_OFFSET,
KW_ON,
KW_ONLY,
@ -593,6 +594,7 @@ terminal String
KW_VARCHAR,
KW_VARIABLES,
KW_VERBOSE,
KW_VERSION,
KW_VIEW,
KW_WARNINGS,
KW_WEEK,
@ -685,6 +687,7 @@ nonterminal ArrayList<String> ident_list;
nonterminal PartitionNames opt_partition_names, partition_names;
nonterminal ArrayList<Long> opt_tablet_list, tablet_list;
nonterminal TableSample opt_table_sample, table_sample;
nonterminal TableSnapshot opt_table_snapshot, table_snapshot;
nonterminal ClusterName cluster_name;
nonterminal ClusterName des_cluster_name;
nonterminal TableName table_name, opt_table_name;
@ -5100,9 +5103,31 @@ base_table_ref_list ::=
;
base_table_ref ::=
table_name:name opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
table_name:name opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
{:
RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints);
RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot);
:}
;
opt_table_snapshot ::=
/* empty */
{:
RESULT = null;
:}
| table_snapshot:tableSnapshot
{:
RESULT = tableSnapshot;
:}
;
table_snapshot ::=
KW_FOR KW_VERSION KW_AS KW_OF INTEGER_LITERAL:version
{:
RESULT = new TableSnapshot(version);
:}
| KW_FOR KW_TIME KW_AS KW_OF STRING_LITERAL:time
{:
RESULT = new TableSnapshot(time);
:}
;
@ -6693,6 +6718,8 @@ keyword ::=
{: RESULT = id; :}
| KW_NULLS:id
{: RESULT = id; :}
| KW_OF:id
{: RESULT = id; :}
| KW_OFFSET:id
{: RESULT = id; :}
| KW_ONLY:id
@ -6817,6 +6844,8 @@ keyword ::=
{: RESULT = id; :}
| KW_VERBOSE:id
{: RESULT = id; :}
| KW_VERSION:id
{: RESULT = id; :}
| KW_VIEW:id
{: RESULT = id; :}
| KW_WARNINGS:id

View File

@ -70,6 +70,7 @@ public class BaseTableRef extends TableRef {
name.analyze(analyzer);
desc = analyzer.registerTableRef(this);
isAnalyzed = true; // true that we have assigned desc
analyzeTableSnapshot(analyzer);
analyzeLateralViewRef(analyzer);
analyzeJoin(analyzer);
analyzeSortHints();

View File

@ -22,6 +22,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -29,6 +30,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.ExprRewriter.ClauseType;
@ -48,6 +50,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
import java.util.regex.Matcher;
/**
* Superclass of all table references, including references to views, base tables
@ -129,6 +132,8 @@ public class TableRef implements ParseNode, Writable {
private boolean isPartitionJoin;
private String sortColumn = null;
private TableSnapshot tableSnapshot;
// END: Members that need to be reset()
// ///////////////////////////////////////
@ -153,6 +158,11 @@ public class TableRef implements ParseNode, Writable {
*/
public TableRef(TableName name, String alias, PartitionNames partitionNames, ArrayList<Long> sampleTabletIds,
TableSample tableSample, ArrayList<String> commonHints) {
this(name, alias, partitionNames, sampleTabletIds, tableSample, commonHints, null);
}
public TableRef(TableName name, String alias, PartitionNames partitionNames, ArrayList<Long> sampleTabletIds,
TableSample tableSample, ArrayList<String> commonHints, TableSnapshot tableSnapshot) {
this.name = name;
if (alias != null) {
if (Env.isStoredTableNamesLowerCase()) {
@ -167,6 +177,7 @@ public class TableRef implements ParseNode, Writable {
this.sampleTabletIds = sampleTabletIds;
this.tableSample = tableSample;
this.commonHints = commonHints;
this.tableSnapshot = tableSnapshot;
isAnalyzed = false;
}
@ -186,6 +197,7 @@ public class TableRef implements ParseNode, Writable {
(other.sortHints != null) ? Lists.newArrayList(other.sortHints) : null;
onClause = (other.onClause != null) ? other.onClause.clone().reset() : null;
partitionNames = (other.partitionNames != null) ? new PartitionNames(other.partitionNames) : null;
tableSnapshot = (other.tableSnapshot != null) ? new TableSnapshot(other.tableSnapshot) : null;
tableSample = (other.tableSample != null) ? new TableSample(other.tableSample) : null;
commonHints = other.commonHints;
@ -302,6 +314,10 @@ public class TableRef implements ParseNode, Writable {
return tableSample;
}
public TableSnapshot getTableSnapshot() {
return tableSnapshot;
}
/**
* This method should only be called after the TableRef has been analyzed.
*/
@ -499,6 +515,27 @@ public class TableRef implements ParseNode, Writable {
}
}
protected void analyzeTableSnapshot(Analyzer analyzer) throws AnalysisException {
if (tableSnapshot == null) {
return;
}
TableIf.TableType tableType = this.getTable().getType();
if (tableType != TableIf.TableType.HMS_EXTERNAL_TABLE) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
}
HMSExternalTable extTable = (HMSExternalTable) this.getTable();
if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
}
if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) {
String asOfTime = tableSnapshot.getTime();
Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime);
if (!matcher.matches()) {
throw new AnalysisException("Invalid datetime string: " + asOfTime);
}
}
}
/**
* Analyze the join clause.
* The join clause can only be analyzed after the left table has been analyzed

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
/**
* Snapshot read for time travel
* the version in 2022.12.28 just supports external iceberg table
*/
public class TableSnapshot {
public enum VersionType {
TIME, VERSION
}
private final VersionType type;
private String time;
private long version;
public TableSnapshot(long version) {
this.version = version;
this.type = VersionType.VERSION;
}
public TableSnapshot(String time) {
this.time = time;
this.type = VersionType.TIME;
}
public TableSnapshot(TableSnapshot other) {
this.type = other.type;
this.time = other.time;
this.version = other.version;
}
public VersionType getType() {
return type;
}
public String getTime() {
return time;
}
public long getVersion() {
return version;
}
@Override
public String toString() {
if (this.type == VersionType.VERSION) {
return " FOR VERSION AS OF " + version;
} else {
return " FOR TIME AS OF '" + time + "'";
}
}
}

View File

@ -1699,7 +1699,10 @@ public enum ErrorCode {
ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'},
"Nonsupport hive metastore table named '%s' in database '%s' with catalog '%s'."),
ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'}, "Table name length exceeds limit, "
+ "the length of table name '%s' is %d which is greater than the configuration 'table_name_length_limit' (%d).");
+ "the length of table name '%s' is %d which is greater than the configuration 'table_name_length_limit' (%d)."),
ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', '0'}, "Only iceberg external"
+ " table supports time travel in current version");
// This is error code
private final int code;

View File

@ -68,7 +68,7 @@ public class TimeUtils {
private static final SimpleDateFormat DATETIME_FORMAT;
private static final SimpleDateFormat TIME_FORMAT;
private static final Pattern DATETIME_FORMAT_REG =
public static final Pattern DATETIME_FORMAT_REG =
Pattern.compile("^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?"
+ "((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?"
+ "((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|("

View File

@ -25,6 +25,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HMSResource;
@ -35,6 +36,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileFormatType;
@ -54,14 +56,17 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Conversions;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -168,6 +173,20 @@ public class IcebergScanProvider extends HiveScanProvider {
org.apache.iceberg.Table table = getIcebergTable();
TableScan scan = table.newScan();
TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
try {
if (type == TableSnapshot.VersionType.VERSION) {
scan = scan.useSnapshot(tableSnapshot.getVersion());
} else {
long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
}
} catch (IllegalArgumentException e) {
throw new UserException(e);
}
}
for (Expression predicate : expressions) {
scan = scan.filter(predicate);
}
@ -199,6 +218,27 @@ public class IcebergScanProvider extends HiveScanProvider {
return splits;
}
public static long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) {
// find history at or before asOfTimestamp
HistoryEntry latestHistory = null;
for (HistoryEntry entry : historyEntries) {
if (entry.timestampMillis() <= asOfTimestamp) {
if (latestHistory == null) {
latestHistory = entry;
continue;
}
if (entry.timestampMillis() > latestHistory.timestampMillis()) {
latestHistory = entry;
}
}
}
if (latestHistory == null) {
throw new NotFoundException("No version history at or before "
+ Instant.ofEpochMilli(asOfTimestamp));
}
return latestHistory.snapshotId();
}
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {

View File

@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL));
keywordMap.put("nulls", new Integer(SqlParserSymbols.KW_NULLS));
keywordMap.put("observer", new Integer(SqlParserSymbols.KW_OBSERVER));
keywordMap.put("of", new Integer(SqlParserSymbols.KW_OF));
keywordMap.put("offset", new Integer(SqlParserSymbols.KW_OFFSET));
keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON));
keywordMap.put("only", new Integer(SqlParserSymbols.KW_ONLY));
@ -455,6 +456,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR));
keywordMap.put("variables", new Integer(SqlParserSymbols.KW_VARIABLES));
keywordMap.put("verbose", new Integer(SqlParserSymbols.KW_VERBOSE));
keywordMap.put("version", new Integer(SqlParserSymbols.KW_VERSION));
keywordMap.put("view", new Integer(SqlParserSymbols.KW_VIEW));
keywordMap.put("warnings", new Integer(SqlParserSymbols.KW_WARNINGS));
keywordMap.put("week", new Integer(SqlParserSymbols.KW_WEEK));

View File

@ -30,4 +30,24 @@
2736865
-- !q08 --
1499999990
1499999990
-- !q09 --
1
2
3
-- !q10 --
2
3
4
-- !q11 --
1
2
3
-- !q12 --
2
3
4

View File

@ -42,7 +42,15 @@ suite("test_external_catalog_icebergv2", "p2") {
qt_q07 """ select o_orderkey from orders where o_custkey < 3357 limit 3"""
qt_q08 """ select count(1) as c from customer;"""
}
// test time travel stmt
def q02 = {
qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' limit 3 """
qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' limit 3 """
qt_q11 """ select c_custkey from customer for version as of 906874575350293177 limit 3 """
qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 limit 3 """
}
sql """ use `tpch_1000_icebergv2`; """
q01()
q02()
}
}