diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 7d525d8118..e29f5535f9 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -43,6 +43,7 @@ #include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" +#include "vec/exec/scan/vmeta_scan_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" #include "vec/exec/vassert_num_rows_node.h" @@ -359,6 +360,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::DATA_GEN_SCAN_NODE: case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::JDBC_SCAN_NODE: + case TPlanNodeType::META_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -404,6 +406,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new vectorized::VSchemaScanNode(pool, tnode, descs)); return Status::OK(); + case TPlanNodeType::META_SCAN_NODE: + *node = pool->add(new vectorized::VMetaScanNode(pool, tnode, descs)); + return Status::OK(); + case TPlanNodeType::OLAP_SCAN_NODE: *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs)); return Status::OK(); @@ -550,6 +556,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes); } void ExecNode::try_do_aggregate_serde_improve() { @@ -573,7 +580,8 @@ void ExecNode::try_do_aggregate_serde_improve() { typeid(*child0) == typeid(vectorized::NewFileScanNode) || typeid(*child0) == typeid(vectorized::NewOdbcScanNode) || typeid(*child0) == typeid(vectorized::NewEsScanNode) || - typeid(*child0) == typeid(vectorized::NewJdbcScanNode)) { + typeid(*child0) == typeid(vectorized::NewJdbcScanNode) || + typeid(*child0) == typeid(vectorized::VMetaScanNode)) { vectorized::VScanNode* scan_node = static_cast(agg_node[0]->_children[0]); scan_node->set_no_agg_finalize(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index fe82155e66..4ff1e99c32 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -46,6 +46,7 @@ #include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" +#include "vec/exec/scan/vmeta_scan_node.h" #include "vec/exec/vexchange_node.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -166,7 +167,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, typeid(*node) == typeid(vectorized::NewFileScanNode) || typeid(*node) == typeid(vectorized::NewOdbcScanNode) || typeid(*node) == typeid(vectorized::NewEsScanNode) || - typeid(*node) == typeid(vectorized::NewJdbcScanNode)) { + typeid(*node) == typeid(vectorized::NewJdbcScanNode) || + typeid(*node) == typeid(vectorized::VMetaScanNode)) { vectorized::VScanNode* scan_node = static_cast(scan_nodes[i]); const std::vector& scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 012e9bdfad..db0d1ed6c2 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -282,6 +282,8 @@ set(VEC_FILES exec/scan/new_jdbc_scan_node.cpp exec/scan/new_es_scanner.cpp exec/scan/new_es_scan_node.cpp + exec/scan/vmeta_scan_node.cpp + exec/scan/vmeta_scanner.cpp exec/format/csv/csv_reader.cpp exec/format/orc/vorc_reader.cpp exec/format/json/new_json_reader.cpp diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp new file mode 100644 index 0000000000..ae7585ecf4 --- /dev/null +++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vmeta_scan_node.h" + +#include "vmeta_scanner.h" + +namespace doris::vectorized { + +VMetaScanNode::VMetaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs), + _tuple_id(tnode.meta_scan_node.tuple_id), + _scan_params(tnode.meta_scan_node) { + _output_tuple_id = _tuple_id; +} + +Status VMetaScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::init(tnode, state)); + return Status::OK(); +} + +Status VMetaScanNode::prepare(RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::prepare(state)); + return Status::OK(); +} + +void VMetaScanNode::set_scan_ranges(const std::vector& scan_ranges) { + _scan_ranges = scan_ranges; +} + +Status VMetaScanNode::_init_profile() { + RETURN_IF_ERROR(VScanNode::_init_profile()); + return Status::OK(); +} + +Status VMetaScanNode::_init_scanners(std::list* scanners) { + if (_eos == true) { + return Status::OK(); + } + for (auto& scan_range : _scan_ranges) { + VMetaScanner* scanner = + new VMetaScanner(_state, this, _tuple_id, scan_range, _limit_per_scanner); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); + _scanner_pool.add(scanner); + scanners->push_back(static_cast(scanner)); + } + return Status::OK(); +} + +Status VMetaScanNode::_process_conjuncts() { + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h b/be/src/vec/exec/scan/vmeta_scan_node.h new file mode 100644 index 0000000000..93774f89d1 --- /dev/null +++ b/be/src/vec/exec/scan/vmeta_scan_node.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/runtime_state.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +class VMetaScanNode : public VScanNode { +public: + VMetaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~VMetaScanNode() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + void set_scan_ranges(const std::vector& scan_ranges) override; + const TMetaScanNode& scan_params() { return _scan_params; }; + +private: + Status _init_profile() override; + Status _init_scanners(std::list* scanners) override; + Status _process_conjuncts() override; + + TupleId _tuple_id; + TMetaScanNode _scan_params; + std::vector _scan_ranges; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp new file mode 100644 index 0000000000..c36d7877c0 --- /dev/null +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vmeta_scanner.h" + +#include +#include + +#include "gen_cpp/FrontendService.h" +#include "runtime/client_cache.h" +#include "util/thrift_rpc_helper.h" +#include "vec/runtime/vdatetime_value.h" + +namespace doris::vectorized { + +VMetaScanner::VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id, + const TScanRangeParams& scan_range, int64_t limit) + : VScanner(state, static_cast(parent), limit), + _parent(parent), + _meta_eos(false), + _tuple_id(tuple_id), + _scan_range(scan_range.scan_range) {} + +Status VMetaScanner::open(RuntimeState* state) { + VLOG_CRITICAL << "VMetaScanner::open"; + RETURN_IF_ERROR(VScanner::open(state)); + return Status::OK(); +} + +Status VMetaScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { + VLOG_CRITICAL << "VMetaScanner::prepare"; + if (vconjunct_ctx_ptr != nullptr) { + // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. + RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + } + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_scan_range.meta_scan_range.__isset.iceberg_params) { + RETURN_IF_ERROR(_fetch_iceberg_metadata_batch()); + } else { + _meta_eos = true; + } + return Status::OK(); +} + +Status VMetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + VLOG_CRITICAL << "VMetaScanner::_get_block_impl"; + if (nullptr == state || nullptr == block || nullptr == eof) { + return Status::InternalError("input is NULL pointer"); + } + if (_meta_eos == true) { + *eof = true; + return Status::OK(); + } + + auto column_size = _tuple_desc->slots().size(); + std::vector columns(column_size); + bool mem_reuse = block->mem_reuse(); + do { + RETURN_IF_CANCELLED(state); + + columns.resize(column_size); + for (auto i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = std::move(*block->get_by_position(i).column).mutate(); + } else { + columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + // fill block + _fill_block_with_remote_data(columns); + if (_meta_eos == true) { + if (block->rows() == 0) { + *eof = true; + } + break; + } + // Before really use the Block, must clear other ptr of column in block + // So here need do std::move and clear in `columns` + if (!mem_reuse) { + int column_index = 0; + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + VLOG_ROW << "VMetaScanNode output rows: " << block->rows(); + } while (block->rows() == 0 && !(*eof)); + return Status::OK(); +} + +Status VMetaScanner::_fill_block_with_remote_data(const std::vector& columns) { + VLOG_CRITICAL << "VMetaScanner::_fill_block_with_remote_data"; + for (int col_idx = 0; col_idx < columns.size(); col_idx++) { + auto slot_desc = _tuple_desc->slots()[col_idx]; + // because the fe planner filter the non_materialize column + if (!slot_desc->is_materialized()) { + continue; + } + + for (int _row_idx = 0; _row_idx < _batch_data.size(); _row_idx++) { + vectorized::IColumn* col_ptr = columns[col_idx].get(); + if (slot_desc->is_nullable() == true) { + auto* nullable_column = reinterpret_cast(col_ptr); + col_ptr = &nullable_column->get_nested_column(); + } + switch (slot_desc->type().type) { + case TYPE_INT: { + int64_t data = _batch_data[_row_idx].column_value[col_idx].intVal; + reinterpret_cast*>(col_ptr) + ->insert_value(data); + break; + } + case TYPE_BIGINT: { + int64_t data = _batch_data[_row_idx].column_value[col_idx].longVal; + reinterpret_cast*>(col_ptr) + ->insert_value(data); + break; + } + case TYPE_DATETIMEV2: { + uint64_t data = _batch_data[_row_idx].column_value[col_idx].longVal; + reinterpret_cast*>(col_ptr) + ->insert_value(data); + break; + } + case TYPE_STRING: + case TYPE_CHAR: + case TYPE_VARCHAR: { + std::string data = _batch_data[_row_idx].column_value[col_idx].stringVal; + reinterpret_cast(col_ptr)->insert_data(data.c_str(), + data.length()); + break; + } + default: { + std::string error_msg = + fmt::format("Invalid column type {} on column: {}.", + slot_desc->type().debug_string(), slot_desc->col_name()); + return Status::InternalError(std::string(error_msg)); + } + } + } + } + _meta_eos = true; + return Status::OK(); +} + +Status VMetaScanner::_fetch_iceberg_metadata_batch() { + VLOG_CRITICAL << "VMetaScanner::_fetch_iceberg_metadata_batch"; + TFetchSchemaTableDataRequest request; + request.cluster_name = ""; + request.__isset.cluster_name = true; + request.schema_table_name = TSchemaTableName::ICEBERG_TABLE_META; + request.__isset.schema_table_name = true; + auto scan_params = _parent->scan_params(); + TMetadataTableRequestParams meta_table_params = TMetadataTableRequestParams(); + meta_table_params.catalog = scan_params.catalog; + meta_table_params.__isset.catalog = true; + meta_table_params.database = scan_params.database; + meta_table_params.__isset.database = true; + meta_table_params.table = scan_params.table; + meta_table_params.__isset.table = true; + + meta_table_params.iceberg_metadata_params = _scan_range.meta_scan_range.iceberg_params; + meta_table_params.__isset.iceberg_metadata_params = true; + + request.metada_table_params = meta_table_params; + request.__isset.metada_table_params = true; + + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TFetchSchemaTableDataResult result; + + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + config::txn_commit_rpc_timeout_ms)); + + Status status(result.status); + if (!status.ok()) { + LOG(WARNING) << "fetch schema table data from master failed, errmsg=" << status; + return status; + } + _batch_data = std::move(result.data_batch); + return Status::OK(); +} + +Status VMetaScanner::close(RuntimeState* state) { + VLOG_CRITICAL << "VMetaScanner::close"; + RETURN_IF_ERROR(VScanner::close(state)); + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h new file mode 100644 index 0000000000..7663ffe1c8 --- /dev/null +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/runtime_state.h" +#include "vec/exec/scan/vscanner.h" +#include "vmeta_scan_node.h" + +namespace doris::vectorized { + +class VMetaScanner : public VScanner { +public: + VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id, + const TScanRangeParams& scan_range, int64_t limit); + + Status open(RuntimeState* state) override; + Status close(RuntimeState* state) override; + Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + +protected: + Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + Status _fill_block_with_remote_data(const std::vector& columns); + Status _fetch_iceberg_metadata_batch(); + +private: + VMetaScanNode* _parent; + bool _meta_eos; + TupleId _tuple_id; + const TupleDescriptor* _tuple_desc; + std::vector _batch_data; + const TScanRange& _scan_range; +}; +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java new file mode 100644 index 0000000000..431a456646 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.system.Backend; +import org.apache.doris.tablefunction.IcebergTableValuedFunction; +import org.apache.doris.tablefunction.MetadataTableValuedFunction; +import org.apache.doris.thrift.TIcebergMetadataParams; +import org.apache.doris.thrift.TIcebergMetadataType; +import org.apache.doris.thrift.TMetaScanNode; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class MetadataScanNode extends ScanNode { + + private MetadataTableValuedFunction tvf; + + private List scanRangeLocations = Lists.newArrayList(); + + private final BackendPolicy backendPolicy = new BackendPolicy(); + + public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) { + super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE); + this.tvf = tvf; + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + backendPolicy.init(); + } + + @Override + protected void toThrift(TPlanNode planNode) { + planNode.setNodeType(TPlanNodeType.META_SCAN_NODE); + TMetaScanNode metaScanNode = new TMetaScanNode(); + metaScanNode.setCatalog(tvf.getMetadataTableName().getCtl()); + metaScanNode.setDatabase(tvf.getMetadataTableName().getDb()); + metaScanNode.setTable(tvf.getMetadataTableName().getTbl()); + metaScanNode.setTupleId(desc.getId().asInt()); + planNode.setMetaScanNode(metaScanNode); + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return scanRangeLocations; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + buildScanRanges(); + } + + private void buildScanRanges() { + if (tvf.getMetaType() == MetadataTableValuedFunction.MetaType.ICEBERG) { + IcebergTableValuedFunction icebergTvf = (IcebergTableValuedFunction) tvf; + // todo: split + TScanRangeLocations locations = createIcebergTvfLocations(icebergTvf); + scanRangeLocations.add(locations); + } + } + + private TScanRangeLocations createIcebergTvfLocations(IcebergTableValuedFunction icebergTvf) { + TScanRange scanRange = new TScanRange(); + TMetaScanRange metaScanRange = new TMetaScanRange(); + // set iceberg metadata params + TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams(); + int metadataType = icebergTvf.getMetaQueryType().ordinal(); + icebergMetadataParams.setMetadataType(TIcebergMetadataType.findByValue(metadataType)); + + metaScanRange.setIcebergParams(icebergMetadataParams); + scanRange.setMetaScanRange(metaScanRange); + // set location + TScanRangeLocation location = new TScanRangeLocation(); + Backend backend = backendPolicy.getNextBe(); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + + TScanRangeLocations result = new TScanRangeLocations(); + result.addToLocations(location); + result.setScanRange(scanRange); + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ddfd45cf99..40b0dc9b8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HMSResource; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.S3Resource; import org.apache.doris.catalog.Table; @@ -47,6 +48,7 @@ import org.apache.doris.common.Version; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.master.MasterImpl; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -85,6 +87,7 @@ import org.apache.doris.thrift.TGetStoragePolicy; import org.apache.doris.thrift.TGetStoragePolicyResult; import org.apache.doris.thrift.TGetTablesParams; import org.apache.doris.thrift.TGetTablesResult; +import org.apache.doris.thrift.TIcebergMetadataType; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TListPrivilegesResult; @@ -100,6 +103,7 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; +import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TReportExecStatusParams; @@ -131,11 +135,19 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import org.jetbrains.annotations.NotNull; +import java.time.Instant; +import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -997,6 +1009,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { switch (request.getSchemaTableName()) { case BACKENDS: return getBackendsSchemaTable(request); + case ICEBERG_TABLE_META: + return getIcebergMetadataTable(request); default: break; } @@ -1005,6 +1019,85 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + private TFetchSchemaTableDataResult getIcebergMetadataTable(TFetchSchemaTableDataRequest request) { + if (!request.isSetMetadaTableParams()) { + return errorResult("Metadata table params is not set. "); + } + TMetadataTableRequestParams params = request.getMetadaTableParams(); + if (!params.isSetIcebergMetadataParams()) { + return errorResult("Iceberg metadata params is not set. "); + } + + HMSExternalCatalog catalog = (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr() + .getCatalog(params.getCatalog()); + org.apache.iceberg.Table table; + try { + table = getIcebergTable(catalog, params.getDatabase(), params.getTable()); + } catch (MetaNotFoundException e) { + return errorResult(e.getMessage()); + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + List dataBatch = Lists.newArrayList(); + TIcebergMetadataType metadataType = params.getIcebergMetadataParams().getMetadataType(); + switch (metadataType) { + case SNAPSHOTS: + for (Snapshot snapshot : table.snapshots()) { + TRow trow = new TRow(); + LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli( + snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId()); + long encodedDatetime = convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(), + committedAt.getDayOfMonth(), committedAt.getHour(), + committedAt.getMinute(), committedAt.getSecond()); + + trow.addToColumnValue(new TCell().setLongVal(encodedDatetime)); + trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId())); + if (snapshot.parentId() == null) { + trow.addToColumnValue(new TCell().setLongVal(-1L)); + } else { + trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId())); + } + trow.addToColumnValue(new TCell().setStringVal(snapshot.operation())); + trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation())); + dataBatch.add(trow); + } + break; + default: + return errorResult("Unsupported metadata inspect type: " + metadataType); + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + public static long convertToDateTimeV2(int year, int month, int day, int hour, int minute, int second) { + return (long) second << 20 | (long) minute << 26 | (long) hour << 32 + | (long) day << 37 | (long) month << 42 | (long) year << 46; + } + + @NotNull + private TFetchSchemaTableDataResult errorResult(String msg) { + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR)); + result.status.addToErrorMsgs(msg); + return result; + } + + private org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog catalog, String db, String tbl) + throws MetaNotFoundException { + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = new HdfsConfiguration(); + Map properties = catalog.getCatalogProperty().getHadoopProperties(); + for (Map.Entry entry : properties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + hiveCatalog.setConf(conf); + Map catalogProperties = new HashMap<>(); + catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris()); + catalogProperties.put("uri", catalog.getHiveMetastoreUris()); + hiveCatalog.initialize("hive", catalogProperties); + return hiveCatalog.loadTable(TableIdentifier.of(db, tbl)); + } + private TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) { final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo(); TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index ebdfd0471e..47ad4a6161 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -46,5 +46,6 @@ public enum StatisticalType { UNION_NODE, TABLE_VALUED_FUNCTION_NODE, FILE_SCAN_NODE, + METADATA_SCAN_NODE, JDBC_SCAN_NODE, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java new file mode 100644 index 0000000000..4e5aa7dff0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * The Implement of table valued function + * iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots"). + */ +public class IcebergTableValuedFunction extends MetadataTableValuedFunction { + + public enum MetadataType { SNAPSHOTS } + + public static final String NAME = "iceberg_meta"; + private static final String TABLE = "table"; + private static final String QUERY_TYPE = "query_type"; + + private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() + .add(TABLE) + .add(QUERY_TYPE) + .build(); + + private final MetadataType queryType; + private final TableName tableName; + + public IcebergTableValuedFunction(Map params) throws AnalysisException { + super(MetaType.ICEBERG); + Map validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + // check ctl db tbl + validParams.put(key.toLowerCase(), params.get(key)); + } + String tableName = validParams.get(TABLE); + String queryType = validParams.get(QUERY_TYPE); + if (tableName == null || queryType == null) { + throw new AnalysisException("Invalid iceberg metadata query"); + } + String[] names = tableName.split("\\."); + if (names.length != 3) { + throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName"); + } + this.tableName = new TableName(names[0], names[1], names[2]); + // check auth + if (!Env.getCurrentEnv().getAuth().checkTblPriv(ConnectContext.get(), this.tableName, PrivPredicate.SELECT)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + this.tableName.getDb() + ": " + this.tableName.getTbl()); + } + try { + this.queryType = MetadataType.valueOf(queryType.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType); + } + } + + @Override + public String getTableName() { + return "IcebergMetadataTableValuedFunction"; + } + + public TableName getMetadataTableName() { + return tableName; + } + + public MetadataType getMetaQueryType() { + return queryType; + } + + /** + * The tvf can register columns of metadata table + * The data is provided by getIcebergMetadataTable in FrontendService + * @see org.apache.doris.service.FrontendServiceImpl + * @return metadata columns + */ + @Override + public List getTableColumns() throws AnalysisException { + List resColumns = new ArrayList<>(); + if (queryType == MetadataType.SNAPSHOTS) { + resColumns.add(new Column("committed_at", PrimitiveType.DATETIMEV2, false)); + resColumns.add(new Column("snapshot_id", PrimitiveType.BIGINT, false)); + resColumns.add(new Column("parent_id", PrimitiveType.BIGINT, false)); + resColumns.add(new Column("operation", PrimitiveType.STRING, false)); + // todo: compress manifest_list string + resColumns.add(new Column("manifest_list", PrimitiveType.STRING, false)); + // resColumns.add(new Column("summary", PrimitiveType.MAP, false)); + } + return resColumns; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java new file mode 100644 index 0000000000..fd83c59957 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.planner.external.MetadataScanNode; + +public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf { + + public enum MetaType { ICEBERG } + + private final MetaType metaType; + + public MetadataTableValuedFunction(MetaType metaType) { + this.metaType = metaType; + } + + public MetaType getMetaType() { + return metaType; + } + + public abstract TableName getMetadataTableName(); + + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + return new MetadataScanNode(id, desc, this); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 862b986e97..fdd9e106fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf { return new S3TableValuedFunction(params); case HdfsTableValuedFunction.NAME: return new HdfsTableValuedFunction(params); + case IcebergTableValuedFunction.NAME: + return new IcebergTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4c7d062727..32c73a5663 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -694,11 +694,20 @@ struct TInitExternalCtlMetaResult { enum TSchemaTableName{ BACKENDS = 0, + ICEBERG_TABLE_META = 1, +} + +struct TMetadataTableRequestParams { + 1: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params + 2: optional string catalog + 3: optional string database + 4: optional string table } struct TFetchSchemaTableDataRequest { 1: optional string cluster_name 2: optional TSchemaTableName schema_table_name + 3: optional TMetadataTableRequestParams metada_table_params } struct TFetchSchemaTableDataResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index e96752884d..5b72633c39 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -370,6 +370,18 @@ struct TDataGenScanRange { 1: optional TTVFNumbersScanRange numbers_params } +enum TIcebergMetadataType { + SNAPSHOTS = 0, +} + +struct TIcebergMetadataParams { + 1: optional TIcebergMetadataType metadata_type +} + +struct TMetaScanRange { + 1: optional TIcebergMetadataParams iceberg_params +} + // Specification of an individual data range which is held in its entirety // by a storage server struct TScanRange { @@ -380,6 +392,7 @@ struct TScanRange { 7: optional TEsScanRange es_scan_range 8: optional TExternalScanRange ext_scan_range 9: optional TDataGenScanRange data_gen_scan_range + 10: optional TMetaScanRange meta_scan_range } struct TMySQLScanNode { @@ -513,10 +526,9 @@ struct TSchemaScanNode { struct TMetaScanNode { 1: required Types.TTupleId tuple_id - 2: required string table_name - 3: optional string db + 2: optional string catalog + 3: optional string database 4: optional string table - 5: optional string user } struct TSortInfo {