[feature-wip](multi-catalog) add iceberg tvf to read snapshots (#15618)

Support new table value function `iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots")`
we can use the sql `select * from iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots")` to get snapshots info  of a table. The other iceberg metadata will be supported later when needed.

One of the usage:

Before we use following sql to time travel:
`select * from ice_table FOR TIME AS OF "2022-10-10 11:11:11"`;
`select * from ice_table FOR VERSION AS OF "snapshot_id"`;
we can use the snapshots metadata to get the `committed time` or `snapshot_id`, 
and then, we can use it as the time or version in time travel clause
This commit is contained in:
slothever
2023-01-10 22:37:35 +08:00
committed by GitHub
parent 542542a4b2
commit 90a92f0643
15 changed files with 787 additions and 5 deletions

View File

@ -43,6 +43,7 @@
#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/scan/vmeta_scan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vanalytic_eval_node.h"
#include "vec/exec/vassert_num_rows_node.h"
@ -359,6 +360,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::DATA_GEN_SCAN_NODE:
case TPlanNodeType::FILE_SCAN_NODE:
case TPlanNodeType::JDBC_SCAN_NODE:
case TPlanNodeType::META_SCAN_NODE:
break;
default: {
const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
@ -404,6 +406,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new vectorized::VSchemaScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::META_SCAN_NODE:
*node = pool->add(new vectorized::VMetaScanNode(pool, tnode, descs));
return Status::OK();
case TPlanNodeType::OLAP_SCAN_NODE:
*node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs));
return Status::OK();
@ -550,6 +556,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes);
}
void ExecNode::try_do_aggregate_serde_improve() {
@ -573,7 +580,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
typeid(*child0) == typeid(vectorized::NewOdbcScanNode) ||
typeid(*child0) == typeid(vectorized::NewEsScanNode) ||
typeid(*child0) == typeid(vectorized::NewJdbcScanNode)) {
typeid(*child0) == typeid(vectorized::NewJdbcScanNode) ||
typeid(*child0) == typeid(vectorized::VMetaScanNode)) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
scan_node->set_no_agg_finalize();