[refactor](exceptionsafe) make scanner and scancontext exception safe (#19057)

This commit is contained in:
yiguolei
2023-04-27 09:23:01 +08:00
committed by GitHub
parent 1afa7c786f
commit a262f42a28
26 changed files with 99 additions and 76 deletions

View File

@ -137,7 +137,7 @@ Status NewEsScanNode::_process_conjuncts() {
return Status::OK();
}
Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status NewEsScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
return Status::OK();
@ -163,13 +163,12 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(
properties, _column_names, _docvalue_context, &doc_value_mode);
NewEsScanner* scanner =
new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, properties,
_docvalue_context, doc_value_mode, _state->runtime_profile());
std::shared_ptr<NewEsScanner> scanner = NewEsScanner::create_shared(
_state, this, _limit_per_scanner, _tuple_id, properties, _docvalue_context,
doc_value_mode, _state->runtime_profile());
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
scanners->push_back(static_cast<VScanner*>(scanner));
scanners->push_back(scanner);
}
return Status::OK();
}

View File

@ -60,7 +60,7 @@ public:
protected:
Status _init_profile() override;
Status _process_conjuncts() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
private:
TupleId _tuple_id;

View File

@ -24,6 +24,7 @@
#include <string>
#include <vector>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "exec/es/es_scan_reader.h"
@ -47,6 +48,8 @@ namespace doris::vectorized {
class NewEsScanNode;
class NewEsScanner : public VScanner {
ENABLE_FACTORY_CREATOR(NewEsScanner);
public:
NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id,
const std::map<std::string, std::string>& properties,

View File

@ -96,7 +96,7 @@ Status NewFileScanNode::_process_conjuncts() {
return Status::OK();
}
Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
return Status::OK();
@ -107,14 +107,13 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
std::min<size_t>(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size());
_kv_cache.reset(new ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner,
scan_range.scan_range.ext_scan_range.file_scan_range,
runtime_profile(), _kv_cache.get());
_scanner_pool.add(scanner);
RETURN_IF_ERROR(((VFileScanner*)scanner)
->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range,
&_colname_to_slot_id));
scanners->push_back(scanner);
std::unique_ptr<VFileScanner> scanner =
VFileScanner::create_unique(_state, this, _limit_per_scanner,
scan_range.scan_range.ext_scan_range.file_scan_range,
runtime_profile(), _kv_cache.get());
RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range,
&_colname_to_slot_id));
scanners->push_back(std::move(scanner));
}
return Status::OK();

View File

@ -52,7 +52,7 @@ public:
protected:
Status _init_profile() override;
Status _process_conjuncts() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
private:
std::vector<TScanRangeParams> _scan_ranges;

View File

@ -61,16 +61,15 @@ Status NewJdbcScanNode::_init_profile() {
return Status::OK();
}
Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status NewJdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_eos == true) {
return Status::OK();
}
NewJdbcScanner* scanner =
new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string,
_table_type, _state->runtime_profile());
_scanner_pool.add(scanner);
std::unique_ptr<NewJdbcScanner> scanner =
NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, _tuple_id,
_query_string, _table_type, _state->runtime_profile());
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
scanners->push_back(static_cast<VScanner*>(scanner));
scanners->push_back(std::move(scanner));
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -44,7 +44,7 @@ public:
protected:
Status _init_profile() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
private:
std::string _table_name;

View File

@ -23,6 +23,7 @@
#include <memory>
#include <string>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "util/runtime_profile.h"
@ -39,6 +40,8 @@ class NewJdbcScanNode;
class VExprContext;
class NewJdbcScanner : public VScanner {
ENABLE_FACTORY_CREATOR(NewJdbcScanner);
public:
friend class JdbcConnector;

View File

@ -61,15 +61,14 @@ Status NewOdbcScanNode::_init_profile() {
return Status::OK();
}
Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status NewOdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_eos == true) {
return Status::OK();
}
NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node,
_state->runtime_profile());
_scanner_pool.add(scanner);
std::shared_ptr<NewOdbcScanner> scanner = NewOdbcScanner::create_shared(
_state, this, _limit_per_scanner, _odbc_scan_node, _state->runtime_profile());
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
scanners->push_back(static_cast<VScanner*>(scanner));
scanners->push_back(scanner);
return Status::OK();
}
} // namespace doris::vectorized

