[pipeline](split by segment)support segment split by scanner (#17738)
* support segment split by scanner * change code by cr
This commit is contained in:
@ -404,8 +404,6 @@ public:
|
||||
|
||||
// True when the stored error code is ErrorCode::OK.
bool ok() const {
    return ErrorCode::OK == _code;
}
|
||||
|
||||
// True when the stored error code is ErrorCode::PIP_WAIT_FOR_SC.
bool is_blocked_by_sc() const {
    return ErrorCode::PIP_WAIT_FOR_SC == _code;
}
|
||||
|
||||
bool is_io_error() const {
|
||||
return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code ||
|
||||
ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code ||
|
||||
|
||||
@ -101,6 +101,11 @@ public:
|
||||
DeleteBitmap* delete_bitmap {nullptr};
|
||||
|
||||
std::vector<RowsetReaderSharedPtr> rs_readers;
|
||||
// If rs_readers_segment_offsets is not empty, only the segments in
|
||||
// [pair.first, pair.second) of the matching rs_reader are scanned. Only
|
||||
// effective for duplicate-key tables with the pipeline engine.
|
||||
std::vector<std::pair<int, int>> rs_readers_segment_offsets;
|
||||
|
||||
// return_columns is init from query schema
|
||||
std::vector<uint32_t> return_columns;
|
||||
// output_columns only contain columns in OrderByExprs and outputExprs
|
||||
|
||||
@ -42,6 +42,10 @@ void BetaRowsetReader::reset_read_options() {
|
||||
_read_options.key_ranges.clear();
|
||||
}
|
||||
|
||||
// Returns a newly constructed BetaRowsetReader built from this reader's _rowset.
RowsetReaderSharedPtr BetaRowsetReader::clone() {
    RowsetReaderSharedPtr cloned_reader(new BetaRowsetReader(_rowset));
    return cloned_reader;
}
|
||||
|
||||
bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
|
||||
if (_iterator != nullptr) {
|
||||
return _iterator->update_profile(profile);
|
||||
@ -51,6 +55,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
|
||||
|
||||
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context,
|
||||
std::vector<RowwiseIteratorUPtr>* out_iters,
|
||||
const std::pair<int, int>& segment_offset,
|
||||
bool use_cache) {
|
||||
RETURN_NOT_OK(_rowset->load());
|
||||
_context = read_context;
|
||||
@ -174,7 +179,15 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
|
||||
should_use_cache));
|
||||
|
||||
// create iterator for each segment
|
||||
for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
|
||||
auto& segments = _segment_cache_handle.get_segments();
|
||||
auto [seg_start, seg_end] = segment_offset;
|
||||
if (seg_start == seg_end) {
|
||||
seg_start = 0;
|
||||
seg_end = segments.size();
|
||||
}
|
||||
|
||||
for (int i = seg_start; i < seg_end; i++) {
|
||||
auto& seg_ptr = segments[i];
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter);
|
||||
if (!s.ok()) {
|
||||
@ -191,13 +204,15 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
|
||||
}
|
||||
out_iters->push_back(std::move(iter));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
|
||||
Status BetaRowsetReader::init(RowsetReaderContext* read_context,
|
||||
const std::pair<int, int>& segment_offset) {
|
||||
_context = read_context;
|
||||
std::vector<RowwiseIteratorUPtr> iterators;
|
||||
RETURN_NOT_OK(get_segment_iterators(_context, &iterators));
|
||||
RETURN_NOT_OK(get_segment_iterators(_context, &iterators, segment_offset));
|
||||
|
||||
// merge or union segment iterator
|
||||
if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
|
||||
|
||||
@ -31,10 +31,12 @@ public:
|
||||
|
||||
~BetaRowsetReader() override { _rowset->release(); }
|
||||
|
||||
Status init(RowsetReaderContext* read_context) override;
|
||||
Status init(RowsetReaderContext* read_context,
|
||||
const std::pair<int, int>& segment_offset) override;
|
||||
|
||||
Status get_segment_iterators(RowsetReaderContext* read_context,
|
||||
std::vector<RowwiseIteratorUPtr>* out_iters,
|
||||
const std::pair<int, int>& segment_offset,
|
||||
bool use_cache = false) override;
|
||||
void reset_read_options() override;
|
||||
Status next_block(vectorized::Block* block) override;
|
||||
@ -66,6 +68,8 @@ public:
|
||||
|
||||
bool update_profile(RuntimeProfile* profile) override;
|
||||
|
||||
RowsetReaderSharedPtr clone() override;
|
||||
|
||||
private:
|
||||
bool _should_push_down_value_predicates() const;
|
||||
|
||||
|
||||
@ -41,10 +41,12 @@ public:
|
||||
virtual ~RowsetReader() = default;
|
||||
|
||||
// reader init
|
||||
virtual Status init(RowsetReaderContext* read_context) = 0;
|
||||
virtual Status init(RowsetReaderContext* read_context,
|
||||
const std::pair<int, int>& segment_offset = {0, 0}) = 0;
|
||||
|
||||
virtual Status get_segment_iterators(RowsetReaderContext* read_context,
|
||||
std::vector<RowwiseIteratorUPtr>* out_iters,
|
||||
const std::pair<int, int>& segment_offset = {0, 0},
|
||||
bool use_cache = false) = 0;
|
||||
virtual void reset_read_options() = 0;
|
||||
|
||||
@ -73,6 +75,8 @@ public:
|
||||
}
|
||||
|
||||
virtual bool update_profile(RuntimeProfile* profile) = 0;
|
||||
|
||||
virtual RowsetReaderSharedPtr clone() = 0;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -18,36 +18,23 @@
|
||||
#include "olap/tablet_manager.h"
|
||||
|
||||
#include <gen_cpp/Types_types.h>
|
||||
#include <rapidjson/document.h>
|
||||
#include <re2/re2.h>
|
||||
#include <thrift/protocol/TDebugProtocol.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <filesystem>
|
||||
|
||||
#include "common/compiler_util.h"
|
||||
#include "env/env.h"
|
||||
#include "env/env_util.h"
|
||||
#include "gutil/strings/strcat.h"
|
||||
#include "olap/base_compaction.h"
|
||||
#include "olap/cumulative_compaction.h"
|
||||
#include "olap/data_dir.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/push_handler.h"
|
||||
#include "olap/reader.h"
|
||||
#include "olap/rowset/rowset_id_generator.h"
|
||||
#include "olap/schema_change.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "olap/tablet_meta_manager.h"
|
||||
#include "olap/utils.h"
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/prettywriter.h"
|
||||
#include "rapidjson/stringbuffer.h"
|
||||
#include "runtime/thread_context.h"
|
||||
@ -56,7 +43,6 @@
|
||||
#include "util/file_utils.h"
|
||||
#include "util/histogram.h"
|
||||
#include "util/path_util.h"
|
||||
#include "util/pretty_printer.h"
|
||||
#include "util/scoped_cleanup.h"
|
||||
#include "util/time.h"
|
||||
#include "util/trace.h"
|
||||
@ -534,6 +520,19 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_dele
|
||||
return _get_tablet_unlocked(tablet_id, include_deleted, err);
|
||||
}
|
||||
|
||||
// Looks up a tablet via get_tablet() and pairs the result with a Status.
//
// On success returns {tablet, Status::OK()}. If get_tablet() yields nullptr,
// logs a warning and returns {nullptr tablet, Status::InternalError(...)} whose
// message carries the tablet id and the reason string filled in by get_tablet().
std::pair<TabletSharedPtr, Status> TabletManager::get_tablet_and_status(TTabletId tablet_id,
                                                                        bool include_deleted) {
    std::string reason;
    auto found = get_tablet(tablet_id, include_deleted, &reason);

    // Fast path: the tablet exists, nothing to report.
    if (found != nullptr) {
        return {found, Status::OK()};
    }

    auto message = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, reason);
    LOG(WARNING) << message;
    return {found, Status::InternalError(message)};
}
|
||||
|
||||
TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted,
|
||||
string* err) {
|
||||
TabletSharedPtr tablet;
|
||||
|
||||
@ -75,6 +75,9 @@ public:
|
||||
TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted = false,
|
||||
std::string* err = nullptr);
|
||||
|
||||
std::pair<TabletSharedPtr, Status> get_tablet_and_status(TTabletId tablet_id,
|
||||
bool include_deleted = false);
|
||||
|
||||
TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
|
||||
bool include_deleted = false, std::string* err = nullptr);
|
||||
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
#include "vec/exec/scan/new_olap_scan_node.h"
|
||||
|
||||
#include <charconv>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/tablet.h"
|
||||
@ -404,57 +406,172 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
|
||||
}
|
||||
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
|
||||
|
||||
std::unordered_set<std::string> disk_set;
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
auto tablet_id = scan_range->tablet_id;
|
||||
std::string err;
|
||||
TabletSharedPtr tablet =
|
||||
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
|
||||
if (tablet == nullptr) {
|
||||
auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err);
|
||||
LOG(WARNING) << err_str;
|
||||
return Status::InternalError(err_str);
|
||||
}
|
||||
bool is_duplicate_key = false;
|
||||
int segment_count = 0;
|
||||
std::vector<std::vector<RowsetReaderSharedPtr>> rowset_readers_vector(_scan_ranges.size());
|
||||
std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());
|
||||
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
|
||||
int size_based_scanners_per_tablet = 1;
|
||||
// Split tablet segment by scanner, only use in pipeline in duplicate key
|
||||
// 1. If the tablet count is lower than the scanner thread count, count the segments of all tablets that are ready to be scanned
|
||||
// TODO: some tablets may not have any segments; segment splitting may need to handle all such cases
|
||||
if (_shared_scan_opt && _scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) {
|
||||
for (int i = 0; i < _scan_ranges.size(); ++i) {
|
||||
auto& scan_range = _scan_ranges[i];
|
||||
auto tablet_id = scan_range->tablet_id;
|
||||
auto [tablet, status] =
|
||||
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
|
||||
true);
|
||||
RETURN_IF_ERROR(status);
|
||||
|
||||
if (config::doris_scan_range_max_mb > 0) {
|
||||
size_based_scanners_per_tablet = std::max(
|
||||
1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20)));
|
||||
}
|
||||
|
||||
int ranges_per_scanner =
|
||||
std::max(1, (int)ranges->size() /
|
||||
std::min(scanners_per_tablet, size_based_scanners_per_tablet));
|
||||
int num_ranges = ranges->size();
|
||||
for (int i = 0; i < num_ranges;) {
|
||||
std::vector<doris::OlapScanRange*> scanner_ranges;
|
||||
scanner_ranges.push_back((*ranges)[i].get());
|
||||
++i;
|
||||
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
|
||||
(*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
|
||||
++j, ++i) {
|
||||
scanner_ranges.push_back((*ranges)[i].get());
|
||||
is_duplicate_key = tablet->keys_type() == DUP_KEYS;
|
||||
if (!is_duplicate_key) {
|
||||
break;
|
||||
}
|
||||
|
||||
NewOlapScanner* scanner = new NewOlapScanner(
|
||||
_state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation,
|
||||
_need_agg_finalize, _scanner_profile.get());
|
||||
int64_t version = 0;
|
||||
std::from_chars(scan_range->version.c_str(),
|
||||
scan_range->version.c_str() + scan_range->version.size(), version);
|
||||
|
||||
scanner->set_compound_filters(_compound_filters);
|
||||
// add scanner to pool before doing prepare.
|
||||
// so that scanner can be automatically deconstructed if prepare failed.
|
||||
_scanner_pool.add(scanner);
|
||||
RETURN_IF_ERROR(scanner->prepare(
|
||||
*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters,
|
||||
_filter_predicates, _push_down_functions, _common_vexpr_ctxs_pushdown.get()));
|
||||
scanners->push_back((VScanner*)scanner);
|
||||
disk_set.insert(scanner->scan_disk());
|
||||
std::shared_lock rdlock(tablet->get_header_lock());
|
||||
// acquire tablet rowset readers at the beginning of the scan node
|
||||
// to prevent this case: when there are lots of olap scanners to run for example 10000
|
||||
// the rowsets maybe compacted when the last olap scanner starts
|
||||
Status acquire_reader_st =
|
||||
tablet->capture_rs_readers({0, version}, &rowset_readers_vector[i]);
|
||||
if (!acquire_reader_st.ok()) {
|
||||
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
|
||||
std::stringstream ss;
|
||||
ss << "failed to initialize storage reader. tablet=" << tablet->full_name()
|
||||
<< ", res=" << acquire_reader_st
|
||||
<< ", backend=" << BackendOptions::get_localhost();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
for (const auto& rowset_reader : rowset_readers_vector[i]) {
|
||||
auto num_segments = rowset_reader->rowset()->num_segments();
|
||||
tablet_rs_seg_count[i].emplace_back(num_segments);
|
||||
segment_count += num_segments;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
|
||||
std::unordered_set<std::string> disk_set;
|
||||
// Creates one NewOlapScanner for `scan_range`, prepares it with the given key
// ranges, rowset readers and per-reader segment offsets, and on success appends
// it to `scanners` and records its disk in `disk_set`.
// Returns the failing Status from prepare(), or Status::OK().
auto build_new_scanner = [&](const TPaloScanRange& scan_range,
                             const std::vector<OlapScanRange*>& key_ranges,
                             const std::vector<RowsetReaderSharedPtr>& rs_readers,
                             const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
    auto* new_scanner =
            new NewOlapScanner(_state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation,
                               _need_agg_finalize, _scanner_profile.get());
    new_scanner->set_compound_filters(_compound_filters);

    // Register the scanner with the pool before calling prepare(), so that the
    // pool can still destroy it automatically if prepare() fails.
    _scanner_pool.add(new_scanner);

    RETURN_IF_ERROR(new_scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(),
                                         _olap_filters, _filter_predicates, _push_down_functions,
                                         _common_vexpr_ctxs_pushdown.get(), rs_readers,
                                         rs_reader_seg_offsets));

    scanners->push_back(static_cast<VScanner*>(new_scanner));
    disk_set.insert(new_scanner->scan_disk());
    return Status::OK();
};
|
||||
if (is_duplicate_key) {
|
||||
// 2. Split by segment count, each scanner need scan avg segment count
|
||||
auto avg_segment_count =
|
||||
std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1);
|
||||
for (int i = 0; i < _scan_ranges.size(); ++i) {
|
||||
auto& scan_range = _scan_ranges[i];
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
|
||||
int num_ranges = ranges->size();
|
||||
std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
|
||||
for (int j = 0; j < num_ranges; ++j) {
|
||||
scanner_ranges[j] = (*ranges)[j].get();
|
||||
}
|
||||
|
||||
const auto& rs_seg_count = tablet_rs_seg_count[i];
|
||||
int rs_seg_count_index = 0;
|
||||
int rs_seg_start_scan = 0;
|
||||
int scanner_seg_occupy = 0;
|
||||
std::vector<RowsetReaderSharedPtr> rs_readers;
|
||||
std::vector<std::pair<int, int>> rs_reader_seg_offsets;
|
||||
|
||||
while (rs_seg_count_index < rs_seg_count.size()) {
|
||||
auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan;
|
||||
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
|
||||
|
||||
if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) {
|
||||
auto need_add_seg_nums = avg_segment_count - scanner_seg_occupy;
|
||||
rs_reader_seg_offsets.emplace_back(
|
||||
rs_seg_start_scan,
|
||||
rs_seg_start_scan + need_add_seg_nums); // only scan need_add_seg_nums
|
||||
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers,
|
||||
rs_reader_seg_offsets));
|
||||
|
||||
rs_seg_start_scan += need_add_seg_nums;
|
||||
scanner_seg_occupy = 0;
|
||||
rs_readers.clear();
|
||||
rs_reader_seg_offsets.clear();
|
||||
} else if (scanner_seg_occupy + max_add_seg_nums == avg_segment_count) {
|
||||
rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
|
||||
rs_seg_count[rs_seg_count_index]);
|
||||
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers,
|
||||
rs_reader_seg_offsets));
|
||||
|
||||
rs_seg_start_scan = 0;
|
||||
scanner_seg_occupy = 0;
|
||||
rs_readers.clear();
|
||||
rs_reader_seg_offsets.clear();
|
||||
rs_seg_count_index++;
|
||||
} else {
|
||||
rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
|
||||
rs_seg_count[rs_seg_count_index]);
|
||||
|
||||
rs_seg_start_scan = 0;
|
||||
scanner_seg_occupy += max_add_seg_nums;
|
||||
rs_seg_count_index++;
|
||||
}
|
||||
}
|
||||
|
||||
// dispose some segment tail
|
||||
if (!rs_readers.empty()) {
|
||||
build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
auto tablet_id = scan_range->tablet_id;
|
||||
auto [tablet, status] =
|
||||
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
|
||||
true);
|
||||
RETURN_IF_ERROR(status);
|
||||
|
||||
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
|
||||
int size_based_scanners_per_tablet = 1;
|
||||
|
||||
if (config::doris_scan_range_max_mb > 0) {
|
||||
size_based_scanners_per_tablet =
|
||||
std::max(1, (int)(tablet->tablet_footprint() /
|
||||
(config::doris_scan_range_max_mb << 20)));
|
||||
}
|
||||
int ranges_per_scanner =
|
||||
std::max(1, (int)ranges->size() / std::min(scanners_per_tablet,
|
||||
size_based_scanners_per_tablet));
|
||||
int num_ranges = ranges->size();
|
||||
for (int i = 0; i < num_ranges;) {
|
||||
std::vector<doris::OlapScanRange*> scanner_ranges;
|
||||
scanner_ranges.push_back((*ranges)[i].get());
|
||||
++i;
|
||||
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
|
||||
(*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
|
||||
++j, ++i) {
|
||||
scanner_ranges.push_back((*ranges)[i].get());
|
||||
}
|
||||
RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {}));
|
||||
}
|
||||
}
|
||||
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
|
||||
}
|
||||
// telemetry::set_span_attribute(span, _num_disks_accessed_counter);
|
||||
// telemetry::set_span_attribute(span, _num_scanners);
|
||||
|
||||
|
||||
@ -52,7 +52,9 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
const std::vector<TCondition>& filters,
|
||||
const FilterPredicates& filter_predicates,
|
||||
const std::vector<FunctionFilter>& function_filters,
|
||||
VExprContext** common_vexpr_ctxs_pushdown) {
|
||||
VExprContext** common_vexpr_ctxs_pushdown,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers,
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
|
||||
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
|
||||
if (common_vexpr_ctxs_pushdown != nullptr) {
|
||||
// Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary.
|
||||
@ -70,14 +72,10 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
TTabletId tablet_id = scan_range.tablet_id;
|
||||
_version = strtoul(scan_range.version.c_str(), nullptr, 10);
|
||||
{
|
||||
std::string err;
|
||||
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
|
||||
if (_tablet.get() == nullptr) {
|
||||
std::stringstream ss;
|
||||
ss << "failed to get tablet. tablet_id=" << tablet_id << ", reason=" << err;
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
auto [tablet, status] =
|
||||
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, true);
|
||||
RETURN_IF_ERROR(status);
|
||||
_tablet = std::move(tablet);
|
||||
_tablet_schema->copy_from(*_tablet->tablet_schema());
|
||||
|
||||
TOlapScanNode& olap_scan_node = ((NewOlapScanNode*)_parent)->_olap_scan_node;
|
||||
@ -116,27 +114,32 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
|
||||
|
||||
{
|
||||
std::shared_lock rdlock(_tablet->get_header_lock());
|
||||
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
|
||||
if (rowset == nullptr) {
|
||||
std::stringstream ss;
|
||||
ss << "fail to get latest version of tablet: " << tablet_id;
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
if (rs_readers.empty()) {
|
||||
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
|
||||
if (rowset == nullptr) {
|
||||
std::stringstream ss;
|
||||
ss << "fail to get latest version of tablet: " << tablet_id;
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
// acquire tablet rowset readers at the beginning of the scan node
|
||||
// to prevent this case: when there are lots of olap scanners to run for example 10000
|
||||
// the rowsets maybe compacted when the last olap scanner starts
|
||||
Version rd_version(0, _version);
|
||||
Status acquire_reader_st =
|
||||
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
|
||||
if (!acquire_reader_st.ok()) {
|
||||
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
|
||||
std::stringstream ss;
|
||||
ss << "failed to initialize storage reader. tablet=" << _tablet->full_name()
|
||||
<< ", res=" << acquire_reader_st
|
||||
<< ", backend=" << BackendOptions::get_localhost();
|
||||
return Status::InternalError(ss.str());
|
||||
// acquire tablet rowset readers at the beginning of the scan node
|
||||
// to prevent this case: when there are lots of olap scanners to run for example 10000
|
||||
// the rowsets maybe compacted when the last olap scanner starts
|
||||
Version rd_version(0, _version);
|
||||
Status acquire_reader_st =
|
||||
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
|
||||
if (!acquire_reader_st.ok()) {
|
||||
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
|
||||
std::stringstream ss;
|
||||
ss << "failed to initialize storage reader. tablet=" << _tablet->full_name()
|
||||
<< ", res=" << acquire_reader_st
|
||||
<< ", backend=" << BackendOptions::get_localhost();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
} else {
|
||||
_tablet_reader_params.rs_readers = rs_readers;
|
||||
_tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
|
||||
}
|
||||
|
||||
// Initialize tablet_reader_params
|
||||
|
||||
@ -45,7 +45,9 @@ public:
|
||||
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
|
||||
const FilterPredicates& filter_predicates,
|
||||
const std::vector<FunctionFilter>& function_filters,
|
||||
VExprContext** common_vexpr_ctxs_pushdown);
|
||||
VExprContext** common_vexpr_ctxs_pushdown,
|
||||
const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
|
||||
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets = {});
|
||||
|
||||
// Returns the path of the data dir reported by this scanner's tablet.
const std::string& scan_disk() const {
    return _tablet->data_dir()->path();
}
|
||||
|
||||
|
||||
@ -84,7 +84,6 @@ public:
|
||||
|
||||
void set_max_queue_size(int max_queue_size) override {
|
||||
for (int i = 0; i < max_queue_size; ++i) {
|
||||
_blocks_queue_empty.emplace_back(true);
|
||||
_queue_mutexs.emplace_back(new std::mutex);
|
||||
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
|
||||
}
|
||||
@ -92,7 +91,6 @@ public:
|
||||
|
||||
private:
|
||||
int _next_queue_to_feed = 0;
|
||||
std::vector<bool> _blocks_queue_empty;
|
||||
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
|
||||
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
|
||||
};
|
||||
|
||||
@ -75,10 +75,17 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
|
||||
|
||||
_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
|
||||
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
|
||||
for (auto& rs_reader : read_params.rs_readers) {
|
||||
DCHECK(read_params.rs_readers_segment_offsets.empty() ||
|
||||
read_params.rs_readers_segment_offsets.size() == read_params.rs_readers.size());
|
||||
|
||||
bool is_empty = read_params.rs_readers_segment_offsets.empty();
|
||||
for (int i = 0; i < read_params.rs_readers.size(); ++i) {
|
||||
auto& rs_reader = read_params.rs_readers[i];
|
||||
// _vcollect_iter.topn_next() will init rs_reader by itself
|
||||
if (!_vcollect_iter.use_topn_next()) {
|
||||
RETURN_NOT_OK(rs_reader->init(&_reader_context));
|
||||
RETURN_NOT_OK(rs_reader->init(
|
||||
&_reader_context,
|
||||
is_empty ? std::pair {0, 0} : read_params.rs_readers_segment_offsets[i]));
|
||||
}
|
||||
Status res = _vcollect_iter.add_child(rs_reader);
|
||||
if (!res.ok() && !res.is<END_OF_FILE>()) {
|
||||
|
||||
@ -53,7 +53,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
|
||||
// segment iterator will be inited here
|
||||
// In vertical compaction, every group will load segment so we should cache
|
||||
// segment to avoid too many s3 head requests
|
||||
RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context, segment_iters, true));
|
||||
RETURN_NOT_OK(
|
||||
rs_reader->get_segment_iterators(&_reader_context, segment_iters, {0, 0}, true));
|
||||
// if segments overlapping, all segment iterator should be inited in
|
||||
// heap merge iterator. If segments are none overlapping, only first segment of this
|
||||
// rowset will be inited and push to heap, other segment will be inited later when current
|
||||
|
||||
@ -64,7 +64,7 @@ public class AuditEvent {
|
||||
public int errorCode = 0;
|
||||
@AuditField(value = "ErrorMessage")
|
||||
public String errorMessage = "";
|
||||
@AuditField(value = "Time")
|
||||
@AuditField(value = "Time(ms)")
|
||||
public long queryTime = -1;
|
||||
@AuditField(value = "ScanBytes")
|
||||
public long scanBytes = -1;
|
||||
|
||||
Reference in New Issue
Block a user