[pipelineX](feature) Support schema scan operator (#24850)
This commit is contained in:
@ -42,8 +42,17 @@ namespace vectorized {
|
||||
class Block;
|
||||
}
|
||||
|
||||
// scanner parameter from frontend
|
||||
struct SchemaScannerParam {
|
||||
struct SchemaScannerCommonParam {
|
||||
SchemaScannerCommonParam()
|
||||
: db(nullptr),
|
||||
table(nullptr),
|
||||
wild(nullptr),
|
||||
user(nullptr),
|
||||
user_ip(nullptr),
|
||||
current_user_ident(nullptr),
|
||||
ip(nullptr),
|
||||
port(0),
|
||||
catalog(nullptr) {}
|
||||
const std::string* db;
|
||||
const std::string* table;
|
||||
const std::string* wild;
|
||||
@ -54,18 +63,14 @@ struct SchemaScannerParam {
|
||||
int32_t port; // frontend thrift port
|
||||
int64_t thread_id;
|
||||
const std::string* catalog;
|
||||
};
|
||||
|
||||
// scanner parameter from frontend
|
||||
struct SchemaScannerParam {
|
||||
std::shared_ptr<SchemaScannerCommonParam> common_param;
|
||||
std::unique_ptr<RuntimeProfile> profile;
|
||||
|
||||
SchemaScannerParam()
|
||||
: db(nullptr),
|
||||
table(nullptr),
|
||||
wild(nullptr),
|
||||
user(nullptr),
|
||||
user_ip(nullptr),
|
||||
current_user_ident(nullptr),
|
||||
ip(nullptr),
|
||||
port(0),
|
||||
catalog(nullptr) {}
|
||||
SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
|
||||
};
|
||||
|
||||
// virtual scanner for all schema table
|
||||
|
||||
@ -78,26 +78,26 @@ Status SchemaColumnsScanner::start(RuntimeState* state) {
|
||||
}
|
||||
// get all database
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*_param->current_user_ident);
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*_param->common_param->current_user_ident);
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
@ -256,19 +256,20 @@ Status SchemaColumnsScanner::_get_new_desc() {
|
||||
desc_params.tables_name.push_back(_table_result.tables[_table_index++]);
|
||||
}
|
||||
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
desc_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
desc_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
desc_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
desc_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
desc_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
desc_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->ip), _param->port, desc_params,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::describe_tables(*(_param->common_param->ip),
|
||||
_param->common_param->port, desc_params,
|
||||
&_desc_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
@ -285,22 +286,23 @@ Status SchemaColumnsScanner::_get_new_table() {
|
||||
table_params.__set_catalog(_db_result.catalogs[_db_index]);
|
||||
}
|
||||
_db_index++;
|
||||
if (nullptr != _param->table) {
|
||||
table_params.__set_pattern(*(_param->table));
|
||||
if (nullptr != _param->common_param->table) {
|
||||
table_params.__set_pattern(*(_param->common_param->table));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->ip), _param->port, table_params,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->common_param->ip),
|
||||
_param->common_param->port, table_params,
|
||||
&_table_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -87,26 +87,26 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
|
||||
@ -60,26 +60,26 @@ Status SchemaMetadataNameIdsScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
db_params.__set_get_null_catalog(true);
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
@ -102,22 +102,23 @@ Status SchemaMetadataNameIdsScanner::_get_new_table() {
|
||||
table_params.__set_catalog(_db_result.catalogs[_db_index]);
|
||||
}
|
||||
_db_index++;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->ip), _param->port,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_metadata_name_ids(*(_param->common_param->ip),
|
||||
_param->common_param->port,
|
||||
table_params, &_table_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -75,26 +75,26 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
|
||||
@ -65,24 +65,24 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr == _param->ip || 0 == _param->port) {
|
||||
if (nullptr == _param->common_param->ip || 0 == _param->common_param->port) {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -57,22 +57,23 @@ Status SchemaSchemaPrivilegesScanner::start(RuntimeState* state) {
|
||||
|
||||
Status SchemaSchemaPrivilegesScanner::_get_new_table() {
|
||||
TGetTablesParams table_params;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->ip), _param->port,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_schema_privilege_status(*(_param->common_param->ip),
|
||||
_param->common_param->port,
|
||||
table_params, &_priv_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -53,26 +53,26 @@ Status SchemaSchemataScanner::start(RuntimeState* state) {
|
||||
return Status::InternalError("used before initial.");
|
||||
}
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->wild) {
|
||||
db_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
db_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
|
||||
@ -59,22 +59,23 @@ Status SchemaTablePrivilegesScanner::start(RuntimeState* state) {
|
||||
Status SchemaTablePrivilegesScanner::_get_new_table() {
|
||||
SCOPED_TIMER(_get_table_timer);
|
||||
TGetTablesParams table_params;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->ip), _param->port,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_privilege_status(*(_param->common_param->ip),
|
||||
_param->common_param->port,
|
||||
table_params, &_priv_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -75,26 +75,26 @@ Status SchemaTablesScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
@ -109,22 +109,23 @@ Status SchemaTablesScanner::_get_new_table() {
|
||||
table_params.__set_catalog(_db_result.catalogs[_db_index]);
|
||||
}
|
||||
_db_index++;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip),
|
||||
_param->common_param->port, table_params,
|
||||
&_table_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -57,22 +57,23 @@ Status SchemaUserPrivilegesScanner::start(RuntimeState* state) {
|
||||
Status SchemaUserPrivilegesScanner::_get_new_table() {
|
||||
SCOPED_TIMER(_get_table_timer);
|
||||
TGetTablesParams table_params;
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->ip), _param->port,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_user_privilege_status(*(_param->common_param->ip),
|
||||
_param->common_param->port,
|
||||
table_params, &_priv_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
@ -50,8 +50,8 @@ SchemaVariablesScanner::~SchemaVariablesScanner() {}
|
||||
Status SchemaVariablesScanner::start(RuntimeState* state) {
|
||||
TShowVariableRequest var_params;
|
||||
// Use db to save type
|
||||
if (_param->db != nullptr) {
|
||||
if (strcmp(_param->db->c_str(), "GLOBAL") == 0) {
|
||||
if (_param->common_param->db != nullptr) {
|
||||
if (strcmp(_param->common_param->db->c_str(), "GLOBAL") == 0) {
|
||||
var_params.__set_varType(TVarType::GLOBAL);
|
||||
} else {
|
||||
var_params.__set_varType(TVarType::SESSION);
|
||||
@ -59,11 +59,11 @@ Status SchemaVariablesScanner::start(RuntimeState* state) {
|
||||
} else {
|
||||
var_params.__set_varType(_type);
|
||||
}
|
||||
var_params.__set_threadId(_param->thread_id);
|
||||
var_params.__set_threadId(_param->common_param->thread_id);
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::show_variables(*(_param->ip), _param->port, var_params,
|
||||
&_var_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::show_variables(
|
||||
*(_param->common_param->ip), _param->common_param->port, var_params, &_var_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
|
||||
@ -58,26 +58,26 @@ Status SchemaViewsScanner::start(RuntimeState* state) {
|
||||
}
|
||||
SCOPED_TIMER(_get_db_timer);
|
||||
TGetDbsParams db_params;
|
||||
if (nullptr != _param->db) {
|
||||
db_params.__set_pattern(*(_param->db));
|
||||
if (nullptr != _param->common_param->db) {
|
||||
db_params.__set_pattern(*(_param->common_param->db));
|
||||
}
|
||||
if (nullptr != _param->catalog) {
|
||||
db_params.__set_catalog(*(_param->catalog));
|
||||
if (nullptr != _param->common_param->catalog) {
|
||||
db_params.__set_catalog(*(_param->common_param->catalog));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
db_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
db_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
db_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
db_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(
|
||||
SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result));
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::get_db_names(
|
||||
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
}
|
||||
@ -88,23 +88,24 @@ Status SchemaViewsScanner::_get_new_table() {
|
||||
SCOPED_TIMER(_get_table_timer);
|
||||
TGetTablesParams table_params;
|
||||
table_params.__set_db(_db_result.dbs[_db_index++]);
|
||||
if (nullptr != _param->wild) {
|
||||
table_params.__set_pattern(*(_param->wild));
|
||||
if (nullptr != _param->common_param->wild) {
|
||||
table_params.__set_pattern(*(_param->common_param->wild));
|
||||
}
|
||||
if (nullptr != _param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->current_user_ident));
|
||||
if (nullptr != _param->common_param->current_user_ident) {
|
||||
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
|
||||
} else {
|
||||
if (nullptr != _param->user) {
|
||||
table_params.__set_user(*(_param->user));
|
||||
if (nullptr != _param->common_param->user) {
|
||||
table_params.__set_user(*(_param->common_param->user));
|
||||
}
|
||||
if (nullptr != _param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->user_ip));
|
||||
if (nullptr != _param->common_param->user_ip) {
|
||||
table_params.__set_user_ip(*(_param->common_param->user_ip));
|
||||
}
|
||||
}
|
||||
table_params.__set_type("VIEW");
|
||||
|
||||
if (nullptr != _param->ip && 0 != _param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params,
|
||||
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
|
||||
RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->common_param->ip),
|
||||
_param->common_param->port, table_params,
|
||||
&_table_result));
|
||||
} else {
|
||||
return Status::InternalError("IP or port doesn't exists");
|
||||
|
||||
57
be/src/pipeline/exec/meta_scan_operator.cpp
Normal file
57
be/src/pipeline/exec/meta_scan_operator.cpp
Normal file
@ -0,0 +1,57 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "pipeline/exec/meta_scan_operator.h"
|
||||
|
||||
#include "vec/exec/scan/vmeta_scanner.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
|
||||
if (Base::_eos_dependency->read_blocked_by() == nullptr) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
auto& p = _parent->cast<MetaScanOperatorX>();
|
||||
|
||||
for (auto& scan_range : _scan_ranges) {
|
||||
std::shared_ptr<vectorized::VMetaScanner> scanner = vectorized::VMetaScanner::create_shared(
|
||||
state(), this, p._tuple_id, scan_range, p._limit_per_scanner, profile(),
|
||||
p._user_identity);
|
||||
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
|
||||
scanners->push_back(scanner);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
|
||||
_scan_ranges = scan_ranges;
|
||||
}
|
||||
|
||||
MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
|
||||
const DescriptorTbl& descs)
|
||||
: ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),
|
||||
_tuple_id(tnode.meta_scan_node.tuple_id),
|
||||
_scan_params(tnode.meta_scan_node) {
|
||||
_output_tuple_id = _tuple_id;
|
||||
if (_scan_params.__isset.current_user_ident) {
|
||||
_user_identity = _scan_params.current_user_ident;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
69
be/src/pipeline/exec/meta_scan_operator.h
Normal file
69
be/src/pipeline/exec/meta_scan_operator.h
Normal file
@ -0,0 +1,69 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "operator.h"
|
||||
#include "pipeline/exec/scan_operator.h"
|
||||
#include "pipeline/pipeline_x/operator.h"
|
||||
#include "vec/exec/scan/vscan_node.h"
|
||||
|
||||
namespace doris {
|
||||
class ExecNode;
|
||||
|
||||
namespace vectorized {
|
||||
class NewOlapScanner;
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
namespace doris::pipeline {
|
||||
|
||||
class MetaScanOperatorX;
|
||||
class MetaScanLocalState final : public ScanLocalState<MetaScanLocalState> {
|
||||
public:
|
||||
using Parent = MetaScanOperatorX;
|
||||
using Base = ScanLocalState<MetaScanLocalState>;
|
||||
ENABLE_FACTORY_CREATOR(MetaScanLocalState);
|
||||
MetaScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
|
||||
|
||||
private:
|
||||
friend class vectorized::NewOlapScanner;
|
||||
|
||||
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
|
||||
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
|
||||
|
||||
std::vector<TScanRangeParams> _scan_ranges;
|
||||
};
|
||||
|
||||
class MetaScanOperatorX final : public ScanOperatorX<MetaScanLocalState> {
|
||||
public:
|
||||
MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
|
||||
private:
|
||||
friend class MetaScanLocalState;
|
||||
|
||||
TupleId _tuple_id;
|
||||
TUserIdentity _user_identity;
|
||||
TMetaScanNode _scan_params;
|
||||
};
|
||||
|
||||
} // namespace doris::pipeline
|
||||
@ -22,6 +22,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "pipeline/exec/es_scan_operator.h"
|
||||
#include "pipeline/exec/meta_scan_operator.h"
|
||||
#include "pipeline/exec/olap_scan_operator.h"
|
||||
#include "pipeline/exec/operator.h"
|
||||
#include "vec/exec/runtime_filter_consumer.h"
|
||||
@ -1435,5 +1436,7 @@ template class ScanOperatorX<OlapScanLocalState>;
|
||||
template class ScanLocalState<OlapScanLocalState>;
|
||||
template class ScanOperatorX<EsScanLocalState>;
|
||||
template class ScanLocalState<EsScanLocalState>;
|
||||
template class ScanLocalState<MetaScanLocalState>;
|
||||
template class ScanOperatorX<MetaScanLocalState>;
|
||||
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -17,10 +17,13 @@
|
||||
|
||||
#include "schema_scan_operator.h"
|
||||
|
||||
#include <gen_cpp/FrontendService_types.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "pipeline/exec/operator.h"
|
||||
#include "util/runtime_profile.h"
|
||||
#include "vec/data_types/data_type_factory.hpp"
|
||||
#include "vec/exec/vschema_scan_node.h"
|
||||
|
||||
namespace doris {
|
||||
@ -41,4 +44,230 @@ Status SchemaScanOperator::close(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
|
||||
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
|
||||
|
||||
SCOPED_TIMER(profile()->total_time_counter());
|
||||
SCOPED_TIMER(_open_timer);
|
||||
auto& p = _parent->cast<SchemaScanOperatorX>();
|
||||
_scanner_param.common_param = p._common_scanner_param;
|
||||
// init schema scanner profile
|
||||
_scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
|
||||
profile()->add_child(_scanner_param.profile.get(), true, nullptr);
|
||||
|
||||
// get src tuple desc
|
||||
const SchemaTableDescriptor* schema_table =
|
||||
static_cast<const SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc());
|
||||
// new one scanner
|
||||
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
|
||||
|
||||
if (nullptr == _schema_scanner) {
|
||||
return Status::InternalError("schema scanner get nullptr pointer.");
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool()));
|
||||
return _schema_scanner->start(state);
|
||||
}
|
||||
|
||||
SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
|
||||
const DescriptorTbl& descs)
|
||||
: Base(pool, tnode, descs),
|
||||
_table_name(tnode.schema_scan_node.table_name),
|
||||
_common_scanner_param(new SchemaScannerCommonParam()),
|
||||
_tuple_id(tnode.schema_scan_node.tuple_id),
|
||||
_dest_tuple_desc(nullptr),
|
||||
_tuple_idx(0),
|
||||
_slot_num(0) {}
|
||||
|
||||
Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(Base::init(tnode, state));
|
||||
|
||||
if (tnode.schema_scan_node.__isset.db) {
|
||||
_common_scanner_param->db =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.db));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.table) {
|
||||
_common_scanner_param->table =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.table));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.wild) {
|
||||
_common_scanner_param->wild =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.wild));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.current_user_ident) {
|
||||
_common_scanner_param->current_user_ident = state->obj_pool()->add(
|
||||
new TUserIdentity(tnode.schema_scan_node.current_user_ident));
|
||||
} else {
|
||||
if (tnode.schema_scan_node.__isset.user) {
|
||||
_common_scanner_param->user =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.user));
|
||||
}
|
||||
if (tnode.schema_scan_node.__isset.user_ip) {
|
||||
_common_scanner_param->user_ip =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.ip) {
|
||||
_common_scanner_param->ip =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.ip));
|
||||
}
|
||||
if (tnode.schema_scan_node.__isset.port) {
|
||||
_common_scanner_param->port = tnode.schema_scan_node.port;
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.thread_id) {
|
||||
_common_scanner_param->thread_id = tnode.schema_scan_node.thread_id;
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.catalog) {
|
||||
_common_scanner_param->catalog =
|
||||
state->obj_pool()->add(new std::string(tnode.schema_scan_node.catalog));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanOperatorX::open(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(Base::open(state));
|
||||
|
||||
if (_common_scanner_param->user) {
|
||||
TSetSessionParams param;
|
||||
param.__set_user(*_common_scanner_param->user);
|
||||
//TStatus t_status;
|
||||
//RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
|
||||
//RETURN_IF_ERROR(Status(t_status));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanOperatorX::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(Base::prepare(state));
|
||||
|
||||
// get dest tuple desc
|
||||
_dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
|
||||
|
||||
if (nullptr == _dest_tuple_desc) {
|
||||
return Status::InternalError("Failed to get tuple descriptor.");
|
||||
}
|
||||
|
||||
_slot_num = _dest_tuple_desc->slots().size();
|
||||
// get src tuple desc
|
||||
const SchemaTableDescriptor* schema_table =
|
||||
static_cast<const SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());
|
||||
|
||||
if (nullptr == schema_table) {
|
||||
return Status::InternalError("Failed to get schema table descriptor.");
|
||||
}
|
||||
|
||||
// new one scanner
|
||||
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
|
||||
|
||||
if (nullptr == _schema_scanner) {
|
||||
return Status::InternalError("schema scanner get nullptr pointer.");
|
||||
}
|
||||
|
||||
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(_schema_scanner->get_column_desc());
|
||||
|
||||
// if src columns size is zero, it's the dummy slots.
|
||||
if (0 == columns_desc.size()) {
|
||||
_slot_num = 0;
|
||||
}
|
||||
|
||||
// check if type is ok.
|
||||
for (int i = 0; i < _slot_num; ++i) {
|
||||
int j = 0;
|
||||
for (; j < columns_desc.size(); ++j) {
|
||||
if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), columns_desc[j].name)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (j >= columns_desc.size()) {
|
||||
LOG(WARNING) << "no match column for this column("
|
||||
<< _dest_tuple_desc->slots()[i]->col_name() << ")";
|
||||
return Status::InternalError("no match column for this column.");
|
||||
}
|
||||
|
||||
if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type) {
|
||||
LOG(WARNING) << "schema not match. input is " << columns_desc[j].name << "("
|
||||
<< columns_desc[j].type << ") and output is "
|
||||
<< _dest_tuple_desc->slots()[i]->col_name() << "("
|
||||
<< _dest_tuple_desc->slots()[i]->type() << ")";
|
||||
return Status::InternalError("schema not match.");
|
||||
}
|
||||
}
|
||||
|
||||
_tuple_idx = 0;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
|
||||
SourceState& source_state) {
|
||||
CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
|
||||
SCOPED_TIMER(local_state.profile()->total_time_counter());
|
||||
RETURN_IF_CANCELLED(state);
|
||||
bool schema_eos = false;
|
||||
|
||||
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(
|
||||
local_state._schema_scanner->get_column_desc());
|
||||
|
||||
do {
|
||||
block->clear();
|
||||
for (int i = 0; i < _slot_num; ++i) {
|
||||
auto dest_slot_desc = _dest_tuple_desc->slots()[i];
|
||||
block->insert(vectorized::ColumnWithTypeAndName(
|
||||
dest_slot_desc->get_empty_mutable_column(), dest_slot_desc->get_data_type_ptr(),
|
||||
dest_slot_desc->col_name()));
|
||||
}
|
||||
|
||||
// src block columns desc is filled by schema_scanner->get_column_desc.
|
||||
vectorized::Block src_block;
|
||||
for (int i = 0; i < columns_desc.size(); ++i) {
|
||||
TypeDescriptor descriptor(columns_desc[i].type);
|
||||
auto data_type =
|
||||
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
|
||||
src_block.insert(vectorized::ColumnWithTypeAndName(data_type->create_column(),
|
||||
data_type, columns_desc[i].name));
|
||||
}
|
||||
while (true) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
|
||||
// get all slots from schema table.
|
||||
RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, &schema_eos));
|
||||
|
||||
if (schema_eos) {
|
||||
source_state = SourceState::FINISHED;
|
||||
break;
|
||||
}
|
||||
|
||||
if (src_block.rows() >= state->batch_size()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (src_block.rows()) {
|
||||
// block->check_number_of_rows();
|
||||
for (int i = 0; i < _slot_num; ++i) {
|
||||
auto dest_slot_desc = _dest_tuple_desc->slots()[i];
|
||||
vectorized::MutableColumnPtr column_ptr =
|
||||
std::move(*block->get_by_position(i).column).mutate();
|
||||
column_ptr->insert_range_from(
|
||||
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
|
||||
src_block.rows());
|
||||
}
|
||||
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
|
||||
local_state._conjuncts, block, _dest_tuple_desc->slots().size()));
|
||||
src_block.clear();
|
||||
}
|
||||
} while (block->rows() == 0 && source_state != SourceState::FINISHED);
|
||||
|
||||
local_state.reached_limit(block, source_state);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "common/status.h"
|
||||
#include "operator.h"
|
||||
#include "pipeline/pipeline_x/operator.h"
|
||||
#include "vec/exec/vschema_scan_node.h"
|
||||
|
||||
namespace doris {
|
||||
@ -48,4 +49,55 @@ public:
|
||||
Status close(RuntimeState* state) override;
|
||||
};
|
||||
|
||||
class SchemaScanOperatorX;
|
||||
class SchemaScanLocalState final : public PipelineXLocalState<> {
|
||||
public:
|
||||
ENABLE_FACTORY_CREATOR(SchemaScanLocalState);
|
||||
|
||||
SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
|
||||
: PipelineXLocalState<>(state, parent) {}
|
||||
~SchemaScanLocalState() override = default;
|
||||
|
||||
Status init(RuntimeState* state, LocalStateInfo& info) override;
|
||||
|
||||
private:
|
||||
friend class SchemaScanOperatorX;
|
||||
|
||||
SchemaScannerParam _scanner_param;
|
||||
std::unique_ptr<SchemaScanner> _schema_scanner = nullptr;
|
||||
};
|
||||
|
||||
class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
|
||||
public:
|
||||
using Base = OperatorX<SchemaScanLocalState>;
|
||||
SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~SchemaScanOperatorX() override = default;
|
||||
|
||||
Status init(const TPlanNode& tnode, RuntimeState* state) override;
|
||||
Status prepare(RuntimeState* state) override;
|
||||
Status open(RuntimeState* state) override;
|
||||
Status get_block(RuntimeState* state, vectorized::Block* block,
|
||||
SourceState& source_state) override;
|
||||
|
||||
[[nodiscard]] bool is_source() const override { return true; }
|
||||
|
||||
private:
|
||||
friend class SchemaScanLocalState;
|
||||
|
||||
const std::string _table_name;
|
||||
|
||||
std::shared_ptr<SchemaScannerCommonParam> _common_scanner_param;
|
||||
// Tuple id resolved in prepare() to set _tuple_desc;
|
||||
TupleId _tuple_id;
|
||||
|
||||
// Descriptor of dest tuples
|
||||
const TupleDescriptor* _dest_tuple_desc;
|
||||
// Tuple index in tuple row.
|
||||
int _tuple_idx;
|
||||
// slot num need to fill in and return
|
||||
int _slot_num;
|
||||
|
||||
std::unique_ptr<SchemaScanner> _schema_scanner = nullptr;
|
||||
};
|
||||
|
||||
} // namespace doris::pipeline
|
||||
@ -33,6 +33,7 @@
|
||||
#include "pipeline/exec/exchange_source_operator.h"
|
||||
#include "pipeline/exec/hashjoin_build_sink.h"
|
||||
#include "pipeline/exec/hashjoin_probe_operator.h"
|
||||
#include "pipeline/exec/meta_scan_operator.h"
|
||||
#include "pipeline/exec/multi_cast_data_stream_source.h"
|
||||
#include "pipeline/exec/nested_loop_join_build_operator.h"
|
||||
#include "pipeline/exec/nested_loop_join_probe_operator.h"
|
||||
@ -42,6 +43,7 @@
|
||||
#include "pipeline/exec/repeat_operator.h"
|
||||
#include "pipeline/exec/result_file_sink_operator.h"
|
||||
#include "pipeline/exec/result_sink_operator.h"
|
||||
#include "pipeline/exec/schema_scan_operator.h"
|
||||
#include "pipeline/exec/select_operator.h"
|
||||
#include "pipeline/exec/sort_sink_operator.h"
|
||||
#include "pipeline/exec/sort_source_operator.h"
|
||||
@ -408,6 +410,8 @@ DECLARE_OPERATOR_X(UnionSourceLocalState)
|
||||
DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
|
||||
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
|
||||
DECLARE_OPERATOR_X(DataGenLocalState)
|
||||
DECLARE_OPERATOR_X(SchemaScanLocalState)
|
||||
DECLARE_OPERATOR_X(MetaScanLocalState)
|
||||
|
||||
#undef DECLARE_OPERATOR_X
|
||||
|
||||
|
||||
@ -56,6 +56,7 @@
|
||||
#include "pipeline/exec/exchange_source_operator.h"
|
||||
#include "pipeline/exec/hashjoin_build_sink.h"
|
||||
#include "pipeline/exec/hashjoin_probe_operator.h"
|
||||
#include "pipeline/exec/meta_scan_operator.h"
|
||||
#include "pipeline/exec/multi_cast_data_stream_source.h"
|
||||
#include "pipeline/exec/nested_loop_join_build_operator.h"
|
||||
#include "pipeline/exec/nested_loop_join_probe_operator.h"
|
||||
@ -66,6 +67,7 @@
|
||||
#include "pipeline/exec/result_file_sink_operator.h"
|
||||
#include "pipeline/exec/result_sink_operator.h"
|
||||
#include "pipeline/exec/scan_operator.h"
|
||||
#include "pipeline/exec/schema_scan_operator.h"
|
||||
#include "pipeline/exec/select_operator.h"
|
||||
#include "pipeline/exec/sort_sink_operator.h"
|
||||
#include "pipeline/exec/sort_source_operator.h"
|
||||
@ -758,6 +760,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
break;
|
||||
}
|
||||
case TPlanNodeType::SCHEMA_SCAN_NODE: {
|
||||
op.reset(new SchemaScanOperatorX(pool, tnode, descs));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
break;
|
||||
}
|
||||
case TPlanNodeType::META_SCAN_NODE: {
|
||||
op.reset(new MetaScanOperatorX(pool, tnode, descs));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
break;
|
||||
}
|
||||
case TPlanNodeType::SELECT_NODE: {
|
||||
op.reset(new SelectOperatorX(pool, tnode, descs));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
|
||||
@ -64,6 +64,15 @@ VMetaScanner::VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t t
|
||||
_user_identity(user_identity),
|
||||
_scan_range(scan_range.scan_range) {}
|
||||
|
||||
VMetaScanner::VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
|
||||
int64_t tuple_id, const TScanRangeParams& scan_range, int64_t limit,
|
||||
RuntimeProfile* profile, TUserIdentity user_identity)
|
||||
: VScanner(state, local_state, limit, profile),
|
||||
_meta_eos(false),
|
||||
_tuple_id(tuple_id),
|
||||
_user_identity(user_identity),
|
||||
_scan_range(scan_range.scan_range) {}
|
||||
|
||||
Status VMetaScanner::open(RuntimeState* state) {
|
||||
VLOG_CRITICAL << "VMetaScanner::open";
|
||||
RETURN_IF_ERROR(VScanner::open(state));
|
||||
|
||||
@ -55,6 +55,10 @@ public:
|
||||
const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile,
|
||||
TUserIdentity user_identity);
|
||||
|
||||
VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, int64_t tuple_id,
|
||||
const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile,
|
||||
TUserIdentity user_identity);
|
||||
|
||||
Status open(RuntimeState* state) override;
|
||||
Status close(RuntimeState* state) override;
|
||||
Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
|
||||
|
||||
@ -64,42 +64,47 @@ VSchemaScanNode::~VSchemaScanNode() {}
|
||||
Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
if (tnode.schema_scan_node.__isset.db) {
|
||||
_scanner_param.db = _pool->add(new std::string(tnode.schema_scan_node.db));
|
||||
_scanner_param.common_param->db = _pool->add(new std::string(tnode.schema_scan_node.db));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.table) {
|
||||
_scanner_param.table = _pool->add(new std::string(tnode.schema_scan_node.table));
|
||||
_scanner_param.common_param->table =
|
||||
_pool->add(new std::string(tnode.schema_scan_node.table));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.wild) {
|
||||
_scanner_param.wild = _pool->add(new std::string(tnode.schema_scan_node.wild));
|
||||
_scanner_param.common_param->wild =
|
||||
_pool->add(new std::string(tnode.schema_scan_node.wild));
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.current_user_ident) {
|
||||
_scanner_param.current_user_ident =
|
||||
_scanner_param.common_param->current_user_ident =
|
||||
_pool->add(new TUserIdentity(tnode.schema_scan_node.current_user_ident));
|
||||
} else {
|
||||
if (tnode.schema_scan_node.__isset.user) {
|
||||
_scanner_param.user = _pool->add(new std::string(tnode.schema_scan_node.user));
|
||||
_scanner_param.common_param->user =
|
||||
_pool->add(new std::string(tnode.schema_scan_node.user));
|
||||
}
|
||||
if (tnode.schema_scan_node.__isset.user_ip) {
|
||||
_scanner_param.user_ip = _pool->add(new std::string(tnode.schema_scan_node.user_ip));
|
||||
_scanner_param.common_param->user_ip =
|
||||
_pool->add(new std::string(tnode.schema_scan_node.user_ip));
|
||||
}
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.ip) {
|
||||
_scanner_param.ip = _pool->add(new std::string(tnode.schema_scan_node.ip));
|
||||
_scanner_param.common_param->ip = _pool->add(new std::string(tnode.schema_scan_node.ip));
|
||||
}
|
||||
if (tnode.schema_scan_node.__isset.port) {
|
||||
_scanner_param.port = tnode.schema_scan_node.port;
|
||||
_scanner_param.common_param->port = tnode.schema_scan_node.port;
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.thread_id) {
|
||||
_scanner_param.thread_id = tnode.schema_scan_node.thread_id;
|
||||
_scanner_param.common_param->thread_id = tnode.schema_scan_node.thread_id;
|
||||
}
|
||||
|
||||
if (tnode.schema_scan_node.__isset.catalog) {
|
||||
_scanner_param.catalog = _pool->add(new std::string(tnode.schema_scan_node.catalog));
|
||||
_scanner_param.common_param->catalog =
|
||||
_pool->add(new std::string(tnode.schema_scan_node.catalog));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -121,9 +126,9 @@ Status VSchemaScanNode::open(RuntimeState* state) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
|
||||
if (_scanner_param.user) {
|
||||
if (_scanner_param.common_param->user) {
|
||||
TSetSessionParams param;
|
||||
param.__set_user(*_scanner_param.user);
|
||||
param.__set_user(*_scanner_param.common_param->user);
|
||||
//TStatus t_status;
|
||||
//RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
|
||||
//RETURN_IF_ERROR(Status(t_status));
|
||||
|
||||
Reference in New Issue
Block a user