[pipelineX](node)support file scan operator (#24924)

This commit is contained in:
Mryange
2023-09-27 22:10:43 +08:00
committed by GitHub
parent 68087f6c82
commit 430634367a
9 changed files with 247 additions and 19 deletions

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/exec/file_scan_operator.h"
#include <fmt/format.h>
#include <memory>
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/scan/vfile_scanner.h"
namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (Base::_eos_dependency->read_blocked_by() == nullptr) {
return Status::OK();
}
auto& p = _parent->cast<FileScanOperatorX>();
size_t shard_num =
std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size());
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
std::unique_ptr<vectorized::VFileScanner> scanner = vectorized::VFileScanner::create_unique(
state(), this, p._limit_per_scanner,
scan_range.scan_range.ext_scan_range.file_scan_range, _scanner_profile.get(),
_kv_cache.get());
RETURN_IF_ERROR(
scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id));
scanners->push_back(std::move(scanner));
}
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <string>
#include "common/logging.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/scan/vscan_node.h"
namespace doris {
class ExecNode;
namespace vectorized {
class VFileScanner;
} // namespace vectorized
} // namespace doris
namespace doris::pipeline {
class FileScanOperatorX;
class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
public:
using Parent = FileScanOperatorX;
using Base = ScanLocalState<FileScanLocalState>;
ENABLE_FACTORY_CREATOR(FileScanLocalState);
FileScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState<FileScanLocalState>(state, parent) {}
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {
_scan_ranges = scan_ranges;
}
int parent_id() { return _parent->id(); }
private:
std::vector<TScanRangeParams> _scan_ranges;
// A in memory cache to save some common components
// of the this scan node. eg:
// 1. iceberg delete file
// 2. parquet file meta
// KVCache<std::string> _kv_cache;
std::unique_ptr<vectorized::ShardedKVCache> _kv_cache;
};
class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
public:
FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ScanOperatorX<FileScanLocalState>(pool, tnode, descs) {
_output_tuple_id = tnode.file_scan_node.tuple_id;
_id = tnode.node_id;
}
private:
friend class FileScanLocalState;
};
} // namespace doris::pipeline

View File

@ -19,9 +19,11 @@
#include <fmt/format.h>
#include <cstdint>
#include <memory>
#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
@ -1204,6 +1206,11 @@ TPushAggOp::type ScanLocalState<Derived>::get_push_down_agg_type() {
return _parent->cast<typename Derived::Parent>()._push_down_agg_type;
}
template <typename Derived>
int64_t ScanLocalState<Derived>::get_push_down_count() {
return _parent->cast<typename Derived::Parent>()._push_down_count;
}
template <typename Derived>
int64_t ScanLocalState<Derived>::limit_per_scanner() {
return _parent->cast<typename Derived::Parent>()._limit_per_scanner;
@ -1271,6 +1278,9 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
_should_run_serial = true;
}
}
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
}
template <typename LocalStateType>
@ -1436,6 +1446,8 @@ template class ScanOperatorX<OlapScanLocalState>;
template class ScanLocalState<OlapScanLocalState>;
template class ScanOperatorX<JDBCScanLocalState>;
template class ScanLocalState<JDBCScanLocalState>;
template class ScanOperatorX<FileScanLocalState>;
template class ScanLocalState<FileScanLocalState>;
template class ScanOperatorX<EsScanLocalState>;
template class ScanLocalState<EsScanLocalState>;
template class ScanLocalState<MetaScanLocalState>;

View File

@ -19,6 +19,7 @@
#include <stdint.h>
#include <cstdint>
#include <string>
#include "common/status.h"
@ -139,6 +140,8 @@ public:
virtual TPushAggOp::type get_push_down_agg_type() = 0;
virtual int64_t get_push_down_count() = 0;
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
protected:
@ -220,6 +223,8 @@ class ScanLocalState : public ScanLocalStateBase {
TPushAggOp::type get_push_down_agg_type() override;
int64_t get_push_down_count() override;
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
@ -427,6 +432,7 @@ public:
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
int64_t get_push_down_count() const { return _push_down_count; }
using OperatorX<LocalStateType>::id;
protected:
@ -464,6 +470,9 @@ protected:
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
TPushAggOp::type _push_down_agg_type;
// Record the value of the aggregate function 'count' from doris's be
int64_t _push_down_count = -1;
};
} // namespace doris::pipeline

View File

