[Opt](Pipeline) opt pipeline code in mult tablet (#17999)
This commit is contained in:
@ -55,7 +55,6 @@ Status NewOlapScanNode::prepare(RuntimeState* state) {
|
||||
Status NewOlapScanNode::_init_profile() {
|
||||
RETURN_IF_ERROR(VScanNode::_init_profile());
|
||||
|
||||
_num_disks_accessed_counter = ADD_COUNTER(_runtime_profile, "NumDiskAccess", TUnit::UNIT);
|
||||
_tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
|
||||
|
||||
// 1. init segment profile
|
||||
@ -405,11 +404,10 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
}
|
||||
|
||||
// ranges constructed from scan keys
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
|
||||
RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
|
||||
RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges));
|
||||
// if we can't get ranges from conditions, we give it a total range
|
||||
if (cond_ranges.empty()) {
|
||||
cond_ranges.emplace_back(new doris::OlapScanRange());
|
||||
if (_cond_ranges.empty()) {
|
||||
_cond_ranges.emplace_back(new doris::OlapScanRange());
|
||||
}
|
||||
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
|
||||
|
||||
@ -462,25 +460,20 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<std::string> disk_set;
|
||||
auto build_new_scanner = [&](const TPaloScanRange& scan_range,
|
||||
const std::vector<OlapScanRange*>& key_ranges,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers,
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
|
||||
NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner,
|
||||
_olap_scan_node.is_preaggregation,
|
||||
_olap_scan_node.is_preaggregation, scan_range,
|
||||
key_ranges, rs_readers, rs_reader_seg_offsets,
|
||||
_need_agg_finalize, _scanner_profile.get());
|
||||
|
||||
scanner->set_compound_filters(_compound_filters);
|
||||
// add scanner to pool before doing prepare.
|
||||
// so that the scanner is automatically destructed if prepare fails.
|
||||
_scanner_pool.add(scanner);
|
||||
RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(),
|
||||
_olap_filters, _filter_predicates, _push_down_functions,
|
||||
_common_vexpr_ctxs_pushdown.get(), rs_readers,
|
||||
rs_reader_seg_offsets));
|
||||
scanners->push_back((VScanner*)scanner);
|
||||
disk_set.insert(scanner->scan_disk());
|
||||
return Status::OK();
|
||||
};
|
||||
if (is_duplicate_key) {
|
||||
@ -489,7 +482,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1);
|
||||
for (int i = 0; i < _scan_ranges.size(); ++i) {
|
||||
auto& scan_range = _scan_ranges[i];
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
|
||||
int num_ranges = ranges->size();
|
||||
std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
|
||||
for (int j = 0; j < num_ranges; ++j) {
|
||||
@ -553,7 +546,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
true);
|
||||
RETURN_IF_ERROR(status);
|
||||
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
|
||||
int size_based_scanners_per_tablet = 1;
|
||||
|
||||
if (config::doris_scan_range_max_mb > 0) {
|
||||
@ -577,10 +570,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {}));
|
||||
}
|
||||
}
|
||||
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
|
||||
}
|
||||
// telemetry::set_span_attribute(span, _num_disks_accessed_counter);
|
||||
// telemetry::set_span_attribute(span, _num_scanners);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -65,6 +65,7 @@ private:
|
||||
private:
|
||||
TOlapScanNode _olap_scan_node;
|
||||
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
|
||||
OlapScanKeys _scan_keys;
|
||||
std::vector<TCondition> _olap_filters;
|
||||
// _compound_filters store conditions in the one compound relationship in conjunct expr tree except leaf node of `and` node,
|
||||
|
||||
@ -24,12 +24,21 @@
|
||||
namespace doris::vectorized {
|
||||
|
||||
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit,
|
||||
bool aggregation, bool need_agg_finalize, RuntimeProfile* profile)
|
||||
bool aggregation, const TPaloScanRange& scan_range,
|
||||
const std::vector<OlapScanRange*>& key_ranges,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers,
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets,
|
||||
bool need_agg_finalize, RuntimeProfile* profile)
|
||||
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
|
||||
_aggregation(aggregation),
|
||||
_need_agg_finalize(need_agg_finalize),
|
||||
_version(-1) {
|
||||
_version(-1),
|
||||
_scan_range(scan_range),
|
||||
_key_ranges(key_ranges) {
|
||||
_tablet_reader_params.rs_readers = rs_readers;
|
||||
_tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
|
||||
_tablet_schema = std::make_shared<TabletSchema>();
|
||||
_is_init = false;
|
||||
}
|
||||
|
||||
static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
|
||||
@ -46,19 +55,14 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
|
||||
return read_columns_string;
|
||||
}
|
||||
|
||||
Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
const std::vector<OlapScanRange*>& key_ranges,
|
||||
VExprContext** vconjunct_ctx_ptr,
|
||||
const std::vector<TCondition>& filters,
|
||||
const FilterPredicates& filter_predicates,
|
||||
const std::vector<FunctionFilter>& function_filters,
|
||||
VExprContext** common_vexpr_ctxs_pushdown,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers,
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
|
||||
if (common_vexpr_ctxs_pushdown != nullptr) {
|
||||
Status NewOlapScanner::init() {
|
||||
_is_init = true;
|
||||
auto parent = static_cast<NewOlapScanNode*>(_parent);
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr.get()));
|
||||
if (parent->_common_vexpr_ctxs_pushdown != nullptr) {
|
||||
// Clone the scan node's common_vexpr_ctxs_pushdown into this scanner's _common_vexpr_ctxs_pushdown; only needed when the node has one.
|
||||
RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, &_common_vexpr_ctxs_pushdown));
|
||||
RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown)
|
||||
->clone(_state, &_common_vexpr_ctxs_pushdown));
|
||||
}
|
||||
|
||||
// set limit to reduce end of rowset and segment mem use
|
||||
@ -69,8 +73,8 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
: std::min(static_cast<int64_t>(_state->batch_size()), _parent->limit()));
|
||||
|
||||
// Get olap table
|
||||
TTabletId tablet_id = scan_range.tablet_id;
|
||||
_version = strtoul(scan_range.version.c_str(), nullptr, 10);
|
||||
TTabletId tablet_id = _scan_range.tablet_id;
|
||||
_version = strtoul(_scan_range.version.c_str(), nullptr, 10);
|
||||
{
|
||||
auto [tablet, status] =
|
||||
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, true);
|
||||
@ -114,7 +118,7 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
|
||||
{
|
||||
std::shared_lock rdlock(_tablet->get_header_lock());
|
||||
if (rs_readers.empty()) {
|
||||
if (_tablet_reader_params.rs_readers.empty()) {
|
||||
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
|
||||
if (rowset == nullptr) {
|
||||
std::stringstream ss;
|
||||
@ -137,14 +141,12 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
<< ", backend=" << BackendOptions::get_localhost();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
} else {
|
||||
_tablet_reader_params.rs_readers = rs_readers;
|
||||
_tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
|
||||
}
|
||||
|
||||
// Initialize tablet_reader_params
|
||||
RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, filter_predicates,
|
||||
function_filters));
|
||||
RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges, parent->_olap_filters,
|
||||
parent->_filter_predicates,
|
||||
parent->_push_down_functions));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -35,20 +35,17 @@ struct FilterPredicates;
|
||||
class NewOlapScanner : public VScanner {
|
||||
public:
|
||||
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
|
||||
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers,
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets,
|
||||
bool need_agg_finalize, RuntimeProfile* profile);
|
||||
|
||||
Status init() override;
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
|
||||
Status close(RuntimeState* state) override;
|
||||
|
||||
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
|
||||
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
|
||||
const FilterPredicates& filter_predicates,
|
||||
const std::vector<FunctionFilter>& function_filters,
|
||||
VExprContext** common_vexpr_ctxs_pushdown,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets = {});
|
||||
|
||||
const std::string& scan_disk() const { return _tablet->data_dir()->path(); }
|
||||
|
||||
void set_compound_filters(const std::vector<TCondition>& compound_filters);
|
||||
@ -75,6 +72,8 @@ private:
|
||||
TabletSchemaSPtr _tablet_schema;
|
||||
TabletSharedPtr _tablet;
|
||||
int64_t _version;
|
||||
const TPaloScanRange& _scan_range;
|
||||
std::vector<OlapScanRange*> _key_ranges;
|
||||
|
||||
TabletReader::ReaderParams _tablet_reader_params;
|
||||
std::unique_ptr<TabletReader> _tablet_reader;
|
||||
|
||||
@ -260,7 +260,14 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
|
||||
bool eos = false;
|
||||
RuntimeState* state = ctx->state();
|
||||
DCHECK(nullptr != state);
|
||||
if (!scanner->is_open()) {
|
||||
if (!scanner->is_init()) {
|
||||
status = scanner->init();
|
||||
if (!status.ok()) {
|
||||
ctx->set_status_on_error(status);
|
||||
eos = true;
|
||||
}
|
||||
}
|
||||
if (!eos && !scanner->is_open()) {
|
||||
status = scanner->open(state);
|
||||
if (!status.ok()) {
|
||||
ctx->set_status_on_error(status);
|
||||
|
||||
@ -1363,7 +1363,6 @@ Status VScanNode::_prepare_scanners() {
|
||||
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
|
||||
RETURN_IF_ERROR(_start_scanners(scanners));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -39,7 +39,9 @@ class VScanner {
|
||||
public:
|
||||
VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile);
|
||||
|
||||
virtual ~VScanner() {}
|
||||
virtual ~VScanner() = default;
|
||||
|
||||
virtual Status init() { return Status::OK(); }
|
||||
|
||||
virtual Status open(RuntimeState* state) { return Status::OK(); }
|
||||
|
||||
@ -67,6 +69,8 @@ public:
|
||||
|
||||
int64_t get_rows_read() const { return _num_rows_read; }
|
||||
|
||||
bool is_init() const { return _is_init; }
|
||||
|
||||
Status try_append_late_arrival_runtime_filter();
|
||||
|
||||
// Call start_wait_worker_timer() when submitting the scanner to the thread pool.
|
||||
@ -179,6 +183,8 @@ protected:
|
||||
// set to true after decreasing "_num_unfinished_scanners" in the scanner context
|
||||
bool _is_counted_down = false;
|
||||
|
||||
bool _is_init = true;
|
||||
|
||||
ScannerCounter _counter;
|
||||
int64_t _per_scanner_timer = 0;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user