View File

@ -50,7 +50,7 @@ public:
protected:
Status _init_profile() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
private:
std::string _table_name;

View File

@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "exec/odbc_connector.h"
@ -43,6 +44,8 @@ class VExprContext;
namespace doris::vectorized {
class NewOdbcScanner : public VScanner {
ENABLE_FACTORY_CREATOR(NewOdbcScanner);
public:
NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile);

View File

@ -423,7 +423,7 @@ std::string NewOlapScanNode::get_name() {
return fmt::format("VNewOlapScanNode({0})", _olap_scan_node.table_name);
}
Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
return Status::OK();
@ -506,16 +506,13 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
const std::vector<OlapScanRange*>& key_ranges,
const std::vector<RowsetReaderSharedPtr>& rs_readers,
const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation, scan_range,
key_ranges, rs_readers, rs_reader_seg_offsets,
_need_agg_finalize, _scanner_profile.get());
std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared(
_state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range,
key_ranges, rs_readers, rs_reader_seg_offsets, _need_agg_finalize,
_scanner_profile.get());
scanner->set_compound_filters(_compound_filters);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool.add(scanner);
scanners->push_back((VScanner*)scanner);
scanners->push_back(scanner);
return Status::OK();
};
if (is_duplicate_key) {

View File

@ -86,7 +86,7 @@ protected:
bool _should_push_down_common_expr() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
private:
Status _build_key_ranges_and_filters();

View File

@ -26,6 +26,7 @@
#include <utility>
#include <vector>
#include "common/factory_creator.h"
#include "common/status.h"
#include "olap/data_dir.h"
#include "olap/reader.h"
@ -49,6 +50,8 @@ struct FilterPredicates;
class Block;
class NewOlapScanner : public VScanner {
ENABLE_FACTORY_CREATOR(NewOlapScanner);
public:
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,

View File

@ -24,11 +24,13 @@ namespace doris {
namespace pipeline {
class PipScannerContext : public vectorized::ScannerContext {
ENABLE_FACTORY_CREATOR(PipScannerContext);
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc,
const std::list<vectorized::VScanner*>& scanners, int64_t limit,
const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit,
int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids)
: vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
scanners, limit, max_bytes_in_blocks_queue),

View File

@ -44,7 +44,7 @@ namespace doris::vectorized {
ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* input_tuple_desc,
const doris::TupleDescriptor* output_tuple_desc,
const std::list<VScanner*>& scanners_, int64_t limit_,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_)
: _state(state_),
_parent(parent),
@ -242,7 +242,7 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", ";
}
// Only unfinished scanners here
for (auto scanner : _scanners) {
for (auto& scanner : _scanners) {
// Scanners are owned through shared pointers (VScannerSPtr),
// so no need to delete them here.
// Add per scanner running time before close them
@ -257,7 +257,7 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
}
// Only unfinished scanners here
for (auto scanner : _scanners) {
for (auto& scanner : _scanners) {
scanner->close(state);
// Scanners are owned through shared pointers (VScannerSPtr),
// so no need to delete them here.
@ -314,7 +314,7 @@ void ScannerContext::reschedule_scanner_ctx() {
}
}
void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
{
std::unique_lock l(_scanners_lock);
_scanners.push_front(scanner);
@ -344,7 +344,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
_ctx_finish_cv.notify_one();
}
void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run) {
// 1. Calculate how many scanners should be scheduled at this run.
int thread_slot_num = 0;
{
@ -364,7 +364,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
{
std::unique_lock l(_scanners_lock);
for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
auto scanner = _scanners.front();
VScannerSPtr scanner = _scanners.front();
_scanners.pop_front();
if (scanner->need_to_close()) {
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());

View File

@ -27,10 +27,12 @@
#include <string>
#include <vector>
#include "common/factory_creator.h"
#include "common/status.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/scan/vscanner.h"
namespace doris {
@ -53,10 +55,13 @@ class ScannerScheduler;
// ScannerScheduler schedules a ScannerContext at a time,
// and submits the Scanners to the scanner thread pool for data scanning.
class ScannerContext {
ENABLE_FACTORY_CREATOR(ScannerContext);
public:
ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc,
const TupleDescriptor* output_tuple_desc, const std::list<VScanner*>& scanners_,
int64_t limit_, int64_t max_bytes_in_blocks_queue_);
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_);
virtual ~ScannerContext() = default;
Status init();
@ -74,7 +79,7 @@ public:
// When a scanner completes a scan, this method is called
// to return the scanner to the list for the next round of scheduling.
void push_back_scanner_and_reschedule(VScanner* scanner);
void push_back_scanner_and_reschedule(VScannerSPtr scanner);
bool set_status_on_error(const Status& status);
@ -110,7 +115,7 @@ public:
int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
void clear_and_join(VScanNode* node, RuntimeState* state);
@ -223,7 +228,7 @@ protected:
// and then if the scanner is not finished, will be pushed back to this list.
// Not need to protect by lock, because only one scheduler thread will access to it.
doris::Mutex _scanners_lock;
std::list<VScanner*> _scanners;
std::list<VScannerSPtr> _scanners;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;

View File

@ -159,7 +159,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
return;
}
std::list<VScanner*> this_run;
std::list<VScannerSPtr> this_run;
ctx->get_next_batch_of_scanners(&this_run);
if (this_run.empty()) {
// There will be 2 cases when this_run is empty:
@ -260,7 +260,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
}
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,
VScanner* scanner) {
VScannerSPtr scanner) {
auto tracker_config = [&] {
SCOPED_ATTACH_TASK(scanner->runtime_state());
Thread::set_self_name("_scanner_scan");
@ -334,8 +334,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
// Because the FE file cache for an external table may be out of date,
// NOT_FOUND for VFileScanner is not a failure case.
// This will be removed after the file reader refactor.
if (!status.ok() && (typeid(*scanner) != typeid(doris::vectorized::VFileScanner) ||
(typeid(*scanner) == typeid(doris::vectorized::VFileScanner) &&
if (!status.ok() && (scanner->get_name() != doris::vectorized::VFileScanner::NAME ||
(scanner->get_name() == doris::vectorized::VFileScanner::NAME &&
!status.is<ErrorCode::NOT_FOUND>()))) {
LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
break;

View File

@ -22,6 +22,7 @@
#include "common/status.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
namespace doris {
class ExecEnv;
@ -73,7 +74,7 @@ private:
// schedule scanners in a certain ScannerContext
void _schedule_scanners(ScannerContext* ctx);
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScanner* scanner);
void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner);
private:
// Scheduling queue number.

View File

@ -26,6 +26,7 @@
#include <unordered_set>
#include <vector>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "exec/olap_common.h"
@ -57,7 +58,11 @@ namespace doris::vectorized {
class NewFileScanNode;
class VFileScanner : public VScanner {
ENABLE_FACTORY_CREATOR(VFileScanner);
public:
static constexpr const char* NAME = "VFileScanner";
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache);
@ -66,11 +71,12 @@ public:
Status close(RuntimeState* state) override;
public:
Status prepare(VExprContext** vconjunct_ctx_ptr,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const std::unordered_map<std::string, int>* colname_to_slot_id);
std::string get_name() override { return VFileScanner::NAME; }
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;

View File

@ -58,16 +58,15 @@ Status VMetaScanNode::_init_profile() {
return Status::OK();
}
Status VMetaScanNode::_init_scanners(std::list<VScanner*>* scanners) {
Status VMetaScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_eos == true) {
return Status::OK();
}
for (auto& scan_range : _scan_ranges) {
VMetaScanner* scanner = new VMetaScanner(_state, this, _tuple_id, scan_range,
_limit_per_scanner, runtime_profile());
_scanner_pool.add(scanner);
std::shared_ptr<VMetaScanner> scanner = VMetaScanner::create_shared(
_state, this, _tuple_id, scan_range, _limit_per_scanner, runtime_profile());
RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get()));
scanners->push_back(static_cast<VScanner*>(scanner));
scanners->push_back(scanner);
}
return Status::OK();
}

View File

@ -51,7 +51,7 @@ public:
private:
Status _init_profile() override;
Status _init_scanners(std::list<VScanner*>* scanners) override;
Status _init_scanners(std::list<VScannerSPtr>* scanners) override;
Status _process_conjuncts() override;
TupleId _tuple_id;

View File

@ -22,6 +22,7 @@
#include <vector>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "vec/data_types/data_type.h"
@ -46,6 +47,8 @@ class VMetaScanNode;
namespace doris::vectorized {
class VMetaScanner : public VScanner {
ENABLE_FACTORY_CREATOR(VMetaScanner);
public:
VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t tuple_id,
const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile);

View File

@ -295,15 +295,15 @@ Status VScanNode::_init_profile() {
return Status::OK();
}
Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners) {
if (_is_pipeline_scan) {
_scanner_ctx.reset(new pipeline::PipScannerContext(
_scanner_ctx = pipeline::PipScannerContext::create_shared(
_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20, _col_distribute_ids));
_state->query_options().mem_limit / 20, _col_distribute_ids);
} else {
_scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc,
scanners, limit(),
_state->query_options().mem_limit / 20));
_scanner_ctx = ScannerContext::create_shared(_state, this, _input_tuple_desc,
_output_tuple_desc, scanners, limit(),
_state->query_options().mem_limit / 20);
}
RETURN_IF_ERROR(_scanner_ctx->init());
return Status::OK();
@ -470,7 +470,6 @@ void VScanNode::release_resource(RuntimeState* state) {
if (_common_vexpr_ctxs_pushdown) {
(*_common_vexpr_ctxs_pushdown)->close(state);
}
_scanner_pool.clear();
ExecNode::release_resource(state);
}
@ -1408,7 +1407,7 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate*
}
Status VScanNode::_prepare_scanners() {
std::list<VScanner*> scanners;
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
_eos = true;

View File

@ -44,6 +44,7 @@
#include "util/lock.h"
#include "util/runtime_profile.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/runtime/shared_scanner_controller.h"
namespace doris {
@ -197,7 +198,7 @@ protected:
// predicate conditions, and scheduling strategy.
// So this method needs to be implemented separately by the subclass of ScanNode.
// Finally, a set of scanners that have been prepared are returned.
virtual Status _init_scanners(std::list<VScanner*>* scanners) { return Status::OK(); }
virtual Status _init_scanners(std::list<VScannerSPtr>* scanners) { return Status::OK(); }
// Different data sources can implement the following 3 methods to determine whether a predicate
// can be pushed down to the data source.
@ -276,8 +277,6 @@ protected:
// Each scan node generates a ScannerContext to manage all Scanners.
// See the comments on ScannerContext for more details.
std::shared_ptr<ScannerContext> _scanner_ctx;
// Save all scanner objects.
ObjectPool _scanner_pool;
// indicate this scan node has no more data to return
bool _eos = false;
@ -445,7 +444,7 @@ private:
const std::string& fn_name, int slot_ref_child = -1);
// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<VScanner*>& scanners);
Status _start_scanners(const std::list<VScannerSPtr>& scanners);
};
} // namespace doris::vectorized

View File

@ -64,6 +64,8 @@ public:
virtual Status close(RuntimeState* state);
virtual std::string get_name() { return ""; }
protected:
// Subclass should implement this to return data.
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
@ -204,4 +206,6 @@ protected:
int64_t _per_scanner_timer = 0;
};
using VScannerSPtr = std::shared_ptr<VScanner>;
} // namespace doris::vectorized