@ -31,6 +31,7 @@
#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
@ -552,6 +553,7 @@ DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
DECLARE_OPERATOR_X(HashJoinProbeLocalState)
DECLARE_OPERATOR_X(OlapScanLocalState)
DECLARE_OPERATOR_X(JDBCScanLocalState)
DECLARE_OPERATOR_X(FileScanLocalState)
DECLARE_OPERATOR_X(EsScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)

View File

@ -54,6 +54,7 @@
#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
@ -563,6 +564,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
break;
}
case TPlanNodeType::ES_SCAN_NODE:
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, descs));

View File

@ -78,6 +78,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->id(), no_scan_ranges);
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& deps = get_upstream_dependency(_operators[op_idx]->id());
std::vector<LocalStateInfo> infos;

View File

@ -116,6 +116,35 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
_is_load = (_input_tuple_desc != nullptr);
}
VFileScanner::VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* local_state,
int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache)
: VScanner(state, local_state, limit, profile),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
_cur_reader_eof(false),
_kv_cache(kv_cache),
_strict_mode(false) {
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
}
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
_params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
} else {
CHECK(scan_range.__isset.params);
_params = &(scan_range.params);
}
// For load scanner, there are input and output tuple.
// For query scanner, there is only output tuple
_input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
_real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
_is_load = (_input_tuple_desc != nullptr);
}
Status VFileScanner::prepare(
const VExprContextSPtrs& conjuncts,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
@ -123,20 +152,37 @@ Status VFileScanner::prepare(
RETURN_IF_ERROR(VScanner::prepare(_state, conjuncts));
_colname_to_value_range = colname_to_value_range;
_col_name_to_slot_id = colname_to_slot_id;
_get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
_open_reader_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerOpenReaderTime");
_cast_to_input_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
_fill_path_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
_fill_missing_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
_pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
_convert_to_output_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
if (get_parent() != nullptr) {
_get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
_open_reader_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerOpenReaderTime");
_cast_to_input_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerCastInputBlockTime");
_fill_path_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillPathColumnTime");
_fill_missing_columns_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerFillMissingColumnTime");
_pre_filter_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerPreFilterTimer");
_convert_to_output_block_timer =
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
} else {
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
_open_reader_timer =
ADD_TIMER(_local_state->scanner_profile(), "FileScannerOpenReaderTime");
_cast_to_input_block_timer =
ADD_TIMER(_local_state->scanner_profile(), "FileScannerCastInputBlockTime");
_fill_path_columns_timer =
ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillPathColumnTime");
_fill_missing_columns_timer =
ADD_TIMER(_local_state->scanner_profile(), "FileScannerFillMissingColumnTime");
_pre_filter_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerPreFilterTimer");
_convert_to_output_block_timer =
ADD_TIMER(_local_state->scanner_profile(), "FileScannerConvertOuputBlockTime");
_empty_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
}
_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
@ -268,7 +314,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
if (read_rows > 0) {
// If the push_down_agg_type is COUNT, no need to do the rest,
// because we only save a number in block.
if (_parent->get_push_down_agg_type() != TPushAggOp::type::COUNT) {
if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
// Convert the src block columns type to string in-place.
RETURN_IF_ERROR(_cast_to_input_block(block));
// FileReader can fill partition and missing columns itself
@ -705,9 +751,9 @@ Status VFileScanner::_get_next_reader() {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergTableReader> iceberg_reader =
IcebergTableReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range,
_kv_cache, _io_ctx.get(), _parent->get_push_down_count());
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
@ -815,7 +861,7 @@ Status VFileScanner::_get_next_reader() {
_name_to_col_type.clear();
_missing_cols.clear();
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
_cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type());
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;

View File

@ -32,6 +32,7 @@
#include "exec/olap_common.h"
#include "exec/text_converter.h"
#include "io/io_common.h"
#include "pipeline/exec/file_scan_operator.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "vec/common/schema_util.h"
@ -67,6 +68,10 @@ public:
const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache);
VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit,
const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache);
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
@ -208,5 +213,21 @@ private:
_counter.num_rows_unselected = 0;
_counter.num_rows_filtered = 0;
}
TPushAggOp::type _get_push_down_agg_type() {
if (get_parent() != nullptr) {
return _parent->get_push_down_agg_type();
} else {
return _local_state->get_push_down_agg_type();
}
}
int64_t _get_push_down_count() {
if (get_parent() != nullptr) {
return _parent->get_push_down_count();
} else {
return _local_state->get_push_down_count();
}
}
};
} // namespace doris::vectorized