[Improvement](schema scan) Use async scanner for schema scanners (#38… (#38666)
…403)
This commit is contained in:
@ -50,7 +50,10 @@
|
||||
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
|
||||
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
|
||||
#include "olap/hll.h"
|
||||
#include "pipeline/pipeline_x/dependency.h"
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "runtime/fragment_mgr.h"
|
||||
#include "runtime/types.h"
|
||||
#include "util/string_util.h"
|
||||
#include "util/types.h"
|
||||
#include "vec/columns/column.h"
|
||||
@ -64,6 +67,7 @@
|
||||
#include "vec/core/column_with_type_and_name.h"
|
||||
#include "vec/core/types.h"
|
||||
#include "vec/data_types/data_type.h"
|
||||
#include "vec/data_types/data_type_factory.hpp"
|
||||
|
||||
namespace doris {
|
||||
class ObjectPool;
|
||||
@ -84,7 +88,60 @@ Status SchemaScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
|
||||
if (_data_block == nullptr) {
|
||||
return Status::InternalError("No data left!");
|
||||
}
|
||||
DCHECK(_async_thread_running == false);
|
||||
RETURN_IF_ERROR(_scanner_status.status());
|
||||
for (size_t i = 0; i < block->columns(); i++) {
|
||||
std::move(*block->get_by_position(i).column)
|
||||
.mutate()
|
||||
->insert_range_from(*_data_block->get_by_position(i).column, 0,
|
||||
_data_block->rows());
|
||||
}
|
||||
_data_block->clear_column_data();
|
||||
*eos = _eos;
|
||||
if (!*eos) {
|
||||
RETURN_IF_ERROR(get_next_block_async(state));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanner::get_next_block_async(RuntimeState* state) {
|
||||
_dependency->block();
|
||||
auto task_ctx = state->get_task_execution_context();
|
||||
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
|
||||
[this, task_ctx, state]() {
|
||||
DCHECK(_async_thread_running == false);
|
||||
auto task_lock = task_ctx.lock();
|
||||
if (task_lock == nullptr) {
|
||||
_scanner_status.update(Status::InternalError("Task context not exists!"));
|
||||
return;
|
||||
}
|
||||
SCOPED_ATTACH_TASK(state);
|
||||
_dependency->block();
|
||||
_async_thread_running = true;
|
||||
_finish_dependency->block();
|
||||
if (!_opened) {
|
||||
_data_block = vectorized::Block::create_unique();
|
||||
_init_block(_data_block.get());
|
||||
_scanner_status.update(start(state));
|
||||
_opened = true;
|
||||
}
|
||||
bool eos = false;
|
||||
_scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
|
||||
_eos = eos;
|
||||
_async_thread_running = false;
|
||||
_dependency->set_ready();
|
||||
if (eos) {
|
||||
_finish_dependency->set_ready();
|
||||
}
|
||||
}));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("used before initialized.");
|
||||
}
|
||||
@ -173,6 +230,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
|
||||
}
|
||||
}
|
||||
|
||||
void SchemaScanner::_init_block(vectorized::Block* src_block) {
|
||||
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(get_column_desc());
|
||||
for (int i = 0; i < columns_desc.size(); ++i) {
|
||||
TypeDescriptor descriptor(columns_desc[i].type);
|
||||
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
|
||||
src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
|
||||
columns_desc[i].name));
|
||||
}
|
||||
}
|
||||
|
||||
Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
|
||||
const std::vector<void*>& datas) {
|
||||
const ColumnDesc& col_desc = _columns[pos];
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
@ -43,6 +44,10 @@ namespace vectorized {
|
||||
class Block;
|
||||
}
|
||||
|
||||
namespace pipeline {
|
||||
class Dependency;
|
||||
}
|
||||
|
||||
struct SchemaScannerCommonParam {
|
||||
SchemaScannerCommonParam()
|
||||
: db(nullptr),
|
||||
@ -94,15 +99,23 @@ public:
|
||||
|
||||
// init object need information, schema etc.
|
||||
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
|
||||
Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
|
||||
// Start to work
|
||||
virtual Status start(RuntimeState* state);
|
||||
virtual Status get_next_block(vectorized::Block* block, bool* eos);
|
||||
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
|
||||
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
|
||||
// factory function
|
||||
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
|
||||
TSchemaTableType::type type() const { return _schema_table_type; }
|
||||
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
|
||||
std::shared_ptr<pipeline::Dependency> fin_dep) {
|
||||
_dependency = dep;
|
||||
_finish_dependency = fin_dep;
|
||||
}
|
||||
Status get_next_block_async(RuntimeState* state);
|
||||
|
||||
protected:
|
||||
void _init_block(vectorized::Block* src_block);
|
||||
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
|
||||
const std::vector<void*>& datas);
|
||||
|
||||
@ -125,6 +138,15 @@ protected:
|
||||
RuntimeProfile::Counter* _get_table_timer = nullptr;
|
||||
RuntimeProfile::Counter* _get_describe_timer = nullptr;
|
||||
RuntimeProfile::Counter* _fill_block_timer = nullptr;
|
||||
|
||||
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
|
||||
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
|
||||
|
||||
std::unique_ptr<vectorized::Block> _data_block;
|
||||
AtomicStatus _scanner_status;
|
||||
std::atomic<bool> _eos = false;
|
||||
std::atomic<bool> _opened = false;
|
||||
std::atomic<bool> _async_thread_running = false;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -137,7 +137,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
~SchemaActiveQueriesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
|
||||
|
||||
@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* block,
|
||||
bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
~SchemaBackendActiveTasksScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
|
||||
|
||||
@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner()
|
||||
|
||||
SchemaCharsetsScanner::~SchemaCharsetsScanner() {}
|
||||
|
||||
Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("call this before initial.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
SchemaCharsetsScanner();
|
||||
~SchemaCharsetsScanner() override;
|
||||
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
struct CharsetStruct {
|
||||
|
||||
@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner()
|
||||
|
||||
SchemaCollationsScanner::~SchemaCollationsScanner() {}
|
||||
|
||||
Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("call this before initial.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
SchemaCollationsScanner();
|
||||
~SchemaCollationsScanner() override;
|
||||
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
struct CollationStruct {
|
||||
|
||||
@ -347,7 +347,7 @@ Status SchemaColumnsScanner::_get_new_table() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaColumnsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("use this class before inited.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
SchemaColumnsScanner();
|
||||
~SchemaColumnsScanner() override;
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -40,7 +40,7 @@ Status SchemaDummyScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaDummyScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaDummyScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ public:
|
||||
SchemaDummyScanner();
|
||||
~SchemaDummyScanner() override;
|
||||
Status start(RuntimeState* state = nullptr) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -113,7 +113,7 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaFilesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaFilesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaFilesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
int _db_index;
|
||||
int _table_index;
|
||||
|
||||
@ -225,7 +225,7 @@ Status SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaMetadataNameIdsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaMetadataNameIdsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ public:
|
||||
~SchemaMetadataNameIdsScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -101,7 +101,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaPartitionsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaPartitionsScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
int _db_index;
|
||||
int _table_index;
|
||||
|
||||
@ -62,7 +62,7 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaProcessListScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("call this before initial.");
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
~SchemaProcessListScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_processlist_columns;
|
||||
|
||||
|
||||
@ -88,7 +88,7 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaProfilingScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaProfilingScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaProfilingScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
};
|
||||
|
||||
@ -141,7 +141,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
~SchemaRoutinesScanner() override = default;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
|
||||
|
||||
@ -92,7 +92,7 @@ Status SchemaRowsetsScanner::_get_all_rowsets() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaRowsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
~SchemaRowsetsScanner() override = default;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_all_rowsets();
|
||||
|
||||
@ -82,7 +82,7 @@ Status SchemaSchemaPrivilegesScanner::_get_new_table() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaSchemaPrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaSchemaPrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaSchemaPrivilegesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -81,7 +81,7 @@ Status SchemaSchemataScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaSchemataScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaSchemataScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before Initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaSchemataScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _fill_block_impl(vectorized::Block* block);
|
||||
|
||||
@ -84,7 +84,7 @@ Status SchemaTablePrivilegesScanner::_get_new_table() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaTablePrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaTablePrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaTablePrivilegesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -342,7 +342,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaTablesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaTablesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ public:
|
||||
~SchemaTablesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -81,7 +81,7 @@ Status SchemaUserPrivilegesScanner::_get_new_table() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaUserPrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaUserPrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaUserPrivilegesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -76,7 +76,7 @@ Status SchemaUserScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaUserScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaUserScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("call this before initial.");
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
~SchemaUserScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_user_columns;
|
||||
|
||||
|
||||
@ -70,7 +70,7 @@ Status SchemaVariablesScanner::start(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaVariablesScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaVariablesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("call this before initial.");
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
~SchemaVariablesScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
struct VariableStruct {
|
||||
|
||||
@ -113,7 +113,7 @@ Status SchemaViewsScanner::_get_new_table() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaViewsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaViewsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public:
|
||||
~SchemaViewsScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
private:
|
||||
Status _get_new_table();
|
||||
|
||||
@ -114,7 +114,7 @@ Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
~SchemaWorkloadGroupsScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
|
||||
|
||||
@ -106,7 +106,8 @@ Status SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* block, bool* eos) {
|
||||
Status SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized::Block* block,
|
||||
bool* eos) {
|
||||
if (!_is_init) {
|
||||
return Status::InternalError("Used before initialized.");
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ public:
|
||||
~SchemaWorkloadSchedulePolicyScanner() override;
|
||||
|
||||
Status start(RuntimeState* state) override;
|
||||
Status get_next_block(vectorized::Block* block, bool* eos) override;
|
||||
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
|
||||
|
||||
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
|
||||
|
||||
|
||||
@ -61,6 +61,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
|
||||
// new one scanner
|
||||
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
|
||||
|
||||
_schema_scanner->set_dependency(_data_dependency, _finish_dependency);
|
||||
if (nullptr == _schema_scanner) {
|
||||
return Status::InternalError("schema scanner get nullptr pointer.");
|
||||
}
|
||||
@ -72,7 +73,7 @@ Status SchemaScanLocalState::open(RuntimeState* state) {
|
||||
SCOPED_TIMER(exec_time_counter());
|
||||
SCOPED_TIMER(_open_timer);
|
||||
RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
|
||||
return _schema_scanner->start(state);
|
||||
return _schema_scanner->get_next_block_async(state);
|
||||
}
|
||||
|
||||
SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
|
||||
@ -239,8 +240,12 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
|
||||
while (true) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
if (local_state._data_dependency->is_blocked_by() != nullptr) {
|
||||
break;
|
||||
}
|
||||
// get all slots from schema table.
|
||||
RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, &schema_eos));
|
||||
RETURN_IF_ERROR(
|
||||
local_state._schema_scanner->get_next_block(state, &src_block, &schema_eos));
|
||||
|
||||
if (schema_eos) {
|
||||
*eos = true;
|
||||
|
||||
@ -55,18 +55,30 @@ public:
|
||||
ENABLE_FACTORY_CREATOR(SchemaScanLocalState);
|
||||
|
||||
SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
|
||||
: PipelineXLocalState<>(state, parent) {}
|
||||
: PipelineXLocalState<>(state, parent) {
|
||||
_finish_dependency =
|
||||
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
|
||||
parent->get_name() + "_FINISH_DEPENDENCY", true);
|
||||
_data_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
|
||||
parent->get_name() + "_DEPENDENCY", true);
|
||||
}
|
||||
~SchemaScanLocalState() override = default;
|
||||
|
||||
Status init(RuntimeState* state, LocalStateInfo& info) override;
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
|
||||
Dependency* finishdependency() override { return _finish_dependency.get(); }
|
||||
std::vector<Dependency*> dependencies() const override { return {_data_dependency.get()}; }
|
||||
|
||||
private:
|
||||
friend class SchemaScanOperatorX;
|
||||
|
||||
SchemaScannerParam _scanner_param;
|
||||
std::unique_ptr<SchemaScanner> _schema_scanner;
|
||||
|
||||
std::shared_ptr<Dependency> _finish_dependency;
|
||||
std::shared_ptr<Dependency> _data_dependency;
|
||||
};
|
||||
|
||||
class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
|
||||
|
||||
@ -89,20 +89,11 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
|
||||
public:
|
||||
ENABLE_FACTORY_CREATOR(Dependency);
|
||||
Dependency(int id, int node_id, std::string name)
|
||||
: _id(id),
|
||||
_node_id(node_id),
|
||||
_name(std::move(name)),
|
||||
_is_write_dependency(false),
|
||||
_ready(false) {}
|
||||
: _id(id), _node_id(node_id), _name(std::move(name)), _ready(false) {}
|
||||
Dependency(int id, int node_id, std::string name, bool ready)
|
||||
: _id(id),
|
||||
_node_id(node_id),
|
||||
_name(std::move(name)),
|
||||
_is_write_dependency(true),
|
||||
_ready(ready) {}
|
||||
: _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {}
|
||||
virtual ~Dependency() = default;
|
||||
|
||||
bool is_write_dependency() const { return _is_write_dependency; }
|
||||
[[nodiscard]] int id() const { return _id; }
|
||||
[[nodiscard]] virtual std::string name() const { return _name; }
|
||||
BasicSharedState* shared_state() { return _shared_state; }
|
||||
@ -119,12 +110,10 @@ public:
|
||||
// Notify downstream pipeline tasks this dependency is ready.
|
||||
void set_ready();
|
||||
void set_ready_to_read() {
|
||||
DCHECK(_is_write_dependency) << debug_string();
|
||||
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
|
||||
_shared_state->source_deps.front()->set_ready();
|
||||
}
|
||||
void set_block_to_read() {
|
||||
DCHECK(_is_write_dependency) << debug_string();
|
||||
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
|
||||
_shared_state->source_deps.front()->block();
|
||||
}
|
||||
@ -167,7 +156,6 @@ protected:
|
||||
const int _id;
|
||||
const int _node_id;
|
||||
const std::string _name;
|
||||
const bool _is_write_dependency;
|
||||
std::atomic<bool> _ready;
|
||||
|
||||
BasicSharedState* _shared_state = nullptr;
|
||||
|
||||
@ -149,8 +149,6 @@ Status PipelineXTask::_extract_dependencies() {
|
||||
{
|
||||
auto* local_state = _state->get_sink_local_state();
|
||||
write_dependencies = local_state->dependencies();
|
||||
DCHECK(std::all_of(write_dependencies.begin(), write_dependencies.end(),
|
||||
[](auto* dep) { return dep->is_write_dependency(); }));
|
||||
auto* fin_dep = local_state->finishdependency();
|
||||
if (fin_dep) {
|
||||
finish_dependencies.push_back(fin_dep);
|
||||
|
||||
@ -249,7 +249,7 @@ Status VSchemaScanNode::get_next(RuntimeState* state, vectorized::Block* block,
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
// get all slots from schema table.
|
||||
RETURN_IF_ERROR(_schema_scanner->get_next_block(&src_block, &schema_eos));
|
||||
RETURN_IF_ERROR(_schema_scanner->get_next_block_internal(&src_block, &schema_eos));
|
||||
|
||||
if (schema_eos) {
|
||||
*eos = true;
|
||||
|
||||
Reference in New Issue
Block a user