Files
doris/be/src/vec/exec/file_scan_node.h
Mingyu Chen 56e036e68b [feature-wip](multi-catalog) Support runtime filter for file scan node (#11000)
* [feature-wip](multi-catalog) Support runtime filter for file scan node

Co-authored-by: morningman <morningman@apache.org>
2022-07-20 12:36:57 +08:00

149 lines
5.0 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "common/status.h"
#include "exec/base_scanner.h"
#include "exec/scan_node.h"
#include "exprs/runtime_filter.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "runtime/descriptors.h"
#include "vec/exec/file_scanner.h"
namespace doris {
class RuntimeState;
class Status;
namespace vectorized {
// Scan node that reads file scan ranges through FileScanner instances and
// hands the results to the parent as vectorized blocks. Only the block-based
// get_next() is implemented; the RowBatch overload returns NotSupported.
// The class also carries the bookkeeping needed to apply runtime filters.
class FileScanNode final : public ScanNode {
public:
    FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
    ~FileScanNode() override = default;

    // Called once after this scan node has been created.
    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;

    // Prepare partition infos and set up timers.
    Status prepare(RuntimeState* state) override;

    // Start the file scan using ParquetScanner or OrcScanner.
    Status open(RuntimeState* state) override;

    // Row-batch interface is not supported by this node; it always returns
    // Status::NotSupported. Use the Block overload below instead.
    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
        return Status::NotSupported("Not Implemented FileScanNode::get_next.");
    }

    // Fill `block` with the next chunk of scanned data; `eos` reports end of stream.
    Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;

    // Close the scanner, and report errors.
    Status close(RuntimeState* state) override;

    // Marked "No use" by the original author.
    // NOTE(review): a `_scan_ranges` member exists below -- confirm whether the
    // ranges passed here are still consumed.
    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;

private:
    // Write debug string of this node into `out`.
    void debug_string(int indentation_level, std::stringstream* out) const override;

    // Record `new_status` as the process status if no failure has been
    // recorded yet. Returns true when the status was updated, false when an
    // earlier non-OK status is already stored (the first one wins).
    // NOTE: the caller must hold the mutex of this scan node.
    bool update_status(const Status& new_status) {
        if (_process_status.ok()) {
            _process_status = new_status;
            return true;
        }
        return false;
    }

    // Create a scanner for one scan range.
    std::unique_ptr<FileScanner> create_scanner(const TFileScanRange& scan_range,
                                                ScannerCounter* counter);

    Status start_scanners();

    // Worker entry point; reports its final status through `p_status`.
    // NOTE(review): presumably handles `length` scan ranges starting at
    // `start_idx` -- confirm against the implementation.
    void scanner_worker(int start_idx, int length, std::promise<Status>& p_status);

    // Scan one range.
    Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter);

    Status _acquire_and_build_runtime_filter(RuntimeState* state);

    // Pointer and counter members carry default member initializers so they
    // never hold indeterminate values, whichever members the constructor's
    // initializer list happens to cover.
    TupleId _tuple_id;
    RuntimeState* _runtime_state = nullptr;
    TupleDescriptor* _tuple_desc = nullptr;
    std::map<std::string, SlotDescriptor*> _slots_map;
    std::vector<TScanRangeParams> _scan_ranges;

    std::mutex _batch_queue_lock;
    std::condition_variable _queue_reader_cond;
    std::condition_variable _queue_writer_cond;
    std::deque<std::shared_ptr<RowBatch>> _batch_queue;

    int _num_running_scanners = 0;
    // A default-constructed std::atomic is uninitialized before C++20, so the
    // initial value is spelled out here.
    std::atomic<bool> _scan_finished {false};
    Status _process_status;
    std::vector<std::promise<Status>> _scanners_status;
    // Intentionally left without an in-class default: the appropriate limit is
    // not visible in this header and is expected to be set by the constructor.
    int _max_buffered_batches;

    // The original preceding filter exprs.
    // These exprs will be converted to expr contexts in XXXScanner,
    // because the row descriptor used for these exprs is `src_row_desc`,
    // which is initialized in XXXScanner.
    std::vector<TExpr> _pre_filter_texprs;

    RuntimeProfile::Counter* _wait_scanner_timer = nullptr;
    RuntimeProfile::Counter* _num_rows_filtered = nullptr;
    RuntimeProfile::Counter* _filter_timer = nullptr;
    RuntimeProfile::Counter* _num_scanners = nullptr;

    std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
    std::unique_ptr<MutableBlock> _mutable_block;

protected:
    // Per-runtime-filter state: the filter handle plus a flag that starts out
    // false (see `apply_mark`).
    struct RuntimeFilterContext {
        RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {}
        // NOTE(review): presumably set once the filter has been applied -- confirm.
        bool apply_mark;
        // Raw pointer; this struct does not delete it.
        IRuntimeFilter* runtimefilter;
    };

    // Read-only access to the runtime filter descriptors of this node.
    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const {
        return _runtime_filter_descs;
    }

    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
    // NOTE(review): std::vector<bool> is a packed proxy container; writing
    // different elements from different threads is not safe without external
    // locking -- verify how this is accessed together with `_rf_locks`.
    std::vector<bool> _runtime_filter_ready_flag;
    std::vector<std::unique_ptr<std::mutex>> _rf_locks;
    std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
    // NOTE(review): each unique_ptr owns a heap-allocated `VExprContext*`
    // (a pointer to a pointer), not the context itself -- confirm this is the
    // intended ownership.
    std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
};
} // namespace vectorized
} // namespace doris