[feature](iceberg) Step3: Support query iceberg external table (#8179)
1. Add Iceberg scan node
2. Add Iceberg/Hive table type in thrift
3. Support querying Iceberg tables of format types `parquet` and `orc`
@@ -147,6 +147,28 @@ std::string BrokerTableDescriptor::debug_string() const {
    return out.str();
}

HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc)
        : TableDescriptor(tdesc) {}

HiveTableDescriptor::~HiveTableDescriptor() {}

std::string HiveTableDescriptor::debug_string() const {
    std::stringstream out;
    out << "HiveTable(" << TableDescriptor::debug_string() << ")";
    return out.str();
}

IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc)
        : TableDescriptor(tdesc) {}

IcebergTableDescriptor::~IcebergTableDescriptor() {}

std::string IcebergTableDescriptor::debug_string() const {
    std::stringstream out;
    out << "IcebergTable(" << TableDescriptor::debug_string() << ")";
    return out.str();
}

EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}

EsTableDescriptor::~EsTableDescriptor() {}

@@ -523,6 +545,12 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
    case TTableType::ES_TABLE:
        desc = pool->add(new EsTableDescriptor(tdesc));
        break;
    case TTableType::HIVE_TABLE:
        desc = pool->add(new HiveTableDescriptor(tdesc));
        break;
    case TTableType::ICEBERG_TABLE:
        desc = pool->add(new IcebergTableDescriptor(tdesc));
        break;
    default:
        DCHECK(false) << "invalid table type: " << tdesc.tableType;
    }
@@ -190,6 +190,24 @@ public:
private:
};

class HiveTableDescriptor : public TableDescriptor {
public:
    HiveTableDescriptor(const TTableDescriptor& tdesc);
    virtual ~HiveTableDescriptor();
    virtual std::string debug_string() const;

private:
};

class IcebergTableDescriptor : public TableDescriptor {
public:
    IcebergTableDescriptor(const TTableDescriptor& tdesc);
    virtual ~IcebergTableDescriptor();
    virtual std::string debug_string() const;

private:
};

class EsTableDescriptor : public TableDescriptor {
public:
    EsTableDescriptor(const TTableDescriptor& tdesc);
@@ -599,6 +599,14 @@ under the License.
            <artifactId>iceberg-hive-metastore</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- For Iceberg, must be consistent with Iceberg version -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <scope>provided</scope>
        </dependency>

    </dependencies>
    <build>
        <finalName>palo-fe</finalName>
@@ -134,7 +134,7 @@ public class HiveTable extends Table {
    @Override
    public TTableDescriptor toThrift() {
        THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
                fullSchema.size(), 0, getName(), "");
        tTableDescriptor.setHiveTable(tHiveTable);
        return tTableDescriptor;
@@ -17,22 +17,38 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.external.iceberg.IcebergCatalog;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Maps;

import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * External Iceberg table

@@ -44,10 +60,23 @@ public class IcebergTable extends Table {
    private String icebergDb;
    // remote Iceberg table name
    private String icebergTbl;
    // remote Iceberg table location
    private String location;
    // Iceberg table file format
    private String fileFormat;
    // Iceberg storage type
    private StorageBackend.StorageType storageType;
    // Iceberg remote host uri
    private String hostUri;
    // location analyze flag
    private boolean isAnalyzed = false;
    private Map<String, String> icebergProperties = Maps.newHashMap();

    private org.apache.iceberg.Table icebergTable;

    private final byte[] loadLock = new byte[0];
    private final AtomicBoolean isLoaded = new AtomicBoolean(false);

    public IcebergTable() {
        super(TableType.ICEBERG);
    }

@@ -73,28 +102,126 @@ public class IcebergTable extends Table {
        return icebergDb;
    }

    public void setIcebergDb(String icebergDb) {
        this.icebergDb = icebergDb;
    }

    public String getIcebergTbl() {
        return icebergTbl;
    }

    public void setIcebergTbl(String icebergTbl) {
        this.icebergTbl = icebergTbl;
    }

    public Map<String, String> getIcebergProperties() {
        return icebergProperties;
    }

    public void setIcebergProperties(Map<String, String> icebergProperties) {
        this.icebergProperties = icebergProperties;
    private void getLocation() throws UserException {
        if (Strings.isNullOrEmpty(location)) {
            try {
                getTable();
            } catch (Exception e) {
                throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
            }
            location = icebergTable.location();
        }
        analyzeLocation();
    }

    public org.apache.iceberg.Table getIcebergTable() {
        return icebergTable;
    private void analyzeLocation() throws UserException {
        if (isAnalyzed) {
            return;
        }
        String[] strings = StringUtils.split(location, "/");

        // analyze storage type
        String storagePrefix = strings[0].split(":")[0];
        if (storagePrefix.equalsIgnoreCase("s3")) {
            this.storageType = StorageBackend.StorageType.S3;
        } else if (storagePrefix.equalsIgnoreCase("hdfs")) {
            this.storageType = StorageBackend.StorageType.HDFS;
        } else {
            throw new UserException("Not supported storage type: " + storagePrefix);
        }

        // analyze host uri
        // eg: hdfs://host:port
        //     s3://host:port
        String host = strings[1];
        this.hostUri = storagePrefix + "://" + host;
        this.isAnalyzed = true;
    }

    public String getHostUri() throws UserException {
        if (!isAnalyzed) {
            getLocation();
        }
        return hostUri;
    }

    public StorageBackend.StorageType getStorageType() throws UserException {
        if (!isAnalyzed) {
            getLocation();
        }
        return storageType;
    }

    public String getFileFormat() throws UserException {
        if (Strings.isNullOrEmpty(fileFormat)) {
            try {
                getTable();
            } catch (Exception e) {
                throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
            }
            fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
        }
        return fileFormat;
    }

    // get the iceberg table instance, if table is not loaded, load it.
    private org.apache.iceberg.Table getTable() throws Exception {
        if (isLoaded.get()) {
            Preconditions.checkNotNull(icebergTable);
            return icebergTable;
        }
        synchronized (loadLock) {
            if (icebergTable != null) {
                return icebergTable;
            }

            IcebergProperty icebergProperty = getIcebergProperty();
            IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
            try {
                this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
                LOG.info("finished to load iceberg table: {}", name);
            } catch (Exception e) {
                LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e);
                throw e;
            }

            isLoaded.set(true);
            return icebergTable;
        }
    }

    private IcebergProperty getIcebergProperty() {
        Map<String, String> properties = Maps.newHashMap(icebergProperties);
        properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb);
        properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl);
        return new IcebergProperty(properties);
    }

    /**
     * Get iceberg data file by file system table location and iceberg predicates
     * @throws Exception
     */
    public List<TBrokerFileStatus> getIcebergDataFiles(List<Expression> predicates) throws Exception {
        org.apache.iceberg.Table table = getTable();
        TableScan scan = table.newScan();
        for (Expression predicate : predicates) {
            scan = scan.filter(predicate);
        }
        List<TBrokerFileStatus> relatedFiles = Lists.newArrayList();
        for (FileScanTask task : scan.planFiles()) {
            Path path = Paths.get(task.file().path().toString());
            String relativePath = "/" + path.subpath(2, path.getNameCount());
            relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false));
        }
        return relatedFiles;
    }

    @Override
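The location handling above is easiest to see on a concrete value. The following standalone sketch is not part of this commit; the table location and data file path are made up. It only mirrors what `analyzeLocation()` and `getIcebergDataFiles()` do to derive the host URI and the filesystem-relative path handed to the broker scan range.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class IcebergLocationSketch {
    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String location = "hdfs://namenode:8020/warehouse/db.db/sample_tbl";
        String dataFile = location + "/data/00000-0-sample.parquet";

        // Mirrors analyzeLocation(): the scheme picks the storage type,
        // scheme + authority become the host URI.
        String[] parts = location.split("/+");       // ["hdfs:", "namenode:8020", "warehouse", ...]
        String scheme = parts[0].split(":")[0];      // "hdfs" -> StorageType.HDFS
        String hostUri = scheme + "://" + parts[1];  // "hdfs://namenode:8020"

        // Mirrors getIcebergDataFiles(): drop the scheme and authority segments,
        // keep only the path relative to the filesystem root.
        Path path = Paths.get(dataFile);
        String relativePath = "/" + path.subpath(2, path.getNameCount());

        System.out.println(hostUri);       // hdfs://namenode:8020
        System.out.println(relativePath);  // /warehouse/db.db/sample_tbl/data/00000-0-sample.parquet
    }
}
```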
@@ -128,7 +255,7 @@ public class IcebergTable extends Table {
    @Override
    public TTableDescriptor toThrift() {
        TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties());
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
                fullSchema.size(), 0, getName(), "");
        tTableDescriptor.setIcebergTable(tIcebergTable);
        return tTableDescriptor;
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;

@@ -37,6 +38,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;

@@ -137,6 +139,15 @@ public class BrokerFileGroup implements Writable {
        this.columnExprList = columnExprList;
    }

    // Used for iceberg table, no need to parse
    public BrokerFileGroup(IcebergTable table) throws UserException {
        this.tableId = table.getId();
        this.isNegative = false;
        this.valueSeparator = "|";
        this.lineDelimiter = "\n";
        this.fileFormat = table.getFileFormat();
    }

    public BrokerFileGroup(DataDescription dataDescription) {
        this.fileFieldNames = dataDescription.getFileFieldNames();
        this.columnsFromPath = dataDescription.getColumnsFromPath();
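The new `BrokerFileGroup(IcebergTable)` constructor above takes the file format straight from the Iceberg table. For context only (this helper is not part of the commit, and the class name is made up), Iceberg exposes that value through the `write.format.default` table property, which is documented to default to `parquet` when unset; the commit's `IcebergTable.getFileFormat()` uses a plain `get()`, so the fallback shown here is just the Iceberg-documented default.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public final class IcebergFormatSketch {
    // Returns "parquet" or "orc" for the tables this commit supports; falls back to
    // Iceberg's documented default ("parquet") when the property is not set.
    public static String defaultFileFormat(Table table) {
        return table.properties().getOrDefault(
                TableProperties.DEFAULT_FILE_FORMAT,          // "write.format.default"
                TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); // "parquet"
    }
}
```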
@@ -50,6 +50,7 @@ import org.apache.doris.transaction.TabletQuorumFailedException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

@@ -63,7 +64,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import avro.shaded.com.google.common.collect.Lists;

public class LoadChecker extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(LoadChecker.class);
@@ -401,7 +401,7 @@ public class BrokerScanNode extends LoadScanNode {
        Collections.shuffle(backends, random);
    }

    private TFileFormatType formatType(String fileFormat, String path) {
    private TFileFormatType formatType(String fileFormat, String path) throws UserException {
        if (fileFormat != null) {
            if (fileFormat.toLowerCase().equals("parquet")) {
                return TFileFormatType.FORMAT_PARQUET;

@@ -411,6 +411,8 @@ public class BrokerScanNode extends LoadScanNode {
                return TFileFormatType.FORMAT_JSON;
            } else if (fileFormat.toLowerCase().equals("csv")) {
                return TFileFormatType.FORMAT_CSV_PLAIN;
            } else {
                throw new UserException("Not supported file format: " + fileFormat);
            }
        }

@@ -432,6 +434,10 @@ public class BrokerScanNode extends LoadScanNode {
        }
    }

    public String getHostUri() throws UserException {
        return "";
    }

    // If fileFormat is not null, we use fileFormat instead of check file's suffix
    private void processFileGroup(
            ParamCreateContext context,

@@ -440,11 +446,11 @@ public class BrokerScanNode extends LoadScanNode {
        if (fileStatuses == null || fileStatuses.isEmpty()) {
            return;
        }
        // set hdfs params, used to Hive and Iceberg scan
        THdfsParams tHdfsParams = new THdfsParams();
        if (this instanceof HiveScanNode) {
            String fsName = ((HiveScanNode) this).getHdfsUri();
            tHdfsParams.setFsName(fsName);
        }
        String fsName = getHostUri();
        tHdfsParams.setFsName(fsName);

        TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
        long curInstanceBytes = 0;
        long curFileOffset = 0;

@@ -477,10 +483,8 @@ public class BrokerScanNode extends LoadScanNode {
            } else {
                TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
                        leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc);
                if (this instanceof HiveScanNode) {
                    rangeDesc.setHdfsParams(tHdfsParams);
                    rangeDesc.setReadByColumnDef(true);
                }
                rangeDesc.setHdfsParams(tHdfsParams);
                rangeDesc.setReadByColumnDef(true);
                brokerScanRange(curLocations).addToRanges(rangeDesc);
                curFileOffset = 0;
                i++;

@@ -502,10 +506,8 @@ public class BrokerScanNode extends LoadScanNode {
            rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
            rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
        }
        if (this instanceof HiveScanNode) {
            rangeDesc.setHdfsParams(tHdfsParams);
            rangeDesc.setReadByColumnDef(true);
        }
        rangeDesc.setHdfsParams(tHdfsParams);
        rangeDesc.setReadByColumnDef(true);
        brokerScanRange(curLocations).addToRanges(rangeDesc);
        curFileOffset = 0;
        curInstanceBytes += leftBytes;
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPartitionType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -50,7 +51,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import avro.shaded.com.google.common.collect.Maps;

/**
 * The distributed planner is responsible for creating an executable, distributed plan
@@ -70,7 +70,7 @@ public class HiveScanNode extends BrokerScanNode {
    private List<String> partitionKeys = new ArrayList<>();
    /* hive table properties */

    public String getHdfsUri() {
    public String getHostUri() {
        return hdfsUri;
    }
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.IcebergProperty;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.common.UserException;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExplainLevel;

import org.apache.iceberg.expressions.Expression;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

public class IcebergScanNode extends BrokerScanNode {
    private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class);

    private IcebergTable icebergTable;
    private final List<Expression> icebergPredicates = new ArrayList<>();

    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
        super(id, desc, planNodeName, fileStatusesList, filesAdded);
        icebergTable = (IcebergTable) desc.getTable();
    }

    @Override
    public void init(Analyzer analyzer) throws UserException {
        super.init(analyzer);
    }

    @Override
    protected void initFileGroup() throws UserException {
        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
        brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(),
                icebergTable.getIcebergProperties());
        targetTable = icebergTable;
    }

    @Override
    public String getHostUri() throws UserException {
        return icebergTable.getHostUri();
    }

    @Override
    protected void getFileStatus() throws UserException {
        // extract iceberg conjuncts
        ListIterator<Expr> it = conjuncts.listIterator();
        while (it.hasNext()) {
            Expression expression = IcebergUtils.convertToIcebergExpr(it.next());
            if (expression != null) {
                icebergPredicates.add(expression);
            }
        }
        // get iceberg file status
        List<TBrokerFileStatus> fileStatuses;
        try {
            fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates);
        } catch (Exception e) {
            LOG.warn("errors while load iceberg table {} data files.", icebergTable.getName(), e);
            throw new UserException("errors while load Iceberg table ["
                    + icebergTable.getName() + "] data files.");
        }
        fileStatusesList.add(fileStatuses);
        filesAdded += fileStatuses.size();
        for (TBrokerFileStatus fstatus : fileStatuses) {
            LOG.debug("Add file status is {}", fstatus);
        }
    }

    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        StringBuilder output = new StringBuilder();
        if (!isLoad()) {
            output.append(prefix).append("TABLE: ").append(icebergTable.getName()).append("\n");
            output.append(prefix).append("PATH: ")
                    .append(icebergTable.getIcebergProperties().get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS))
                    .append("\n");
        }
        return output.toString();
    }
}
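`getFileStatus()` above relies on `IcebergUtils.convertToIcebergExpr`, which is not part of this diff. As a rough sketch only (the column names and values are invented, and this is not the Doris converter itself), the kind of Iceberg predicate it is expected to produce can be built directly with `org.apache.iceberg.expressions.Expressions`:

```java
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class IcebergPredicateSketch {
    public static void main(String[] args) {
        // Roughly what a Doris conjunct like `dt = '2022-01-01' AND id > 100`
        // should translate to before being passed to TableScan.filter(...).
        Expression byDt = Expressions.equal("dt", "2022-01-01");
        Expression byId = Expressions.greaterThan("id", 100);
        Expression combined = Expressions.and(byDt, byId);
        System.out.println(combined);
    }
}
```

Skipping a conjunct that cannot be translated (the `expression != null` check above) only weakens the Iceberg-side pruning, so `planFiles()` may return more files than strictly needed, but never fewer; correctness does not depend on the conversion being complete.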
@@ -1698,6 +1698,10 @@ public class SingleNodePlanner {
                scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
                        null, -1);
                break;
            case ICEBERG:
                scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
                        null, -1);
                break;
            default:
                break;
        }
@@ -17,8 +17,6 @@

package org.apache.doris.qe;

import avro.shaded.com.google.common.collect.Maps;
import avro.shaded.com.google.common.collect.Sets;
import org.apache.doris.common.AuditLog;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DigitalVersion;

@@ -34,6 +32,9 @@ import org.apache.doris.plugin.PluginMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;
@@ -17,14 +17,11 @@

package org.apache.doris.catalog;

import avro.shaded.com.google.common.collect.Lists;
import org.apache.commons.collections.ListUtils;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableLikeStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.UtFrameUtils;

@@ -33,12 +30,12 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.Lists;

import java.io.File;
import java.util.List;
import java.util.UUID;

import avro.shaded.com.google.common.collect.Lists;

/**
 * @author wangcong
 * @version 1.0
@@ -30,6 +30,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.collect.Maps;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

@@ -38,7 +40,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

import avro.shaded.com.google.common.collect.Maps;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
fe/pom.xml
@@ -120,7 +120,12 @@ under the License.
        <log4j2.version>2.17.1</log4j2.version>
        <revision>0.15-SNAPSHOT</revision>
        <project.scm.id>github</project.scm.id>
        <!-- ATTN: avro version must be consistent with Iceberg version -->
        <!-- Please modify iceberg.version and avro.version together,
             you can find avro version info in iceberg mvn repository -->
        <iceberg.version>0.12.0</iceberg.version>
        <avro.version>1.10.1</avro.version>
        <!-- ATTN: avro version must be consistent with Iceberg version -->
    </properties>
    <profiles>
        <!-- for custom internal repository -->

@@ -706,6 +711,13 @@ under the License.
                <version>${iceberg.version}</version>
            </dependency>

            <!-- For Iceberg, must be consistent with Iceberg version -->
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>${avro.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-column</artifactId>
@@ -357,7 +357,9 @@ enum TTableType {
    KUDU_TABLE, // Deprecated
    BROKER_TABLE,
    ES_TABLE,
    ODBC_TABLE
    ODBC_TABLE,
    HIVE_TABLE,
    ICEBERG_TABLE
}

enum TOdbcTableType {