From fc12362a6dfefbead89d59d97f544ad191973fed Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 20 Sep 2023 14:42:54 +0800 Subject: [PATCH] [feature-wip](arrow-flight)(step2) FE support Arrow Flight server (#24314) This is a POC, the design documentation will be updated soon --- be/src/agent/heartbeat_server.cpp | 1 + be/src/common/config.cpp | 2 +- be/src/common/config.h | 6 +- be/src/runtime/buffer_control_block.cpp | 1 - be/src/runtime/result_buffer_mgr.cpp | 40 ++- be/src/runtime/result_buffer_mgr.h | 5 +- .../arrow_flight_batch_reader.cpp | 9 +- be/src/service/doris_main.cpp | 2 +- be/src/service/internal_service.cpp | 35 ++ be/src/service/internal_service.h | 5 + be/src/util/arrow/row_batch.cpp | 14 + be/src/util/arrow/row_batch.h | 3 + conf/be.conf | 2 +- conf/fe.conf | 1 + .../developer-guide/be-vscode-dev.md | 1 + .../community/developer-guide/fe-idea-dev.md | 1 + .../cluster-management/upgrade.md | 1 + docs/en/docs/admin-manual/config/be-config.md | 6 + docs/en/docs/admin-manual/config/fe-config.md | 6 + .../http-actions/fe/bootstrap-action.md | 3 + .../http-actions/fe/node-action.md | 1 + .../maint-monitor/metadata-operation.md | 5 + docs/en/docs/get-starting/quick-start.md | 1 + docs/en/docs/install/standard-deployment.md | 1 + .../table-functions/frontends.md | 2 + .../developer-guide/be-vscode-dev.md | 1 + .../community/developer-guide/fe-idea-dev.md | 1 + .../cluster-management/upgrade.md | 1 + .../docs/admin-manual/config/be-config.md | 6 + .../docs/admin-manual/config/fe-config.md | 6 + .../http-actions/fe/bootstrap-action.md | 3 + .../http-actions/fe/node-action.md | 1 + .../maint-monitor/metadata-operation.md | 4 + docs/zh-CN/docs/get-starting/quick-start.md | 2 + .../zh-CN/docs/install/standard-deployment.md | 1 + .../table-functions/frontends.md | 2 + .../java/org/apache/doris/common/Config.java | 3 + fe/fe-core/pom.xml | 71 +++- .../main/java/org/apache/doris/DorisFE.java | 8 +- .../doris/common/proc/BackendsProcDir.java | 9 +- .../doris/common/proc/FrontendsProcNode.java | 4 +- .../apache/doris/common/util/NetUtils.java | 2 + .../httpv2/rest/BootstrapFinishAction.java | 12 + .../httpv2/rest/manager/ClusterAction.java | 2 + .../doris/journal/bdbje/BDBDebugger.java | 3 +- .../translator/PhysicalPlanTranslator.java | 3 +- .../doris/planner/DistributedPlanner.java | 1 + .../apache/doris/planner/PlanFragment.java | 9 +- .../org/apache/doris/planner/ResultSink.java | 10 + .../org/apache/doris/qe/ConnectContext.java | 11 + .../java/org/apache/doris/qe/Coordinator.java | 38 ++ .../org/apache/doris/qe/PointQueryExec.java | 10 +- .../java/org/apache/doris/qe/QeService.java | 18 +- .../org/apache/doris/qe/StmtExecutor.java | 65 +++- .../doris/rpc/BackendServiceClient.java | 5 + .../apache/doris/rpc/BackendServiceProxy.java | 12 + .../doris/service/FrontendServiceImpl.java | 1 + .../service/arrowflight/FlightSqlService.java | 67 ++++ .../arrowflight/FlightSqlServiceImpl.java | 326 ++++++++++++++++++ .../arrowflight/FlightStatementExecutor.java | 224 ++++++++++++ .../java/org/apache/doris/system/Backend.java | 21 +- .../doris/system/BackendHbResponse.java | 11 +- .../org/apache/doris/system/Frontend.java | 6 + .../doris/system/FrontendHbResponse.java | 11 +- .../org/apache/doris/system/HeartbeatMgr.java | 12 +- .../doris/system/SystemInfoService.java | 1 + .../BackendsTableValuedFunction.java | 1 + .../FrontendsTableValuedFunction.java | 1 + .../LocalTableValuedFunction.java | 2 +- .../tablefunction/MetadataGenerator.java | 1 + .../apache/doris/system/HeartbeatMgrTest.java | 3 
+ .../doris/system/SystemInfoServiceTest.java | 2 +- .../apache/doris/utframe/AnotherDemoTest.java | 4 + .../doris/utframe/DemoMultiBackendsTest.java | 3 +- .../doris/utframe/MockedBackendFactory.java | 9 +- .../apache/doris/utframe/MockedFrontend.java | 1 + .../doris/utframe/TestWithFeService.java | 6 +- .../apache/doris/utframe/UtFrameUtils.java | 6 +- fe/pom.xml | 73 +++- gensrc/proto/internal_service.proto | 11 + gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/HeartbeatService.thrift | 1 + .../performance_p0/redundant_conjuncts.out | 2 + .../suites/demo_p0/httpTest_action.groovy | 2 +- .../tvf/test_backends_tvf.groovy | 2 +- .../tvf/test_frontends_tvf.groovy | 4 +- .../test_map_load_and_compaction.groovy | 2 +- .../information_schema.groovy | 2 +- 88 files changed, 1226 insertions(+), 71 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index 326758ffeb..964a898129 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -76,6 +76,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, heartbeat_result.backend_info.__set_http_port(config::webserver_port); heartbeat_result.backend_info.__set_be_rpc_port(-1); heartbeat_result.backend_info.__set_brpc_port(config::brpc_port); + heartbeat_result.backend_info.__set_arrow_flight_sql_port(config::arrow_flight_sql_port); heartbeat_result.backend_info.__set_version(get_short_version()); heartbeat_result.backend_info.__set_be_start_time(_be_epoch); heartbeat_result.backend_info.__set_be_node_role(config::be_node_role); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 74980aa41d..5b3a8dafd2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -59,7 +59,7 @@ DEFINE_Int32(be_port, "9060"); // port for brpc DEFINE_Int32(brpc_port, "8060"); -DEFINE_Int32(arrow_flight_port, "-1"); +DEFINE_Int32(arrow_flight_sql_port, "-1"); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/common/config.h b/be/src/common/config.h index 4c41e6f0e1..b34a6c98cc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -96,9 +96,9 @@ DECLARE_Int32(be_port); // port for brpc DECLARE_Int32(brpc_port); -// port for arrow flight -// Default -1, do not start arrow flight server. -DECLARE_Int32(arrow_flight_port); +// port for arrow flight sql +// Default -1, do not start arrow flight sql server. 
+DECLARE_Int32(arrow_flight_sql_port); // the number of bthreads for brpc, the default value is set to -1, // which means the number of bthreads is #cpu-cores diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index c51ced1aeb..adbaf7fbb0 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -144,7 +144,6 @@ Status BufferControlBlock::add_batch(std::unique_ptr& result) _fe_result_batch_queue.push_back(std::move(result)); } _buffer_rows += num_rows; - _data_arrival.notify_one(); } else { auto ctx = _waiting_rpc.front(); _waiting_rpc.pop_front(); diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index a3b99300f2..c4d0f148ed 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -21,6 +21,9 @@ #include #include #include + +#include +#include // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -33,6 +36,7 @@ #include "util/doris_metrics.h" #include "util/metrics.h" #include "util/thread.h" +#include "util/uid_util.h" namespace doris { @@ -42,7 +46,7 @@ ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) { // Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the // actual size of all BufferControlBlock. REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() { - // std::lock_guard l(_lock); + // std::lock_guard l(_buffer_map_lock); return _buffer_map.size(); }); } @@ -80,7 +84,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size } { - std::lock_guard l(_lock); + std::unique_lock wlock(_buffer_map_lock); _buffer_map.insert(std::make_pair(query_id, control_block)); // BufferControlBlock should destroy after max_timeout // for exceed max_timeout FE will return timeout to client @@ -95,8 +99,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size } std::shared_ptr ResultBufferMgr::find_control_block(const TUniqueId& query_id) { - // TODO(zhaochun): this lock can be bottleneck? 
- std::lock_guard l(_lock); + std::shared_lock rlock(_buffer_map_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { @@ -108,14 +111,12 @@ std::shared_ptr ResultBufferMgr::find_control_block(const TU void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id, const RowDescriptor& row_desc) { - { - std::lock_guard l(_lock); - _row_descriptor_map.insert(std::make_pair(query_id, row_desc)); - } + std::unique_lock wlock(_row_descriptor_map_lock); + _row_descriptor_map.insert(std::make_pair(query_id, row_desc)); } RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) { - std::lock_guard l(_lock); + std::shared_lock rlock(_row_descriptor_map_lock); RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id); if (_row_descriptor_map.end() != iter) { @@ -150,18 +151,23 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, } Status ResultBufferMgr::cancel(const TUniqueId& query_id) { - std::lock_guard l(_lock); - BufferMap::iterator iter = _buffer_map.find(query_id); + { + std::unique_lock wlock(_buffer_map_lock); + BufferMap::iterator iter = _buffer_map.find(query_id); - if (_buffer_map.end() != iter) { - iter->second->cancel(); - _buffer_map.erase(iter); + if (_buffer_map.end() != iter) { + iter->second->cancel(); + _buffer_map.erase(iter); + } } - RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id); + { + std::unique_lock wlock(_row_descriptor_map_lock); + RowDescriptorMap::iterator row_desc_iter = _row_descriptor_map.find(query_id); - if (_row_descriptor_map.end() != row_desc_iter) { - _row_descriptor_map.erase(row_desc_iter); + if (_row_descriptor_map.end() != row_desc_iter) { + _row_descriptor_map.erase(row_desc_iter); + } } return Status::OK(); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 8c9b621968..4e5cd38a72 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -86,9 +87,11 @@ private: void cancel_thread(); // lock for buffer map - std::mutex _lock; + std::shared_mutex _buffer_map_lock; // buffer block map BufferMap _buffer_map; + // lock for descriptor map + std::shared_mutex _row_descriptor_map_lock; // for arrow flight RowDescriptorMap _row_descriptor_map; diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp index d7a648c54b..8a0f1e6785 100644 --- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp +++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp @@ -57,12 +57,17 @@ arrow::Result> ArrowFlightBatchReader::C } arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr* out) { - CHECK(*out == nullptr); + // *out not nullptr + *out = nullptr; auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out); if (UNLIKELY(!st.ok())) { - LOG(WARNING) << st.to_string(); + LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string(); ARROW_RETURN_NOT_OK(to_arrow_status(st)); } + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns(); + } return arrow::Status::OK(); } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index abfa7913e7..3e2552ef23 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -536,7 +536,7 @@ int main(int 
argc, char** argv) { // 5. arrow flight service std::shared_ptr flight_server = std::move(doris::flight::FlightSqlServer::create()).ValueOrDie(); - status = flight_server->init(doris::config::arrow_flight_port); + status = flight_server->init(doris::config::arrow_flight_sql_port); // 6. start daemon thread to do clean or gc jobs doris::Daemon daemon; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 525e9ea024..5367135c13 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -96,6 +96,7 @@ #include "runtime/thread_context.h" #include "runtime/types.h" #include "service/point_query_executor.h" +#include "util/arrow/row_batch.h" #include "util/async_io.h" #include "util/brpc_client_cache.h" #include "util/doris_metrics.h" @@ -704,6 +705,40 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c } } +void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcController* controller, + const PFetchArrowFlightSchemaRequest* request, + PFetchArrowFlightSchemaResult* result, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([request, result, done]() { + brpc::ClosureGuard closure_guard(done); + RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor( + UniqueId(request->finst_id()).to_thrift()); + if (row_desc.equals(RowDescriptor())) { + auto st = Status::NotFound("not found row descriptor"); + st.to_protobuf(result->mutable_status()); + return; + } + + std::shared_ptr schema; + auto st = convert_to_arrow_schema(row_desc, &schema); + if (UNLIKELY(!st.ok())) { + st.to_protobuf(result->mutable_status()); + return; + } + + std::string schema_str; + st = serialize_arrow_schema(row_desc, &schema, &schema_str); + if (st.ok()) { + result->set_schema(std::move(schema_str)); + } + st.to_protobuf(result->mutable_status()); + }); + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* request, PTabletKeyLookupResponse* response) { PointQueryExecutor lookup_util; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index e7a5914274..db0ee07581 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -82,6 +82,11 @@ public: PFetchTableSchemaResult* result, google::protobuf::Closure* done) override; + void fetch_arrow_flight_schema(google::protobuf::RpcController* controller, + const PFetchArrowFlightSchemaRequest* request, + PFetchArrowFlightSchemaResult* result, + google::protobuf::Closure* done) override; + void tablet_writer_open(google::protobuf::RpcController* controller, const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 78fe346be6..b60034696a 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -37,6 +37,8 @@ #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/types.h" +#include "util/arrow/block_convertor.h" +#include "vec/core/block.h" namespace doris { @@ -188,4 +190,16 @@ Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::strin return Status::OK(); } +Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr* schema, + std::string* result) { + std::vector slots; + for (auto tuple_desc : row_desc.tuple_descriptors()) { + 
slots.insert(slots.end(), tuple_desc->slots().begin(), tuple_desc->slots().end()); + } + auto block = vectorized::Block(slots, 0); + std::shared_ptr batch; + RETURN_IF_ERROR(convert_to_arrow_batch(block, *schema, arrow::default_memory_pool(), &batch)); + return serialize_record_batch(*batch, result); +} + } // namespace doris diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 8946502f83..b5e9d8d3c3 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -43,4 +43,7 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc, Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); +Status serialize_arrow_schema(RowDescriptor row_desc, std::shared_ptr* schema, + std::string* result); + } // namespace doris diff --git a/conf/be.conf b/conf/be.conf index e91eb7d52d..52ac34a91b 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -40,7 +40,7 @@ be_port = 9060 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 -arrow_flight_port = -1 +arrow_flight_sql_port = -1 # HTTPS configures enable_https = false diff --git a/conf/fe.conf b/conf/fe.conf index 82701115b9..fd145e743e 100644 --- a/conf/fe.conf +++ b/conf/fe.conf @@ -52,6 +52,7 @@ http_port = 8030 rpc_port = 9020 query_port = 9030 edit_log_port = 9010 +arrow_flight_sql_port = -1 # Choose one if there are more than one ip except loopback address. # Note that there should at most one ip match this list. diff --git a/docs/en/community/developer-guide/be-vscode-dev.md b/docs/en/community/developer-guide/be-vscode-dev.md index bf70f93c2c..a01dabddd3 100644 --- a/docs/en/community/developer-guide/be-vscode-dev.md +++ b/docs/en/community/developer-guide/be-vscode-dev.md @@ -115,6 +115,7 @@ be_rpc_port = 9070 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 +arrow_flight_sql_port = -1 # Note that there should be at most one ip that matches this list. # If no ip matches this rule, it will choose one randomly. diff --git a/docs/en/community/developer-guide/fe-idea-dev.md b/docs/en/community/developer-guide/fe-idea-dev.md index b873bc8d15..d52454cdb6 100644 --- a/docs/en/community/developer-guide/fe-idea-dev.md +++ b/docs/en/community/developer-guide/fe-idea-dev.md @@ -174,6 +174,7 @@ sys_log_level = INFO http_port = 8030 rpc_port = 9020 query_port = 9030 +arrow_flight_sql_port = -1 edit_log_port = 9010 # Choose one if there are more than one ip except loopback address. diff --git a/docs/en/docs/admin-manual/cluster-management/upgrade.md b/docs/en/docs/admin-manual/cluster-management/upgrade.md index 41ae6baf61..5cdb67c72a 100644 --- a/docs/en/docs/admin-manual/cluster-management/upgrade.md +++ b/docs/en/docs/admin-manual/cluster-management/upgrade.md @@ -144,6 +144,7 @@ admin set frontend config("disable_tablet_scheduler" = "true"); http_port = 18030 rpc_port = 19020 query_port = 19030 + arrow_flight_sql_port = 19040 edit_log_port = 19010 ... 
``` diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 08c1206746..dac22a43a8 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -127,6 +127,12 @@ There are two ways to configure BE configuration items: * Description: The port of BRPC on BE, used for communication between BEs * Default value: 8060 +#### `arrow_flight_sql_port` + +* Type: int32 +* Description: The port of Arrow Flight SQL server on BE, used for communication between Arrow Flight Client and BE +* Default value: -1 + #### `enable_https` * Type: bool diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index cdca4f048d..8fc84e6ef4 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -384,6 +384,12 @@ Default:9030 FE MySQL server port +#### `arrow_flight_sql_port` + +Default:-1 + +Arrow Flight SQL server port + #### `frontend_address` Status: Deprecated, not recommended use. This parameter may be deleted later diff --git a/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md b/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md index d08b932dd2..17d4dd9a5a 100644 --- a/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md +++ b/docs/en/docs/admin-manual/http-actions/fe/bootstrap-action.md @@ -76,6 +76,7 @@ none "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 @@ -85,6 +86,7 @@ none * `queryPort` is the MySQL protocol port of the FE node. * `rpcPort` is the thrift RPC port of the FE node. * `maxReplayedJournal` represents the maximum metadata journal id currently played back by the FE node. + * `arrowFlightSqlPort` is the Arrow Flight SQL port of the FE node. ## Examples @@ -114,6 +116,7 @@ none "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 diff --git a/docs/en/docs/admin-manual/http-actions/fe/node-action.md b/docs/en/docs/admin-manual/http-actions/fe/node-action.md index e1189b7e4e..842d58c7a3 100644 --- a/docs/en/docs/admin-manual/http-actions/fe/node-action.md +++ b/docs/en/docs/admin-manual/http-actions/fe/node-action.md @@ -80,6 +80,7 @@ frontends: "HttpPort", "QueryPort", "RpcPort", + "ArrowFlightSqlPort", "Role", "IsMaster", "ClusterId", diff --git a/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md b/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md index 1975b448e5..a8483da8f8 100644 --- a/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md +++ b/docs/en/docs/admin-manual/maint-monitor/metadata-operation.md @@ -239,6 +239,7 @@ FE currently has the following ports * http_port: http port, also used to push image * rpc_port: thrift server port of Frontend * query_port: Mysql connection port +* arrow_flight_sql_port: Arrow Flight SQL connection port 1. edit_log_port @@ -256,6 +257,10 @@ FE currently has the following ports After modifying the configuration, restart FE directly. This only affects mysql's connection target. +5. arrow_flight_sql_port + + After modifying the configuration, restart FE directly. This only affects arrow flight sql server connection target. + ### Recover metadata from FE memory In some extreme cases, the image file on the disk may be damaged, but the metadata in the memory is intact. 
At this point, we can dump the metadata from the memory and replace the image file on the disk to recover the metadata. the entire non-stop query service operation steps are as follows: diff --git a/docs/en/docs/get-starting/quick-start.md b/docs/en/docs/get-starting/quick-start.md index 414fe14018..db145540cc 100644 --- a/docs/en/docs/get-starting/quick-start.md +++ b/docs/en/docs/get-starting/quick-start.md @@ -143,6 +143,7 @@ mysql> show frontends\G; HttpPort: 8030 QueryPort: 9030 RpcPort: 9020 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1685821635 diff --git a/docs/en/docs/install/standard-deployment.md b/docs/en/docs/install/standard-deployment.md index e546d64c58..5622ea84e1 100644 --- a/docs/en/docs/install/standard-deployment.md +++ b/docs/en/docs/install/standard-deployment.md @@ -123,6 +123,7 @@ Doris instances communicate directly over the network. The following table shows | FE | http_port | 8030 | FE <--> FE, user <--> FE | HTTP server port on FE | | FE | rpc_port | 9020 | BE --> FE, FE <--> FE | Thrift server port on FE; The configurations of each FE should be consistent. | | FE | query_port | 9030 | user <--> FE | MySQL server port on FE | +| FE | arrow_flight_sql_port | 9040 | user <--> FE | Arrow Flight SQL server port on FE | | FE | edit\_log_port | 9010 | FE <--> FE | Port on FE for BDBJE communication | | Broker | broker ipc_port | 8000 | FE --> Broker, BE --> Broker | Thrift server port on Broker for receiving requests | diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md b/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md index 52bc90cd7b..c78f19a649 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/frontends.md @@ -56,6 +56,7 @@ mysql> desc function frontends(); | HttpPort | TEXT | No | false | NULL | NONE | | QueryPort | TEXT | No | false | NULL | NONE | | RpcPort | TEXT | No | false | NULL | NONE | +| ArrowFlightSqlPort| TEXT | No | false | NULL | NONE | | Role | TEXT | No | false | NULL | NONE | | IsMaster | TEXT | No | false | NULL | NONE | | ClusterId | TEXT | No | false | NULL | NONE | @@ -85,6 +86,7 @@ mysql> select * from frontends()\G HttpPort: 8034 QueryPort: 9033 RpcPort: 9023 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1258341841 diff --git a/docs/zh-CN/community/developer-guide/be-vscode-dev.md b/docs/zh-CN/community/developer-guide/be-vscode-dev.md index 7a18a186f3..9e8a1855fa 100644 --- a/docs/zh-CN/community/developer-guide/be-vscode-dev.md +++ b/docs/zh-CN/community/developer-guide/be-vscode-dev.md @@ -114,6 +114,7 @@ be_rpc_port = 9070 webserver_port = 8040 heartbeat_service_port = 9050 brpc_port = 8060 +arrow_flight_sql_port = -1 # Note that there should at most one ip match this list. # If no ip match this rule, will choose one randomly. diff --git a/docs/zh-CN/community/developer-guide/fe-idea-dev.md b/docs/zh-CN/community/developer-guide/fe-idea-dev.md index 62813adc3b..5eb1a70548 100644 --- a/docs/zh-CN/community/developer-guide/fe-idea-dev.md +++ b/docs/zh-CN/community/developer-guide/fe-idea-dev.md @@ -169,6 +169,7 @@ sys_log_level = INFO http_port = 8030 rpc_port = 9020 query_port = 9030 +arrow_flight_sql_port = -1 edit_log_port = 9010 # Choose one if there are more than one ip except loopback address. 
diff --git a/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md b/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md index 27f2994b8e..0b2145b9c7 100644 --- a/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md +++ b/docs/zh-CN/docs/admin-manual/cluster-management/upgrade.md @@ -144,6 +144,7 @@ admin set frontend config("disable_tablet_scheduler" = "true"); http_port = 18030 rpc_port = 19020 query_port = 19030 + arrow_flight_sql_port = 19040 edit_log_port = 19010 ... ``` diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 7501a80019..9e91afd12e 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -123,6 +123,12 @@ BE 重启后该配置将失效。如果想持久化修改结果,使用如下 * 描述:BE 上的 brpc 的端口,用于 BE 之间通讯 * 默认值:8060 +#### `arrow_flight_sql_port` + +* 类型:int32 +* 描述:FE 上的 Arrow Flight SQL server 的端口,用于从 Arrow Flight Client 和 BE 之间通讯 +* 默认值:-1 + #### `enable_https` * 类型:bool diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 1079b60431..e4c02ef8b4 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -384,6 +384,12 @@ heartbeat_mgr 中处理心跳事件的线程数。 Doris FE 通过 mysql 协议查询连接端口 +#### `arrow_flight_sql_port` + +默认值:-1 + +Doris FE 通过 Arrow Flight SQL 协议查询连接端口 + #### `frontend_address` 状态:已弃用,不建议使用。 diff --git a/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md b/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md index 689da335c9..d767aa1cc5 100644 --- a/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md +++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/bootstrap-action.md @@ -76,6 +76,7 @@ under the License. "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 @@ -85,6 +86,7 @@ under the License. * `queryPort` 是 FE 节点的 MySQL 协议端口。 * `rpcPort` 是 FE 节点的 thrift RPC 端口。 * `maxReplayedJournal` 表示 FE 节点当前回放的最大元数据日志id。 + * `arrowFlightSqlPort` 是 FE 节点的 Arrow Flight SQL 协议端口。 ## Examples @@ -114,6 +116,7 @@ under the License. "data": { "queryPort": 9030, "rpcPort": 9020, + "arrowFlightSqlPort": 9040, "maxReplayedJournal": 17287 }, "count": 0 diff --git a/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md b/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md index 9960ad1551..53cc693b6f 100644 --- a/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md +++ b/docs/zh-CN/docs/admin-manual/http-actions/fe/node-action.md @@ -80,6 +80,7 @@ frontends: "HttpPort", "QueryPort", "RpcPort", + "ArrowFlightSqlPort", "Role", "IsMaster", "ClusterId", diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md b/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md index 12ef2de434..beb10e06b3 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/metadata-operation.md @@ -240,6 +240,7 @@ FE 目前有以下几个端口 * http_port:http 端口,也用于推送 image * rpc_port:FE 的 thrift server port * query_port:Mysql 连接端口 +* arrow_flight_sql_port: Arrow Flight SQL 连接端口 1. edit_log_port @@ -257,6 +258,9 @@ FE 目前有以下几个端口 修改配置后,直接重启 FE 即可。这个只影响到 mysql 的连接目标。 +5. 
arrow_flight_sql_port + + 修改配置后,直接重启 FE 即可。这个只影响到 Arrow Flight SQL 的连接目标。 ### 从 FE 内存中恢复元数据 diff --git a/docs/zh-CN/docs/get-starting/quick-start.md b/docs/zh-CN/docs/get-starting/quick-start.md index 8df83dab60..ff9e75bdfa 100644 --- a/docs/zh-CN/docs/get-starting/quick-start.md +++ b/docs/zh-CN/docs/get-starting/quick-start.md @@ -147,6 +147,7 @@ mysql> show frontends\G HttpPort: 8030 QueryPort: 9030 RpcPort: 9020 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1685821635 @@ -277,6 +278,7 @@ mysql> SHOW BACKENDS\G BePort: 9060 HttpPort: 8040 BrpcPort: 8060 + ArrowFlightSqlPort: 8070 LastStartTime: 2022-08-16 15:31:37 LastHeartbeat: 2022-08-17 13:33:17 Alive: true diff --git a/docs/zh-CN/docs/install/standard-deployment.md b/docs/zh-CN/docs/install/standard-deployment.md index a338ab5f35..923dc52a3d 100644 --- a/docs/zh-CN/docs/install/standard-deployment.md +++ b/docs/zh-CN/docs/install/standard-deployment.md @@ -119,6 +119,7 @@ Doris 各个实例直接通过网络进行通讯。以下表格展示了所有 | FE | http_port | 8030 | FE <--> FE,用户 <--> FE |FE 上的 http server 端口 | | FE | rpc_port | 9020 | BE --> FE, FE <--> FE | FE 上的 thrift server 端口,每个fe的配置需要保持一致| | FE | query_port | 9030 | 用户 <--> FE | FE 上的 mysql server 端口 | +| FE | arrow_flight_sql_port | 9040 | 用户 <--> FE | FE 上的 Arrow Flight SQL server 端口 | | FE | edit\_log_port | 9010 | FE <--> FE | FE 上的 bdbje 之间通信用的端口 | | Broker | broker\_ipc_port | 8000 | FE --> Broker, BE --> Broker | Broker 上的 thrift server,用于接收请求 | diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md index f367fd3014..a2c85ec754 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/frontends.md @@ -55,6 +55,7 @@ mysql> desc function frontends(); | HttpPort | TEXT | No | false | NULL | NONE | | QueryPort | TEXT | No | false | NULL | NONE | | RpcPort | TEXT | No | false | NULL | NONE | +| ArrowFlightSqlPort| TEXT | No | false | NULL | NONE | | Role | TEXT | No | false | NULL | NONE | | IsMaster | TEXT | No | false | NULL | NONE | | ClusterId | TEXT | No | false | NULL | NONE | @@ -84,6 +85,7 @@ mysql> select * from frontends()\G HttpPort: 8034 QueryPort: 9033 RpcPort: 9023 +ArrowFlightSqlPort: 9040 Role: FOLLOWER IsMaster: true ClusterId: 1258341841 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1d4f35fef7..a2aa07ba24 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -390,6 +390,9 @@ public class Config extends ConfigBase { @ConfField(description = {"FE MySQL server 的端口号", "The port of FE MySQL server"}) public static int query_port = 9030; + @ConfField(description = {"FE Arrow-Flight-SQL server 的端口号", "The port of FE Arrow-Flight-SQ server"}) + public static int arrow_flight_sql_port = -1; + @ConfField(description = {"MySQL 服务的 IO 线程数", "The number of IO threads in MySQL service"}) public static int mysql_service_io_threads_num = 4; diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 26e49d276e..0c00f1f442 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -496,6 +496,10 @@ under the License. org.apache.httpcomponents httpclient + + com.google.flatbuffers + flatbuffers-java + org.apache.doris hive-catalog-shade @@ -706,6 +710,64 @@ under the License. 
quartz 2.3.2 + + + + org.apache.arrow + arrow-memory-netty + + + io.grpc + grpc-netty + + + io.grpc + grpc-core + + + io.grpc + grpc-context + + + io.netty + netty-buffer + + + io.netty + netty-handler + + + io.netty + netty-transport + + + io.grpc + grpc-api + + + org.apache.arrow + flight-core + + + org.apache.arrow + arrow-memory-core + + + org.apache.arrow + arrow-jdbc + + + org.apache.arrow + arrow-vector + + + org.hamcrest + hamcrest + + + org.apache.arrow + flight-sql + @@ -777,7 +839,7 @@ under the License. - + de.jflex @@ -1053,5 +1115,12 @@ under the License. + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index f204d11edb..59dd3d96b0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -193,7 +193,8 @@ public class DorisFE { } if (options.enableQeService) { - QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler()); + QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port, + ExecuteEnv.getInstance().getScheduler()); qeService.start(); } @@ -231,6 +232,11 @@ public class DorisFE { "Rpc port", NetUtils.RPC_PORT_SUGGESTION)) { throw new IOException("port " + Config.rpc_port + " already in use"); } + if (Config.arrow_flight_sql_port != -1 + && !NetUtils.isPortAvailable(FrontendOptions.getLocalHostAddress(), Config.arrow_flight_sql_port, + "Arrow Flight SQL port", NetUtils.ARROW_FLIGHT_SQL_SUGGESTION)) { + throw new IOException("port " + Config.arrow_flight_sql_port + " already in use"); + } } /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index 7658cfae07..647e4caf57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -44,10 +44,10 @@ public class BackendsProcDir implements ProcDirInterface { private static final Logger LOG = LogManager.getLogger(BackendsProcDir.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("BackendId") - .add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("LastStartTime") - .add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum").add("DataUsedCapacity") - .add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct").add("MaxDiskUsedPct") - .add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status") + .add("Host").add("HeartbeatPort").add("BePort").add("HttpPort").add("BrpcPort").add("ArrowFlightSqlPort") + .add("LastStartTime").add("LastHeartbeat").add("Alive").add("SystemDecommissioned").add("TabletNum") + .add("DataUsedCapacity").add("TrashUsedCapcacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct") + .add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status") .add("HeartbeatFailureCounter").add("NodeRole") .build(); @@ -107,6 +107,7 @@ public class BackendsProcDir implements ProcDirInterface { backendInfo.add(String.valueOf(backend.getBePort())); backendInfo.add(String.valueOf(backend.getHttpPort())); backendInfo.add(String.valueOf(backend.getBrpcPort())); + backendInfo.add(String.valueOf(backend.getArrowFlightSqlPort())); 
backendInfo.add(TimeUtils.longToTimeString(backend.getLastStartTime())); backendInfo.add(TimeUtils.longToTimeString(backend.getLastUpdateMs())); backendInfo.add(String.valueOf(backend.isAlive())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java index e11cd81058..0e50025963 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java @@ -46,7 +46,7 @@ public class FrontendsProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("Name").add("Host").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort") - .add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive") + .add("ArrowFlightSqlPort").add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive") .add("ReplayedJournalId").add("LastStartTime").add("LastHeartbeat") .add("IsHelper").add("ErrMsg").add("Version") .add("CurrentConnected") @@ -119,9 +119,11 @@ public class FrontendsProcNode implements ProcNodeInterface { if (fe.getHost().equals(env.getSelfNode().getHost())) { info.add(Integer.toString(Config.query_port)); info.add(Integer.toString(Config.rpc_port)); + info.add(Integer.toString(Config.arrow_flight_sql_port)); } else { info.add(Integer.toString(fe.getQueryPort())); info.add(Integer.toString(fe.getRpcPort())); + info.add(Integer.toString(fe.getArrowFlightSqlPort())); } info.add(fe.getRole().name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java index 334dd11564..0c1ac130cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/NetUtils.java @@ -48,6 +48,8 @@ public class NetUtils { public static final String HTTPS_PORT_SUGGESTION = "Please change the 'https_port' in fe.conf and try again. 
" + "But you need to make sure that ALL FEs https_port are same."; public static final String RPC_PORT_SUGGESTION = "Please change the 'rpc_port' in fe.conf and try again."; + public static final String ARROW_FLIGHT_SQL_SUGGESTION = + "Please change the 'arrow_flight_sql_port' in fe.conf and try again."; // Target format is "host:port" public static InetSocketAddress createSocketAddr(String target) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java index b2878522ed..fb503f7fee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BootstrapFinishAction.java @@ -40,6 +40,7 @@ import javax.servlet.http.HttpServletResponse; * "data": { * "queryPort": 9030, * "rpcPort": 9020, + * "arrowFlightSqlPort": 9040, * "maxReplayedJournal": 17287 * }, * "count": 0 @@ -53,6 +54,7 @@ public class BootstrapFinishAction extends RestBaseController { public static final String REPLAYED_JOURNAL_ID = "replayedJournalId"; public static final String QUERY_PORT = "queryPort"; + public static final String ARROW_FLIGHT_SQL_PORT = "arrowFlightSqlPort"; public static final String RPC_PORT = "rpcPort"; public static final String VERSION = "version"; @@ -91,6 +93,7 @@ public class BootstrapFinishAction extends RestBaseController { result.setReplayedJournalId(replayedJournalId); result.setQueryPort(Config.query_port); result.setRpcPort(Config.rpc_port); + result.setArrowFlightSqlPort(Config.arrow_flight_sql_port); result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); } @@ -107,6 +110,7 @@ public class BootstrapFinishAction extends RestBaseController { private long replayedJournalId = 0; private int queryPort = 0; private int rpcPort = 0; + private int arrowFlightSqlPort = 0; private String version = ""; public BootstrapResult() { @@ -125,10 +129,18 @@ public class BootstrapFinishAction extends RestBaseController { this.queryPort = queryPort; } + public void setArrowFlightSqlPort(int arrowFlightSqlPort) { + this.arrowFlightSqlPort = arrowFlightSqlPort; + } + public int getQueryPort() { return queryPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public void setRpcPort(int rpcPort) { this.rpcPort = rpcPort; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java index 983bafc852..929e461013 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -65,6 +65,8 @@ public class ClusterAction extends RestBaseController { result.put("mysql", frontends.stream().map(ip -> ip + ":" + Config.query_port).collect(Collectors.toList())); result.put("http", frontends.stream().map(ip -> ip + ":" + Config.http_port).collect(Collectors.toList())); + result.put("arrow flight sql server", frontends.stream().map( + ip -> ip + ":" + Config.arrow_flight_sql_port).collect(Collectors.toList())); return ResponseEntityBuilder.ok(result); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java index e6b0f872f4..ae48526515 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java @@ -87,7 +87,8 @@ public class BDBDebugger { httpServer.start(); // MySQl server - QeService qeService = new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler()); + QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port, + ExecuteEnv.getInstance().getScheduler()); qeService.start(); ThreadPoolManager.registerAllThreadPoolMetric(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6a67a17e3c..1982023cc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -333,7 +333,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor physicalResultSink, PlanTranslatorContext context) { PlanFragment planFragment = physicalResultSink.child().accept(this, context); - planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId())); + planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), + ConnectContext.get().getResultSinkType())); return planFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index a719081496..7dc45029e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -268,6 +268,7 @@ public class DistributedPlanner { mergePlan.init(ctx.getRootAnalyzer()); Preconditions.checkState(mergePlan.hasValidStats()); PlanFragment fragment = new PlanFragment(ctx.getNextFragmentId(), mergePlan, DataPartition.UNPARTITIONED); + fragment.setResultSinkType(ctx.getRootAnalyzer().getContext().getResultSinkType()); inputFragment.setDestination(mergePlan); return fragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 3d74bfc0df..16be7e17a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TResultSinkType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -145,6 +146,8 @@ public class PlanFragment extends TreeNode { // has colocate plan node private boolean hasColocatePlanNode = false; + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + /** * C'tor for fragment with specific partition; the output is by default broadcast. 
*/ @@ -234,6 +237,10 @@ public class PlanFragment extends TreeNode { this.hasColocatePlanNode = hasColocatePlanNode; } + public void setResultSinkType(TResultSinkType resultSinkType) { + this.resultSinkType = resultSinkType; + } + public boolean hasColocatePlanNode() { return hasColocatePlanNode; } @@ -269,7 +276,7 @@ public class PlanFragment extends TreeNode { } else { // add ResultSink // we're streaming to an result sink - sink = new ResultSink(planRoot.getId()); + sink = new ResultSink(planRoot.getId(), resultSinkType); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java index 49a4ca3333..1b0b745223 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java @@ -22,6 +22,7 @@ import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TResultSink; +import org.apache.doris.thrift.TResultSinkType; /** * Result sink that forwards data to @@ -33,10 +34,17 @@ public class ResultSink extends DataSink { // Two phase fetch option private TFetchOption fetchOption; + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + public ResultSink(PlanNodeId exchNodeId) { this.exchNodeId = exchNodeId; } + public ResultSink(PlanNodeId exchNodeId, TResultSinkType resultSinkType) { + this.exchNodeId = exchNodeId; + this.resultSinkType = resultSinkType; + } + @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); @@ -49,6 +57,7 @@ public class ResultSink extends DataSink { strBuilder.append(prefix).append(" ").append("FETCH ROW STORE\n"); } } + strBuilder.append(prefix).append(" ").append(resultSinkType).append("\n"); return strBuilder.toString(); } @@ -63,6 +72,7 @@ public class ResultSink extends DataSink { if (fetchOption != null) { tResultSink.setFetchOption(fetchOption); } + tResultSink.setType(resultSinkType); result.setResultSink(tResultSink); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index bf7c84a2c4..5a427a1080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -42,6 +42,7 @@ import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; +import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionStatus; @@ -176,6 +177,8 @@ public class ConnectContext { private String workloadGroupName = ""; private Map insertGroupCommitTableToBeMap = new HashMap<>(); + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -221,6 +224,10 @@ public class ConnectContext { return mysqlSslContext; } + public TResultSinkType getResultSinkType() { + return resultSinkType; + } + public void setOrUpdateInsertResult(long txnId, String label, String db, String tbl, TransactionStatus txnStatus, long loadedRows, int 
filteredRows) { if (isTxnModel() && insertResult != null) { @@ -644,6 +651,10 @@ public class ConnectContext { this.statementContext = statementContext; } + public void setResultSinkType(TResultSinkType resultSinkType) { + this.resultSinkType = resultSinkType; + } + // kill operation with no protect. public void kill(boolean killConnection) { LOG.warn("kill query from {}, kill connection: {}", getMysqlChannel().getRemoteHostPortString(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 48f7191531..feff5b7ebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; @@ -207,6 +208,11 @@ public class Coordinator implements CoordInterface { private final List needCheckBackendExecStates = Lists.newArrayList(); private final List needCheckPipelineExecContexts = Lists.newArrayList(); private ResultReceiver receiver; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private ArrayList resultOutputExprs; + + private TUniqueId finstId; private final List scanNodes; private int scanRangeNum = 0; // number of instances of this query, equals to @@ -274,6 +280,22 @@ public class Coordinator implements CoordInterface { return executionProfile; } + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public ArrayList getResultOutputExprs() { + return resultOutputExprs; + } + + public TUniqueId getFinstId() { + return finstId; + } + // True if all scan node are ExternalScanNode. 
private boolean isAllExternalScan = true; @@ -598,6 +620,10 @@ public class Coordinator implements CoordInterface { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId, addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline); + finstId = topParams.instanceExecParams.get(0).instanceId; + resultFlightServerAddr = toArrowFlightHost(execBeAddr); + resultInternalServiceAddr = toBrpcHost(execBeAddr); + resultOutputExprs = fragments.get(0).getOutputExprs(); if (LOG.isDebugEnabled()) { LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); @@ -1595,6 +1621,18 @@ public class Coordinator implements CoordInterface { return new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); } + private TNetworkAddress toArrowFlightHost(TNetworkAddress host) throws Exception { + Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort( + host.getHostname(), host.getPort()); + if (backend == null) { + throw new UserException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } + if (backend.getArrowFlightSqlPort() < 0) { + return null; + } + return new TNetworkAddress(backend.getHost(), backend.getArrowFlightSqlPort()); + } + // estimate if this fragment contains UnionNode private boolean containsUnionNode(PlanNode node) { if (node instanceof UnionNode) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 0ffb5b989d..50c422a197 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -247,10 +247,10 @@ public class PointQueryExec implements CoordInterface { while (pResult == null) { InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); Future futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request); + BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request); long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { - LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); + LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); status.setStatus("query timeout"); return null; } @@ -265,18 +265,18 @@ public class PointQueryExec implements CoordInterface { } } catch (TimeoutException e) { futureResponse.cancel(true); - LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAdress()); + LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); status.setStatus("query timeout"); return null; } } } catch (RpcException e) { - LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAdress(), e); + LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAddress(), e); status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { - LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAdress()); + LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAddress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. 
status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java index 36eb4e8db5..f1e9a65345 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java @@ -18,6 +18,7 @@ package org.apache.doris.qe; import org.apache.doris.mysql.MysqlServer; +import org.apache.doris.service.arrowflight.FlightSqlService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,13 +34,18 @@ public class QeService { // MySQL protocol service private MysqlServer mysqlServer; + private int arrowFlightSQLPort; + private FlightSqlService flightSqlService; + @Deprecated - public QeService(int port) { + public QeService(int port, int arrowFlightSQLPort) { this.port = port; + this.arrowFlightSQLPort = arrowFlightSQLPort; } - public QeService(int port, ConnectScheduler scheduler) { + public QeService(int port, int arrowFlightSQLPort, ConnectScheduler scheduler) { this.port = port; + this.arrowFlightSQLPort = arrowFlightSQLPort; this.mysqlServer = new MysqlServer(port, scheduler); } @@ -56,6 +62,14 @@ public class QeService { LOG.error("mysql server start failed"); System.exit(-1); } + if (arrowFlightSQLPort != -1) { + this.flightSqlService = new FlightSqlService(arrowFlightSQLPort); + if (!flightSqlService.start()) { + System.exit(-1); + } + } else { + LOG.info("No Arrow Flight SQL service that needs to be started."); + } LOG.info("QE service start."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ecedfa3aa2..6b30fc58e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -146,6 +146,7 @@ import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.service.arrowflight.FlightStatementExecutor; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; import org.apache.doris.system.Backend; @@ -2597,7 +2598,8 @@ public class StmtExecutor { planner = new NereidsPlanner(statementContext); planner.plan(parsedStmt, context.getSessionVariable().toThrift()); } catch (Exception e) { - LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + LOG.warn("Arrow Flight SQL fall back to legacy planner, because: {}", + e.getMessage(), e); parsedStmt = null; planner = null; context.getState().setNereids(false); @@ -2612,7 +2614,6 @@ public class StmtExecutor { LOG.warn("Failed to run internal SQL: {}", originStmt, e); throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); } - planner.getFragments(); RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); profile.addExecutionProfile(coord.getExecutionProfile()); @@ -2646,7 +2647,7 @@ public class StmtExecutor { } } catch (Exception e) { fetchResultSpan.recordException(e); - throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); + throw new RuntimeException("Failed to fetch internal SQL result. 
" + Util.getRootCauseMessage(e), e); } finally { fetchResultSpan.end(); } @@ -2657,6 +2658,64 @@ public class StmtExecutor { } } + public void executeArrowFlightQuery(FlightStatementExecutor flightStatementExecutor) { + LOG.debug("ARROW FLIGHT QUERY: " + originStmt.toString()); + try { + try { + if (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { + try { + parseByNereids(); + Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, + "Nereids only process LogicalPlanAdapter," + + " but parsedStmt is " + parsedStmt.getClass().getName()); + context.getState().setNereids(true); + context.getState().setIsQuery(true); + planner = new NereidsPlanner(statementContext); + planner.plan(parsedStmt, context.getSessionVariable().toThrift()); + } catch (Exception e) { + LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + parsedStmt = null; + context.getState().setNereids(false); + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } else { + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e); + } + coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + profile.addExecutionProfile(coord.getExecutionProfile()); + try { + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + } catch (UserException e) { + throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e); + } + + Span queryScheduleSpan = context.getTracer() + .spanBuilder("Arrow Flight SQL schedule").setParent(Context.current()).startSpan(); + try (Scope scope = queryScheduleSpan.makeCurrent()) { + coord.exec(); + } catch (Exception e) { + queryScheduleSpan.recordException(e); + LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e); + throw new RuntimeException(e.getMessage() + Util.getRootCauseMessage(e), e); + } finally { + queryScheduleSpan.end(); + } + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); // TODO for query profile + } + flightStatementExecutor.setFinstId(coord.getFinstId()); + flightStatementExecutor.setResultFlightServerAddr(coord.getResultFlightServerAddr()); + flightStatementExecutor.setResultInternalServiceAddr(coord.getResultInternalServiceAddr()); + flightStatementExecutor.setResultOutputExprs(coord.getResultOutputExprs()); + } + private List convertResultBatchToResultRows(TResultBatch batch) { List columns = parsedStmt.getColLabels(); List resultRows = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index e1f5a2c95b..3250d18688 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -100,6 +100,11 @@ public class BackendServiceClient { return blockingStub.fetchData(request); } + public Future fetchArrowFlightSchema( + InternalService.PFetchArrowFlightSchemaRequest request) { + return stub.fetchArrowFlightSchema(request); + } + public Future fetchTableStructureAsync( InternalService.PFetchTableSchemaRequest request) { return stub.fetchTableSchema(request); 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index b30b8e0c5a..55881b4cf9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -259,6 +259,18 @@ public class BackendServiceProxy { } } + public Future fetchArrowFlightSchema( + TNetworkAddress address, InternalService.PFetchArrowFlightSchemaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.fetchArrowFlightSchema(request); + } catch (Throwable e) { + LOG.warn("fetch arrow flight schema catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future fetchTableStructureAsync( TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b0796a372d..4238e012c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2226,6 +2226,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setReplayedJournalId(replayedJournalId); result.setQueryPort(Config.query_port); result.setRpcPort(Config.rpc_port); + result.setArrowFlightSqlPort(Config.arrow_flight_sql_port); result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH); result.setLastStartupTime(exeEnv.getStartupTime()); result.setProcessUUID(exeEnv.getProcessUUID()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java new file mode 100644 index 0000000000..e0ec4bf10c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlService.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight; + +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +/** + * flight sql protocol implementation based on nio. 
+ */ +public class FlightSqlService { + private static final Logger LOG = LogManager.getLogger(FlightSqlService.class); + private final FlightServer flightServer; + private volatile boolean running; + + public FlightSqlService(int port) { + BufferAllocator allocator = new RootAllocator(); + Location location = Location.forGrpcInsecure("0.0.0.0", port); + FlightSqlServiceImpl producer = new FlightSqlServiceImpl(location); + flightServer = FlightServer.builder(allocator, location, producer).build(); + } + + // start Arrow Flight SQL service, return true if success, otherwise false + public boolean start() { + try { + flightServer.start(); + running = true; + LOG.info("Arrow Flight SQL service is started."); + } catch (IOException e) { + LOG.error("Start Arrow Flight SQL service failed.", e); + return false; + } + return true; + } + + public void stop() { + if (running) { + running = false; + try { + flightServer.close(); + } catch (InterruptedException e) { + LOG.warn("close Arrow Flight SQL server failed.", e); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java new file mode 100644 index 0000000000..38e275b1d5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlServiceImpl.java @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java +// and modified by Doris + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.Util; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.FlightSqlProducer; +import org.apache.arrow.flight.sql.SqlInfoBuilder; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCrossReference; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetExportedKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetImportedKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetXdbcTypeInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.SqlSupportedCaseSensitivity; +import org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; + +public class FlightSqlServiceImpl implements FlightSqlProducer, AutoCloseable { + private static final Logger LOG = LogManager.getLogger(FlightSqlServiceImpl.class); + private final Location location; + private final BufferAllocator rootAllocator = new RootAllocator(); + private final SqlInfoBuilder sqlInfoBuilder; + + public FlightSqlServiceImpl(final Location location) { + this.location = location; + sqlInfoBuilder = new SqlInfoBuilder(); + sqlInfoBuilder.withFlightSqlServerName("DorisFE") + .withFlightSqlServerVersion("1.0") + .withFlightSqlServerArrowVersion("13.0") + .withFlightSqlServerReadOnly(false) + .withSqlIdentifierQuoteChar("`") + .withSqlDdlCatalog(true) + .withSqlDdlSchema(false) + .withSqlDdlTable(false) + 
.withSqlIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE) + .withSqlQuotedIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE); + } + + @Override + public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public void closePreparedStatement(final ActionClosePreparedStatementRequest request, final CallContext context, + final StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("closePreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context, + final FlightDescriptor descriptor) { + try { + final String query = request.getQuery(); + final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query); + + flightStatementExecutor.executeQuery(); + + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(ByteString.copyFromUtf8( + DebugUtil.printId(flightStatementExecutor.getFinstId()) + ":" + query)).build(); + final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + // TODO Support multiple endpoints. + Location location = Location.forGrpcInsecure(flightStatementExecutor.getResultFlightServerAddr().hostname, + flightStatementExecutor.getResultFlightServerAddr().port); + List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + + Schema schema; + schema = flightStatementExecutor.fetchArrowFlightSchema(5000); + if (schema == null) { + throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException(); + } + return new FlightInfo(schema, descriptor, endpoints, -1, -1); + } catch (Exception e) { + LOG.warn("get flight info statement failed, " + e.getMessage(), e); + throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); + } + } + + @Override + public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command, + final CallContext context, + final FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement unimplemented") + .toRuntimeException(); + } + + @Override + public SchemaResult getSchemaStatement(final CommandStatementQuery command, final CallContext context, + final FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement unimplemented").toRuntimeException(); + } + + @Override + public void close() throws Exception { + AutoCloseables.close(rootAllocator); + } + + @Override + public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("listFlights unimplemented").toRuntimeException(); + } + + @Override + public void createPreparedStatement(final ActionCreatePreparedStatementRequest request, final CallContext context, + final StreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("createPreparedStatement unimplemented").toRuntimeException(); + } + + @Override + public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) { + throw CallStatus.UNIMPLEMENTED.withDescription("doExchange unimplemented").toRuntimeException(); + } 
+ + @Override + public Runnable acceptPutStatement(CommandStatementUpdate command, + CallContext context, FlightStream flightStream, + StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutStatement unimplemented").toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context, + FlightStream flightStream, StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementUpdate unimplemented") + .toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context, + FlightStream flightStream, StreamListener ackStream) { + throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementQuery unimplemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoSqlInfo(final CommandGetSqlInfo request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA); + } + + @Override + public void getStreamSqlInfo(final CommandGetSqlInfo command, final CallContext context, + final ServerStreamListener listener) { + this.sqlInfoBuilder.send(command.getInfoList(), listener); + } + + @Override + public FlightInfo getFlightInfoTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, + FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TYPE_INFO_SCHEMA); + } + + @Override + public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTypeInfo unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA); + } + + @Override + public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA); + } + + @Override + public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTables(final CommandGetTables request, final CallContext context, + final FlightDescriptor descriptor) { + Schema schemaToUse = Schemas.GET_TABLES_SCHEMA; + if (!request.getIncludeSchema()) { + schemaToUse = Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; + } + return getFlightInfoForSchema(request, descriptor, schemaToUse); + } + + @Override + public void getStreamTables(final CommandGetTables command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTableTypes(final CommandGetTableTypes request, final CallContext context, + final FlightDescriptor 
descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA); + } + + @Override + public void getStreamTableTypes(final CallContext context, final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTableTypes unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA); + } + + @Override + public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, + final ServerStreamListener listener) { + + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoExportedKeys(final CommandGetExportedKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_EXPORTED_KEYS_SCHEMA); + } + + @Override + public void getStreamExportedKeys(final CommandGetExportedKeys command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamExportedKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoImportedKeys(final CommandGetImportedKeys request, final CallContext context, + final FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_KEYS_SCHEMA); + } + + @Override + public void getStreamImportedKeys(final CommandGetImportedKeys command, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamImportedKeys unimplemented").toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCrossReference(CommandGetCrossReference request, CallContext context, + FlightDescriptor descriptor) { + return getFlightInfoForSchema(request, descriptor, Schemas.GET_CROSS_REFERENCE_SCHEMA); + } + + @Override + public void getStreamCrossReference(CommandGetCrossReference command, CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference unimplemented").toRuntimeException(); + } + + @Override + public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context, + final ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED.withDescription("getStreamStatement unimplemented").toRuntimeException(); + } + + private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, + final Schema schema) { + final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); + // TODO Support multiple endpoints. 
+ final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); + + return new FlightInfo(schema, descriptor, endpoints, -1, -1); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java new file mode 100644 index 0000000000..ced03350de --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightStatementExecutor.java @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java +// and modified by Doris + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.Types; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TResultSinkType; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public final class FlightStatementExecutor { + private AutoCloseConnectContext acConnectContext; + private final String query; + private TUniqueId queryId; + private TUniqueId finstId; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private ArrayList resultOutputExprs; + + public FlightStatementExecutor(final String query) { + this.query = query; + acConnectContext = buildConnectContext(); + } + + public void setQueryId(TUniqueId queryId) { + this.queryId = queryId; + } + + public void setFinstId(TUniqueId 
finstId) { + this.finstId = finstId; + } + + public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) { + this.resultFlightServerAddr = resultFlightServerAddr; + } + + public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) { + this.resultInternalServiceAddr = resultInternalServiceAddr; + } + + public void setResultOutputExprs(ArrayList resultOutputExprs) { + this.resultOutputExprs = resultOutputExprs; + } + + public String getQuery() { + return query; + } + + public TUniqueId getQueryId() { + return queryId; + } + + public TUniqueId getFinstId() { + return finstId; + } + + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public ArrayList getResultOutputExprs() { + return resultOutputExprs; + } + + @Override + public boolean equals(final Object other) { + if (!(other instanceof FlightStatementExecutor)) { + return false; + } + return this == other; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); // identity hash, consistent with the identity-based equals() above + } + + public static AutoCloseConnectContext buildConnectContext() { + ConnectContext connectContext = new ConnectContext(); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.internalSession = true; + sessionVariable.setEnablePipelineEngine(false); // TODO + sessionVariable.setEnablePipelineXEngine(false); // TODO + connectContext.setEnv(Env.getCurrentEnv()); + connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser()); // TODO + connectContext.setCurrentUserIdentity(UserIdentity.ROOT); // TODO + connectContext.setStartTime(); + connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); + connectContext.setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL); + return new AutoCloseConnectContext(connectContext); + } + + public void executeQuery() { + try { + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + setQueryId(queryId); + acConnectContext.connectContext.setQueryId(queryId); + StmtExecutor stmtExecutor = new StmtExecutor(acConnectContext.connectContext, getQuery()); + acConnectContext.connectContext.setExecutor(stmtExecutor); + stmtExecutor.executeArrowFlightQuery(this); + } catch (Exception e) { + throw new RuntimeException("Failed to coord exec", e); + } + } + + public Schema fetchArrowFlightSchema(int timeoutMs) { + TNetworkAddress address = getResultInternalServiceAddr(); + TUniqueId tid = getFinstId(); + ArrayList resultOutputExprs = getResultOutputExprs(); + Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + try { + InternalService.PFetchArrowFlightSchemaRequest request = + InternalService.PFetchArrowFlightSchemaRequest.newBuilder() + .setFinstId(finstId) + .build(); + + Future future + = BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request); + InternalService.PFetchArrowFlightSchemaResult pResult; + pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); + if (pResult == null) { + throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s", + DebugUtil.printId(tid))); + } + TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + Status status = new Status(); + status.setPstatus(pResult.getStatus()); + throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, 
errmsg: %s", + DebugUtil.printId(tid), status)); + } + if (pResult.hasSchema() && pResult.getSchema().size() > 0) { + RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); + ArrowStreamReader arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(pResult.getSchema().toByteArray()), + rootAllocator + ); + try { + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + List fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != resultOutputExprs.size()) { + throw new RuntimeException(String.format( + "Schema size %s' is not equal to arrow field size %s, finstId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + } + return root.getSchema(); + } catch (Exception e) { + throw new RuntimeException("Read Arrow Flight Schema failed.", e); + } + } else { + throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s", + DebugUtil.printId(tid))); + } + } catch (RpcException e) { + throw new RuntimeException(String.format( + "arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (InterruptedException e) { + throw new RuntimeException(String.format( + "arrow flight schema future get interrupted exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (ExecutionException e) { + throw new RuntimeException(String.format( + "arrow flight schema future get execution exception, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (TimeoutException e) { + throw new RuntimeException(String.format( + "arrow flight schema fetch timeout, finstId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index db470fb91d..fcb5e63e83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -76,6 +76,8 @@ public class Backend implements Writable { private volatile int beRpcPort; // be rpc port @SerializedName("brpcPort") private volatile int brpcPort = -1; + @SerializedName("arrowFlightSqlPort") + private volatile int arrowFlightSqlPort = -1; @SerializedName("lastUpdateMs") private volatile long lastUpdateMs; @@ -204,6 +206,10 @@ public class Backend implements Writable { return brpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public String getHeartbeatErrMsg() { return heartbeatErrMsg; } @@ -289,6 +295,10 @@ public class Backend implements Writable { this.brpcPort = brpcPort; } + public void setArrowFlightSqlPort(int arrowFlightSqlPort) { + this.arrowFlightSqlPort = arrowFlightSqlPort; + } + public void setCpuCores(int cpuCores) { this.cpuCores = cpuCores; } @@ -670,6 +680,11 @@ public class Backend implements Writable { this.brpcPort = hbResponse.getBrpcPort(); } + if (this.arrowFlightSqlPort != hbResponse.getArrowFlightSqlPort() && !FeConstants.runningUnitTest) { + isChanged = true; + this.arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort(); + } + if (this.isShutDown.get() != hbResponse.isShutDown()) { isChanged = true; LOG.info("{} shutdown state is changed", this.toString()); @@ -796,10 +811,14 @@ public class Backend implements Writable { return tagMap; } - public TNetworkAddress getBrpcAdress() { + public TNetworkAddress getBrpcAddress() { return new TNetworkAddress(getHost(), getBrpcPort()); } + public 
TNetworkAddress getArrowFlightAddress() { + return new TNetworkAddress(getHost(), getArrowFlightSqlPort()); + } + public String getTagMapString() { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index 18c5b94568..a91dd12b04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -37,6 +37,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private int httpPort; @SerializedName(value = "brpcPort") private int brpcPort; + @SerializedName(value = "arrowFlightSqlPort") + private int arrowFlightSqlPort; @SerializedName(value = "nodeRole") private String nodeRole = Tag.VALUE_MIX; @@ -54,7 +56,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { } public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime, - String version, String nodeRole, boolean isShutDown) { + String version, String nodeRole, boolean isShutDown, int arrowFlightSqlPort) { super(HeartbeatResponse.Type.BACKEND); this.beId = beId; this.status = HbStatus.OK; @@ -66,6 +68,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { this.version = version; this.nodeRole = nodeRole; this.isShutDown = isShutDown; + this.arrowFlightSqlPort = arrowFlightSqlPort; } public BackendHbResponse(long beId, String errMsg) { @@ -99,6 +102,10 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { return brpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public long getBeStartTime() { return beStartTime; } @@ -122,6 +129,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { bePort = in.readInt(); httpPort = in.readInt(); brpcPort = in.readInt(); + arrowFlightSqlPort = in.readInt(); } @Override @@ -133,6 +141,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { sb.append(", bePort: ").append(bePort); sb.append(", httpPort: ").append(httpPort); sb.append(", brpcPort: ").append(brpcPort); + sb.append(", arrowFlightSqlPort: ").append(arrowFlightSqlPort); return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 95937b9d42..51e2369746 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -51,6 +51,7 @@ public class Frontend implements Writable { private int queryPort; private int rpcPort; + private int arrowFlightSqlPort; private long replayedJournalId; private long lastStartupTime; @@ -100,6 +101,10 @@ public class Frontend implements Writable { return rpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public boolean isAlive() { return isAlive; } @@ -153,6 +158,7 @@ public class Frontend implements Writable { version = hbResponse.getVersion(); queryPort = hbResponse.getQueryPort(); rpcPort = hbResponse.getRpcPort(); + arrowFlightSqlPort = hbResponse.getArrowFlightSqlPort(); replayedJournalId = hbResponse.getReplayedJournalId(); lastUpdateTime = hbResponse.getHbTime(); heartbeatErrMsg = ""; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java index f7d7e90624..c9afcef49b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/FrontendHbResponse.java @@ -39,6 +39,8 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { private int queryPort; @SerializedName(value = "rpcPort") private int rpcPort; + @SerializedName(value = "arrowFlightSqlPort") + private int arrowFlightSqlPort; @SerializedName(value = "replayedJournalId") private long replayedJournalId; private String version; @@ -50,7 +52,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { super(HeartbeatResponse.Type.FRONTEND); } - public FrontendHbResponse(String name, int queryPort, int rpcPort, + public FrontendHbResponse(String name, int queryPort, int rpcPort, int arrowFlightSqlPort, long replayedJournalId, long hbTime, String version, long feStartTime, List diskInfos, long processUUID) { @@ -59,6 +61,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { this.name = name; this.queryPort = queryPort; this.rpcPort = rpcPort; + this.arrowFlightSqlPort = arrowFlightSqlPort; this.replayedJournalId = replayedJournalId; this.hbTime = hbTime; this.version = version; @@ -87,6 +90,10 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { return rpcPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public long getReplayedJournalId() { return replayedJournalId; } @@ -113,6 +120,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { name = Text.readString(in); queryPort = in.readInt(); rpcPort = in.readInt(); + arrowFlightSqlPort = in.readInt(); replayedJournalId = in.readLong(); } @@ -124,6 +132,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable { sb.append(", version: ").append(version); sb.append(", queryPort: ").append(queryPort); sb.append(", rpcPort: ").append(rpcPort); + sb.append(", arrowFlightSqlPort: ").append(arrowFlightSqlPort); sb.append(", replayedJournalId: ").append(replayedJournalId); sb.append(", festartTime: ").append(processUUID); return sb.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 7dc1275afe..a285d529a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -238,6 +238,7 @@ public class HeartbeatMgr extends MasterDaemon { backendInfo.setHttpPort(2); backendInfo.setBeRpcPort(3); backendInfo.setBrpcPort(4); + backendInfo.setArrowFlightSqlPort(8); backendInfo.setVersion("test-1234"); result = new THeartbeatResult(); result.setStatus(new TStatus(TStatusCode.OK)); @@ -253,6 +254,10 @@ public class HeartbeatMgr extends MasterDaemon { if (tBackendInfo.isSetBrpcPort()) { brpcPort = tBackendInfo.getBrpcPort(); } + int arrowFlightSqlPort = -1; + if (tBackendInfo.isSetArrowFlightSqlPort()) { + arrowFlightSqlPort = tBackendInfo.getArrowFlightSqlPort(); + } String version = ""; if (tBackendInfo.isSetVersion()) { version = tBackendInfo.getVersion(); @@ -267,7 +272,7 @@ public class HeartbeatMgr extends MasterDaemon { isShutDown = tBackendInfo.isIsShutdown(); } return new BackendHbResponse(backendId, bePort, httpPort, 
brpcPort, - System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown); + System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown, arrowFlightSqlPort); } else { return new BackendHbResponse(backendId, backend.getHost(), result.getStatus().getErrorMsgs().isEmpty() @@ -308,7 +313,8 @@ public class HeartbeatMgr extends MasterDaemon { // heartbeat to self if (Env.getCurrentEnv().isReady()) { return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port, - Env.getCurrentEnv().getMaxJournalId(), System.currentTimeMillis(), + Config.arrow_flight_sql_port, Env.getCurrentEnv().getMaxJournalId(), + System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH, ExecuteEnv.getInstance().getStartupTime(), ExecuteEnv.getInstance().getDiskInfos(), ExecuteEnv.getInstance().getProcessUUID()); @@ -331,7 +337,7 @@ public class HeartbeatMgr extends MasterDaemon { ok = true; if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) { return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(), - result.getRpcPort(), result.getReplayedJournalId(), + result.getRpcPort(), result.getArrowFlightSqlPort(), result.getReplayedJournalId(), System.currentTimeMillis(), result.getVersion(), result.getLastStartupTime(), FeDiskInfo.fromThrifts(result.getDiskInfos()), result.getProcessUUID()); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 9cf9f5ad27..69af28dffd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -788,6 +788,7 @@ public class SystemInfoService { memoryBe.setHttpPort(be.getHttpPort()); memoryBe.setBeRpcPort(be.getBeRpcPort()); memoryBe.setBrpcPort(be.getBrpcPort()); + memoryBe.setArrowFlightSqlPort(be.getArrowFlightSqlPort()); memoryBe.setLastUpdateMs(be.getLastUpdateMs()); memoryBe.setLastStartTime(be.getLastStartTime()); memoryBe.setDisks(be.getDisks()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java index 3141c240fb..4e6fe27398 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java @@ -45,6 +45,7 @@ public class BackendsTableValuedFunction extends MetadataTableValuedFunction { new Column("BePort", ScalarType.createType(PrimitiveType.INT)), new Column("HttpPort", ScalarType.createType(PrimitiveType.INT)), new Column("BrpcPort", ScalarType.createType(PrimitiveType.INT)), + new Column("ArrowFlightSqlPort", ScalarType.createType(PrimitiveType.INT)), new Column("LastStartTime", ScalarType.createStringType()), new Column("LastHeartbeat", ScalarType.createStringType()), new Column("Alive", ScalarType.createType(PrimitiveType.BOOLEAN)), diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java index 92109d05a9..d23c9dfd6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java @@ -44,6 +44,7 @@ public class 
FrontendsTableValuedFunction extends MetadataTableValuedFunction { new Column("HttpPort", ScalarType.createStringType()), new Column("QueryPort", ScalarType.createStringType()), new Column("RpcPort", ScalarType.createStringType()), + new Column("ArrowFlightSqlPort", ScalarType.createStringType()), new Column("Role", ScalarType.createStringType()), new Column("IsMaster", ScalarType.createStringType()), new Column("ClusterId", ScalarType.createStringType()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index 129c3f930c..bf78faec95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -92,7 +92,7 @@ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction { } BackendServiceProxy proxy = BackendServiceProxy.getInstance(); - TNetworkAddress address = be.getBrpcAdress(); + TNetworkAddress address = be.getBrpcAddress(); InternalService.PGlobRequest.Builder requestBuilder = InternalService.PGlobRequest.newBuilder(); requestBuilder.setPattern(filePath); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index a2d2599599..7b9d3f892e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -188,6 +188,7 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setIntVal(backend.getBePort())); trow.addToColumnValue(new TCell().setIntVal(backend.getHttpPort())); trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getArrowFlightSqlPort())); } trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime()))); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs()))); diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java index d8c3877706..0ad1b0bb64 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java @@ -95,6 +95,7 @@ public class HeartbeatMgrTest { normalResult.setReplayedJournalId(191224); normalResult.setQueryPort(9131); normalResult.setRpcPort(9121); + normalResult.setArrowFlightSqlPort(9141); normalResult.setVersion("test"); TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde"); @@ -123,6 +124,7 @@ public class HeartbeatMgrTest { Assert.assertEquals(191224, hbResponse.getReplayedJournalId()); Assert.assertEquals(9131, hbResponse.getQueryPort()); Assert.assertEquals(9121, hbResponse.getRpcPort()); + Assert.assertEquals(9141, hbResponse.getArrowFlightSqlPort()); Assert.assertEquals(HbStatus.OK, hbResponse.getStatus()); Assert.assertEquals("test", hbResponse.getVersion()); @@ -135,6 +137,7 @@ public class HeartbeatMgrTest { Assert.assertEquals(0, hbResponse.getReplayedJournalId()); Assert.assertEquals(0, hbResponse.getQueryPort()); Assert.assertEquals(0, hbResponse.getRpcPort()); + Assert.assertEquals(0, hbResponse.getArrowFlightSqlPort()); 
Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus()); Assert.assertEquals("not ready", hbResponse.getMsg()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 3c50bd47c8..7c5556e8cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -97,7 +97,7 @@ public class SystemInfoServiceTest { System.out.println(Env.getCurrentEnvJournalVersion()); BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test", - Tag.VALUE_COMPUTATION, false); + Tag.VALUE_COMPUTATION, false, 1234); // Write objects to file File file1 = new File("./BackendHbResponseSerialization"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java index 6f3cf22d44..98a3893123 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java @@ -52,12 +52,14 @@ public class AnotherDemoTest { private static int fe_http_port; private static int fe_rpc_port; private static int fe_query_port; + private static int fe_arrow_flight_sql_port; private static int fe_edit_log_port; private static int be_heartbeat_port; private static int be_thrift_port; private static int be_brpc_port; private static int be_http_port; + private static int be_arrow_flight_sql_port; // use a unique dir so that it won't be conflict with other unit test which // may also start a Mocked Frontend @@ -81,12 +83,14 @@ public class AnotherDemoTest { fe_http_port = UtFrameUtils.findValidPort(); fe_rpc_port = UtFrameUtils.findValidPort(); fe_query_port = UtFrameUtils.findValidPort(); + fe_arrow_flight_sql_port = UtFrameUtils.findValidPort(); fe_edit_log_port = UtFrameUtils.findValidPort(); be_heartbeat_port = UtFrameUtils.findValidPort(); be_thrift_port = UtFrameUtils.findValidPort(); be_brpc_port = UtFrameUtils.findValidPort(); be_http_port = UtFrameUtils.findValidPort(); + be_arrow_flight_sql_port = UtFrameUtils.findValidPort(); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 83c75f052b..10f94adeb8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -198,7 +198,8 @@ public class DemoMultiBackendsTest { BackendsProcDir dir = new BackendsProcDir(Env.getCurrentSystemInfo()); ProcResult result = dir.fetchResult(); Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size()); - Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(18)); + Assert.assertEquals("{\"location\" : \"default\"}", + result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6)); Assert.assertEquals( "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 3a09cae73b..55abcf9542 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -86,10 +86,11 @@ public class MockedBackendFactory { public static final int BE_DEFAULT_THRIFT_PORT = 9060; public static final int BE_DEFAULT_BRPC_PORT = 8060; public static final int BE_DEFAULT_HTTP_PORT = 8040; + public static final int BE_DEFAULT_ARROW_FLIGHT_SQL_PORT = 8070; // create a mocked backend with customize parameters public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, - int httpPort, + int httpPort, int arrowFlightSqlPort, HeartbeatService.Iface hbService, BeThriftService beThriftService, PBackendServiceGrpc.PBackendServiceImplBase pBackendService) throws IOException { @@ -105,16 +106,20 @@ public class MockedBackendFactory { private int beHttpPort; private int beBrpcPort; - public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort) { + private int beArrowFlightSqlPort; + + public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort, int beArrowFlightSqlPort) { this.beThriftPort = beThriftPort; this.beHttpPort = beHttpPort; this.beBrpcPort = beBrpcPort; + this.beArrowFlightSqlPort = beArrowFlightSqlPort; } @Override public THeartbeatResult heartbeat(TMasterInfo masterInfo) throws TException { TBackendInfo backendInfo = new TBackendInfo(beThriftPort, beHttpPort); backendInfo.setBrpcPort(beBrpcPort); + backendInfo.setArrowFlightSqlPort(beArrowFlightSqlPort); THeartbeatResult result = new THeartbeatResult(new TStatus(TStatusCode.OK), backendInfo); return result; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java index 4ee3819971..6382621d90 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java @@ -79,6 +79,7 @@ public class MockedFrontend { MIN_FE_CONF.put("http_port", "8030"); MIN_FE_CONF.put("rpc_port", "9020"); MIN_FE_CONF.put("query_port", "9030"); + MIN_FE_CONF.put("arrow_flight_sql_port", "9040"); MIN_FE_CONF.put("edit_log_port", "9010"); MIN_FE_CONF.put("priority_networks", "127.0.0.1/24"); MIN_FE_CONF.put("sys_log_verbose_modules", "org"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ec0a87ef34..8861112624 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -364,12 +364,14 @@ public abstract class TestWithFeService { int feHttpPort = findValidPort(); int feRpcPort = findValidPort(); int feQueryPort = findValidPort(); + int arrowFlightSqlPort = findValidPort(); int feEditLogPort = findValidPort(); Map feConfMap = Maps.newHashMap(); // set additional fe config feConfMap.put("http_port", String.valueOf(feHttpPort)); feConfMap.put("rpc_port", String.valueOf(feRpcPort)); feConfMap.put("query_port", String.valueOf(feQueryPort)); + feConfMap.put("arrow_flight_sql_port", String.valueOf(arrowFlightSqlPort)); feConfMap.put("edit_log_port", String.valueOf(feEditLogPort)); feConfMap.put("tablet_create_timeout_second", "10"); // start fe in "DORIS_HOME/fe/mocked/" @@ -449,10 +451,11 @@ public abstract class TestWithFeService { int beThriftPort = findValidPort(); int beBrpcPort = 
findValidPort(); int beHttpPort = findValidPort(); + int beArrowFlightSqlPort = findValidPort(); // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, - beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -471,6 +474,7 @@ public abstract class TestWithFeService { be.setBePort(beThriftPort); be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); + be.setArrowFlightSqlPort(beArrowFlightSqlPort); Env.getCurrentSystemInfo().addBackend(be); return be; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 2e2d53edb7..407171a69c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -193,6 +193,7 @@ public class UtFrameUtils { int feHttpPort = findValidPort(); int feRpcPort = findValidPort(); int feQueryPort = findValidPort(); + int arrowFlightSqlPort = findValidPort(); int feEditLogPort = findValidPort(); // start fe in "DORIS_HOME/fe/mocked/" @@ -202,6 +203,7 @@ public class UtFrameUtils { feConfMap.put("http_port", String.valueOf(feHttpPort)); feConfMap.put("rpc_port", String.valueOf(feRpcPort)); feConfMap.put("query_port", String.valueOf(feQueryPort)); + feConfMap.put("arrow_flight_sql_port", String.valueOf(arrowFlightSqlPort)); feConfMap.put("edit_log_port", String.valueOf(feEditLogPort)); feConfMap.put("tablet_create_timeout_second", "10"); frontend.init(dorisHome + "/" + runningDir, feConfMap); @@ -278,10 +280,11 @@ public class UtFrameUtils { int beThriftPort = findValidPort(); int beBrpcPort = findValidPort(); int beHttpPort = findValidPort(); + int beArrowFlightSqlPort = findValidPort(); // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, - beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort, beArrowFlightSqlPort), new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -299,6 +302,7 @@ public class UtFrameUtils { be.setBePort(beThriftPort); be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); + be.setArrowFlightSqlPort(beArrowFlightSqlPort); Env.getCurrentSystemInfo().addBackend(be); return be; } diff --git a/fe/pom.xml b/fe/pom.xml index a9ba53de1e..3e04562d55 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -233,16 +233,17 @@ under the License. 2.18.0 2.0.6 4.0.2 - 4.1.94.Final + + 4.1.96.Final 3.10.6.Final 2.1 - 1.30.0 + 1.56.0 3.32.0 - 3.21.12 + 3.24.3 - 3.21.9 + 3.24.3 com.google.protobuf:protoc:${protoc.artifact.version} io.grpc:protoc-gen-grpc-java:${grpc.version} 3.1.5 @@ -275,7 +276,7 @@ under the License. 1.1.0 3.0.0rc1 0.43.3-public - 9.0.0 + 13.0.0 1.11.1 0.13.1 @@ -313,6 +314,9 @@ under the License. 0.4.0-incubating 3.4.4 + + shade-format-flatbuffers + 1.12.0 @@ -1426,6 +1430,65 @@ under the License. 
diff --git a/fe/pom.xml b/fe/pom.xml
index a9ba53de1e..3e04562d55 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -233,16 +233,17 @@ under the License.
         2.18.0
         2.0.6
         4.0.2
-        4.1.94.Final
+
+        4.1.96.Final
         3.10.6.Final
         2.1
-        1.30.0
+        1.56.0
         3.32.0
-        3.21.12
+        3.24.3
-        3.21.9
+        3.24.3
         com.google.protobuf:protoc:${protoc.artifact.version}
         io.grpc:protoc-gen-grpc-java:${grpc.version}
         3.1.5
@@ -275,7 +276,7 @@ under the License.
         1.1.0
         3.0.0rc1
         0.43.3-public
-        9.0.0
+        13.0.0
         1.11.1
         0.13.1
@@ -313,6 +314,9 @@ under the License.
         0.4.0-incubating
         3.4.4
+
+        shade-format-flatbuffers
+        1.12.0
@@ -1426,6 +1430,65 @@ under the License.
             <artifactId>client</artifactId>
             <version>${vesoft.client.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-context</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-api</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.flatbuffers</groupId>
+            <artifactId>flatbuffers-java</artifactId>
+            <version>${flatbuffers.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>flight-core</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>flight-sql</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-core</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-jdbc</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
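Note: the grpc, flatbuffers, and arrow artifacts added above are what the new FE endpoint and its clients compile against. As a usage sketch (not part of the patch), a client built on the same flight-sql artifact can run a query against the FE's arrow_flight_sql_port roughly like this; host, port, and credentials are placeholders, and plain authenticateBasic is an assumption about the handshake the FE accepts.

    import org.apache.arrow.flight.FlightClient;
    import org.apache.arrow.flight.FlightEndpoint;
    import org.apache.arrow.flight.FlightInfo;
    import org.apache.arrow.flight.FlightStream;
    import org.apache.arrow.flight.Location;
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    public class FlightSqlClientSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint: the FE host and its arrow_flight_sql_port.
            Location location = Location.forGrpcInsecure("127.0.0.1", 9040);
            try (BufferAllocator allocator = new RootAllocator()) {
                FlightClient flightClient = FlightClient.builder(allocator, location).build();
                flightClient.authenticateBasic("root", "");
                try (FlightSqlClient client = new FlightSqlClient(flightClient)) {
                    FlightInfo info = client.execute("SELECT 1");
                    for (FlightEndpoint endpoint : info.getEndpoints()) {
                        try (FlightStream stream = client.getStream(endpoint.getTicket())) {
                            while (stream.next()) {
                                // Each batch arrives as a VectorSchemaRoot; dump it as TSV.
                                System.out.println(stream.getRoot().contentToTSVString());
                            }
                        }
                    }
                }
            }
        }
    }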
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index a3d1459314..878544e74b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -249,6 +249,16 @@ message PFetchDataResult {
     optional bool empty_batch = 6;
 };
 
+message PFetchArrowFlightSchemaRequest {
+    optional PUniqueId finst_id = 1;
+};
+
+message PFetchArrowFlightSchemaResult {
+    optional PStatus status = 1;
+    // valid when status is ok
+    optional bytes schema = 2;
+};
+
 message KeyTuple {
     repeated string key_column_rep = 1;
 }
@@ -812,5 +822,6 @@ service PBackendService {
     rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse);
     rpc glob(PGlobRequest) returns (PGlobResponse);
     rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse);
+    rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult);
 };
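Note: the schema field of PFetchArrowFlightSchemaResult carries the result schema as Arrow IPC bytes, presumably produced by the helper added to be/src/util/arrow/row_batch.cpp. Below is a hedged sketch of turning those bytes back into an Arrow Schema on the FE; the RPC plumbing through BackendServiceProxy is omitted and the class and method names are illustrative.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;

    import org.apache.arrow.vector.ipc.ReadChannel;
    import org.apache.arrow.vector.ipc.message.MessageSerializer;
    import org.apache.arrow.vector.types.pojo.Schema;

    public final class ArrowFlightSchemaDecoder {
        private ArrowFlightSchemaDecoder() {}

        // Decode PFetchArrowFlightSchemaResult.schema, assuming the BE wrote it with
        // Arrow's IPC schema serialization.
        public static Schema decode(byte[] schemaBytes) throws IOException {
            try (ReadChannel channel =
                    new ReadChannel(Channels.newChannel(new ByteArrayInputStream(schemaBytes)))) {
                return MessageSerializer.deserializeSchema(channel);
            }
        }
    }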
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index e5c7b9bb0b..6c85c0290c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -824,6 +824,7 @@ struct TFrontendPingFrontendResult {
     7: optional i64 lastStartupTime
     8: optional list<TDiskInfo> diskInfos
     9: optional i64 processUUID
+    10: optional i32 arrowFlightSqlPort
 }
 
 struct TPropertyVal {
diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift
index 7fcf45804d..5a7e47d982 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -50,6 +50,7 @@ struct TBackendInfo {
     6: optional i64 be_start_time // This field will also be uesd to identify a be process
     7: optional string be_node_role
     8: optional bool is_shutdown
+    9: optional Types.TPort arrow_flight_sql_port
 }
 
 struct THeartbeatResult {
diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out
index f82e0c9453..6a7fd4fd93 100644
--- a/regression-test/data/performance_p0/redundant_conjuncts.out
+++ b/regression-test/data/performance_p0/redundant_conjuncts.out
@@ -6,6 +6,7 @@ PLAN FRAGMENT 0
   PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`
 
   VRESULT SINK
+     MYSQL_PROTOCAL
 
   0:VOlapScanNode
      TABLE: default_cluster:regression_test_performance_p0.redundant_conjuncts(redundant_conjuncts), PREAGGREGATION: OFF. Reason: No AggregateInfo
@@ -21,6 +22,7 @@ PLAN FRAGMENT 0
   PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`
 
   VRESULT SINK
+     MYSQL_PROTOCAL
 
   0:VOlapScanNode
      TABLE: default_cluster:regression_test_performance_p0.redundant_conjuncts(redundant_conjuncts), PREAGGREGATION: OFF. Reason: No AggregateInfo
diff --git a/regression-test/suites/demo_p0/httpTest_action.groovy b/regression-test/suites/demo_p0/httpTest_action.groovy
index 6d03e081f8..3120a92b5f 100644
--- a/regression-test/suites/demo_p0/httpTest_action.groovy
+++ b/regression-test/suites/demo_p0/httpTest_action.groovy
@@ -24,7 +24,7 @@ suite("http_test_action") {
     def backendIdToBackendIP = [:]
     def backendIdToBackendBrpcPort = [:]
     for (String[] backend in backends) {
-        if (backend[8].equals("true")) {
+        if (backend[9].equals("true")) {
            backendIdToBackendIP.put(backend[0], backend[1])
            backendIdToBackendBrpcPort.put(backend[0], backend[5])
        }
diff --git a/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy
index be497ee25a..2f6f774ad8 100644
--- a/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_backends_tvf.groovy
@@ -19,7 +19,7 @@ suite("test_backends_tvf","p0,external,tvf,external_docker") {
    List<List<Object>> table = sql """ select * from backends(); """
    assertTrue(table.size() > 0)
-    assertEquals(24, table[0].size)
+    assertEquals(25, table[0].size)
 
    // filter columns
    table = sql """ select BackendId, Host, Alive, TotalCapacity, Version, NodeRole from backends();"""
diff --git a/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy
index e247f8bdf1..0f7a4f1b2d 100644
--- a/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy
@@ -19,7 +19,7 @@ suite("test_frontends_tvf","p0,external,tvf,external_docker") {
    List<List<Object>> table = sql """ select * from `frontends`(); """
    assertTrue(table.size() > 0)
-    assertTrue(table[0].size == 17)
+    assertTrue(table[0].size == 18)
 
    // filter columns
    table = sql """ select Name from `frontends`();"""
@@ -43,7 +43,7 @@ suite("test_frontends_tvf","p0,external,tvf,external_docker") {
    assertTrue(res[0][0] > 0)
 
    sql """ select Name, Host, EditLogPort
-        HttpPort, QueryPort, RpcPort, `Role`, IsMaster, ClusterId
+        HttpPort, QueryPort, RpcPort, ArrowFlightSqlPort, `Role`, IsMaster, ClusterId
        `Join`, Alive, ReplayedJournalId, LastHeartbeat
        IsHelper, ErrMsg, Version, CurrentConnected from frontends();
    """
diff --git a/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy b/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy
index c5eb2689bd..4a3b99c1d0 100644
--- a/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_map_load_and_compaction.groovy
@@ -134,7 +134,7 @@ suite("test_map_load_and_compaction", "p0") {
        backends = sql """ show backends; """
        assertTrue(backends.size() > 0)
        for (String[] b : backends) {
-            assertEquals("true", b[8])
+            assertEquals("true", b[9])
        }
    } finally {
        try_sql("DROP TABLE IF EXISTS ${testTable}")
diff --git a/regression-test/suites/nereids_syntax_p0/information_schema.groovy b/regression-test/suites/nereids_syntax_p0/information_schema.groovy
index c4fead2017..59ab91ab97 100644
--- a/regression-test/suites/nereids_syntax_p0/information_schema.groovy
+++ b/regression-test/suites/nereids_syntax_p0/information_schema.groovy
@@ -18,7 +18,7 @@ suite("information_schema") {
    List<List<Object>> table = sql """ select * from backends(); """
    assertTrue(table.size() > 0)
-    assertTrue(table[0].size == 24)
+    assertTrue(table[0].size == 25)
 
    sql "SELECT DATABASE();"
    sql "select USER();"
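Note: two test-facing effects of the patch show up above. First, the plan outputs gain a protocol line under VRESULT SINK because the result sink now states which protocol it returns rows over; the MYSQL_PROTOCAL spelling mirrors the existing thrift enum value. Second, show backends and the backends() table function gain an ArrowFlightSqlPort column, which moves Alive from index 8 to 9 and raises the column counts from 24 to 25 (frontends(): 17 to 18). The sketch below illustrates the first point only; the field and method shape are assumptions, not the exact ResultSink change in this patch.

    import org.apache.doris.thrift.TExplainLevel;
    import org.apache.doris.thrift.TResultSinkType;

    public class ResultSinkExplainSketch {
        private final TResultSinkType sinkType = TResultSinkType.MYSQL_PROTOCAL;

        public String getExplainString(String prefix, TExplainLevel explainLevel) {
            StringBuilder sb = new StringBuilder();
            sb.append(prefix).append("VRESULT SINK\n");
            // Name the protocol the sink streams results over (MySQL wire protocol today,
            // Arrow Flight once a query is routed through the new endpoint).
            sb.append(prefix).append("   ").append(sinkType.toString()).append("\n");
            return sb.toString();
        }
    }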