diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index c233c6f83f..2b6b2c1f3c 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -69,15 +69,12 @@ namespace doris { class ObjectPool; SchemaScanner::SchemaScanner(const std::vector& columns) - : _is_init(false), - _param(nullptr), - _columns(columns), - _schema_table_type(TSchemaTableType::SCH_INVALID) {} + : _is_init(false), _columns(columns), _schema_table_type(TSchemaTableType::SCH_INVALID) {} SchemaScanner::SchemaScanner(const std::vector& columns, TSchemaTableType::type type) - : _is_init(false), _param(nullptr), _columns(columns), _schema_table_type(type) {} + : _is_init(false), _columns(columns), _schema_table_type(type) {} -SchemaScanner::~SchemaScanner() {} +SchemaScanner::~SchemaScanner() = default; Status SchemaScanner::start(RuntimeState* state) { if (!_is_init) { @@ -189,7 +186,7 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ size_t fill_num = datas.size(); col_ptr = &nullable_column->get_nested_column(); for (int i = 0; i < fill_num; ++i) { - auto data = datas[i]; + auto* data = datas[i]; if (data == nullptr) { // For nested column need not insert default. nullable_column->insert_data(nullptr, 0); @@ -199,125 +196,115 @@ Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_ } switch (col_desc.type) { case TYPE_HLL: { - HyperLogLog* hll_slot = reinterpret_cast(data); - reinterpret_cast(col_ptr)->get_data().emplace_back(*hll_slot); + auto* hll_slot = reinterpret_cast(data); + assert_cast(col_ptr)->get_data().emplace_back(*hll_slot); break; } case TYPE_VARCHAR: case TYPE_CHAR: case TYPE_STRING: { - StringRef* str_slot = reinterpret_cast(data); - reinterpret_cast(col_ptr)->insert_data(str_slot->data, - str_slot->size); + auto* str_slot = reinterpret_cast(data); + assert_cast(col_ptr)->insert_data(str_slot->data, + str_slot->size); break; } case TYPE_BOOLEAN: { uint8_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_TINYINT: { int8_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_SMALLINT: { int16_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_INT: { int32_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_BIGINT: { int64_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_LARGEINT: { __int128 num; memcpy(&num, data, sizeof(__int128)); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_FLOAT: { float num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_DOUBLE: { double num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast*>(col_ptr)->insert_value(num); break; } case TYPE_DATE: { - reinterpret_cast*>(col_ptr)->insert_data( + assert_cast*>(col_ptr)->insert_data( reinterpret_cast(data), 0); break; } case TYPE_DATEV2: { uint32_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + assert_cast(col_ptr)->insert_value(num); break; } case TYPE_DATETIME: { - reinterpret_cast*>(col_ptr)->insert_data( + assert_cast*>(col_ptr)->insert_data( reinterpret_cast(data), 0); break; } case TYPE_DATETIMEV2: { - uint32_t num = *reinterpret_cast(data); - reinterpret_cast*>(col_ptr)->insert_value( - num); + uint64_t num = *reinterpret_cast(data); + assert_cast(col_ptr)->insert_value(num); break; } case TYPE_DECIMALV2: { const vectorized::Int128 num = (reinterpret_cast(data))->value; - reinterpret_cast(col_ptr)->insert_data( + assert_cast(col_ptr)->insert_data( reinterpret_cast(&num), 0); break; } case TYPE_DECIMAL128I: { const vectorized::Int128 num = (reinterpret_cast(data))->value; - reinterpret_cast(col_ptr)->insert_data( + assert_cast(col_ptr)->insert_data( reinterpret_cast(&num), 0); break; } case TYPE_DECIMAL32: { const int32_t num = *reinterpret_cast(data); - reinterpret_cast(col_ptr)->insert_data( + assert_cast(col_ptr)->insert_data( reinterpret_cast(&num), 0); break; } case TYPE_DECIMAL64: { const int64_t num = *reinterpret_cast(data); - reinterpret_cast(col_ptr)->insert_data( + assert_cast(col_ptr)->insert_data( reinterpret_cast(&num), 0); break; } diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index 2ecc2be9e0..f5f5bc2363 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -19,9 +19,11 @@ #include +#include #include #include "exec/schema_scanner/schema_helper.h" +#include "runtime/define_primitive_type.h" #include "runtime/runtime_state.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -30,15 +32,19 @@ namespace doris { std::vector SchemaProcessListScanner::_s_processlist_columns = { + {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false}, {"ID", TYPE_LARGEINT, sizeof(int128_t), false}, {"USER", TYPE_VARCHAR, sizeof(StringRef), false}, {"HOST", TYPE_VARCHAR, sizeof(StringRef), false}, + {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, {"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false}, {"DB", TYPE_VARCHAR, sizeof(StringRef), false}, {"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false}, {"TIME", TYPE_INT, sizeof(int32_t), false}, {"STATE", TYPE_VARCHAR, sizeof(StringRef), false}, - {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}}; + {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, + {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}, + {"FE", TYPE_VARCHAR, sizeof(StringRef), false}}; SchemaProcessListScanner::SchemaProcessListScanner() : SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {} @@ -90,48 +96,36 @@ Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) { for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { const auto& row = process_list[row_idx]; + if (row.size() != _s_processlist_columns.size()) { + return Status::InternalError( + "process list meet invalid schema, schema_size={}, input_data_size={}", + _s_processlist_columns.size(), row.size()); + } // Fetch and store the column value based on its index std::string& column_value = column_values[row_idx]; // Reference to the actual string in the vector - - switch (col_idx) { - case 0: - column_value = row.size() > 1 ? row[1] : ""; - break; // ID - case 1: - column_value = row.size() > 2 ? row[2] : ""; - break; // USER - case 2: - column_value = row.size() > 3 ? row[3] : ""; - break; // HOST - case 3: - column_value = row.size() > 5 ? row[5] : ""; - break; // CATALOG - case 4: - column_value = row.size() > 6 ? row[6] : ""; - break; // DB - case 5: - column_value = row.size() > 7 ? row[7] : ""; - break; // COMMAND - case 6: - column_value = row.size() > 8 ? row[8] : ""; - break; // TIME - case 7: - column_value = row.size() > 9 ? row[9] : ""; - break; // STATE - case 8: - column_value = row.size() > 11 ? row[11] : ""; - break; // INFO - default: - column_value = ""; - break; - } + column_value = row[col_idx]; if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT || _s_processlist_columns[col_idx].type == TYPE_INT) { - int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; - int_vals[row_idx] = val; + try { + int128_t val = !column_value.empty() ? std::stoll(column_value) : 0; + int_vals[row_idx] = val; + } catch (const std::exception& e) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value, e.what()); + } + datas[row_idx] = &int_vals[row_idx]; + } else if (_s_processlist_columns[col_idx].type == TYPE_DATETIMEV2) { + auto* dv = reinterpret_cast*>(&int_vals[row_idx]); + if (!dv->from_date_str(column_value.data(), column_value.size(), -1, + config::allow_zero_date)) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_processlist_columns[col_idx].name, column_value); + } datas[row_idx] = &int_vals[row_idx]; } else { str_refs[row_idx] = diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 2d32e21d99..f26b2d706b 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -52,11 +52,11 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->cast(); _scanner_param.common_param = p._common_scanner_param; // init schema scanner profile - _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner")); + _scanner_param.profile = std::make_unique("SchemaScanner"); profile()->add_child(_scanner_param.profile.get(), true, nullptr); // get src tuple desc - const SchemaTableDescriptor* schema_table = + const auto* schema_table = static_cast(p._dest_tuple_desc->table_desc()); // new one scanner _schema_scanner = SchemaScanner::create(schema_table->schema_table_type()); @@ -81,7 +81,6 @@ SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnod _table_name(tnode.schema_scan_node.table_name), _common_scanner_param(new SchemaScannerCommonParam()), _tuple_id(tnode.schema_scan_node.tuple_id), - _dest_tuple_desc(nullptr), _tuple_idx(0), _slot_num(0) {} @@ -162,7 +161,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { _slot_num = _dest_tuple_desc->slots().size(); // get src tuple desc - const SchemaTableDescriptor* schema_table = + const auto* schema_table = static_cast(_dest_tuple_desc->table_desc()); if (nullptr == schema_table) { @@ -179,7 +178,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { const std::vector& columns_desc(_schema_scanner->get_column_desc()); // if src columns size is zero, it's the dummy slots. - if (0 == columns_desc.size()) { + if (columns_desc.empty()) { _slot_num = 0; } @@ -193,17 +192,15 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { } if (j >= columns_desc.size()) { - LOG(WARNING) << "no match column for this column(" - << _dest_tuple_desc->slots()[i]->col_name() << ")"; - return Status::InternalError("no match column for this column."); + return Status::InternalError("no match column for this column({})", + _dest_tuple_desc->slots()[i]->col_name()); } if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type) { - LOG(WARNING) << "schema not match. input is " << columns_desc[j].name << "(" - << columns_desc[j].type << ") and output is " - << _dest_tuple_desc->slots()[i]->col_name() << "(" - << _dest_tuple_desc->slots()[i]->type() << ")"; - return Status::InternalError("schema not match."); + return Status::InternalError("schema not match. input is {}({}) and output is {}({})", + columns_desc[j].name, type_to_string(columns_desc[j].type), + _dest_tuple_desc->slots()[i]->col_name(), + type_to_string(_dest_tuple_desc->slots()[i]->type().type)); } } @@ -224,7 +221,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl do { block->clear(); for (int i = 0; i < _slot_num; ++i) { - auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + auto* dest_slot_desc = _dest_tuple_desc->slots()[i]; block->insert(vectorized::ColumnWithTypeAndName( dest_slot_desc->get_empty_mutable_column(), dest_slot_desc->get_data_type_ptr(), dest_slot_desc->col_name())); @@ -258,7 +255,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl if (src_block.rows()) { // block->check_number_of_rows(); for (int i = 0; i < _slot_num; ++i) { - auto dest_slot_desc = _dest_tuple_desc->slots()[i]; + auto* dest_slot_desc = _dest_tuple_desc->slots()[i]; vectorized::MutableColumnPtr column_ptr = std::move(*block->get_by_position(i).column).mutate(); column_ptr->insert_range_from( diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 92206dc3fd..0995884dc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -489,17 +489,22 @@ public class SchemaTable extends Table { .column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256)) .column("TAG", ScalarType.createVarchar(256)) .build())) - .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, - builder().column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) - .column("USER", ScalarType.createVarchar(32)) - .column("HOST", ScalarType.createVarchar(261)) - .column("CATALOG", ScalarType.createVarchar(64)) - .column("DB", ScalarType.createVarchar(64)) - .column("COMMAND", ScalarType.createVarchar(16)) - .column("TIME", ScalarType.createType(PrimitiveType.INT)) - .column("STATE", ScalarType.createVarchar(64)) - .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) - .build())) + .put("processlist", + new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, + builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16)) + .column("ID", ScalarType.createType(PrimitiveType.LARGEINT)) + .column("USER", ScalarType.createVarchar(32)) + .column("HOST", ScalarType.createVarchar(261)) + .column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2)) + .column("CATALOG", ScalarType.createVarchar(64)) + .column("DB", ScalarType.createVarchar(64)) + .column("COMMAND", ScalarType.createVarchar(16)) + .column("TIME", ScalarType.createType(PrimitiveType.INT)) + .column("STATE", ScalarType.createVarchar(64)) + .column("QUERY_ID", ScalarType.createVarchar(256)) + .column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)) + .column("FE", + ScalarType.createVarchar(64)).build())) .put("workload_policy", new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA, builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index ef6d3b9ea7..48a79bd9b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1023,7 +1023,7 @@ public class ConnectContext { if (connId == connectionId) { row.add("Yes"); } else { - row.add(""); + row.add("No"); } row.add("" + connectionId); row.add(ClusterNamespace.getNameFromFullName(qualifiedUser)); diff --git a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy index ac85abe848..1ad8fc4068 100644 --- a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy +++ b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy @@ -131,4 +131,6 @@ suite("test_info_schema_db", "p0,external,hive,external_docker,external_docker_h qt_sql116 """select table_catalog, table_schema, table_name from information_schema.tables where table_schema='${innerdb}'""" qt_sql117 """select table_catalog, table_schema, table_name from ${catalog_name}.information_schema.columns where table_schema='tpch1_parquet'""" qt_sql118 """select table_catalog, table_schema, table_name from ${catalog_name}.INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA='tpch1_parquet'""" + + sql "select * from information_schema.PROCESSLIST;" }