diff --git a/be/src/common/config.h b/be/src/common/config.h index 51cad63042..1937287001 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -443,7 +443,7 @@ namespace config { CONF_Bool(auto_recover_index_loading_failure, "false"); // max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row - // default is 10, batch_size's defualt value is 1024 means 10 * 1024 rows will be cached + // default is 20, batch_size's defualt value is 1024 means 20 * 1024 rows will be cached CONF_Int32(max_memory_sink_batch_count, "20"); // This configuration is used for the context gc thread schedule period diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index b99183a0bc..40fa551741 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -101,6 +101,7 @@ set(RUNTIME_FILES routine_load/data_consumer_pool.cpp routine_load/routine_load_task_executor.cpp small_file_mgr.cpp + record_batch_queue.cpp result_queue_mgr.cpp memory_scratch_sink.cpp external_scan_context_mgr.cpp diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 21159a8764..7deb819744 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -665,6 +665,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, c TQueryOptions query_options; query_options.batch_size = params.batch_size; query_options.query_timeout = params.query_timeout; + query_options.query_type = TQueryType::EXTERNAL; exec_fragment_params.__set_query_options(query_options); VLOG_ROW << "external exec_plan_fragment params is " << apache::thrift::ThriftDebugString(exec_fragment_params).c_str(); diff --git a/be/src/runtime/memory_scratch_sink.h b/be/src/runtime/memory_scratch_sink.h index 510a7e01ae..6b88ce9d24 100644 --- a/be/src/runtime/memory_scratch_sink.h +++ b/be/src/runtime/memory_scratch_sink.h @@ -80,7 +80,7 @@ private: const RowDescriptor& _row_desc; std::shared_ptr _arrow_schema; - shared_block_queue_t _queue; + BlockQueueSharedPtr _queue; RuntimeProfile* _profile; // Allocated from _pool diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 4a9eb0e02d..ff703ebb19 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -34,6 +34,7 @@ #include "runtime/descriptors.h" #include "runtime/data_stream_mgr.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/result_queue_mgr.h" #include "runtime/row_batch.h" #include "runtime/mem_tracker.h" #include "util/cpu_info.h" @@ -519,6 +520,10 @@ void PlanFragmentExecutor::update_status(const Status& status) { _runtime_state->set_mem_limit_exceeded(status.get_error_msg()); } _status = status; + if (_runtime_state->query_options().query_type == TQueryType::EXTERNAL) { + TUniqueId fragment_instance_id = _runtime_state->fragment_instance_id(); + _exec_env->result_queue_mgr()->update_queue_status(fragment_instance_id, status); + } } } diff --git a/be/src/runtime/record_batch_queue.cpp b/be/src/runtime/record_batch_queue.cpp new file mode 100644 index 0000000000..ffd9067333 --- /dev/null +++ b/be/src/runtime/record_batch_queue.cpp @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/record_batch_queue.h" + +namespace doris { + +void RecordBatchQueue::update_status(const Status& status) { + if (status.ok()) { + return; + } + { + std::lock_guard l(_status_lock); + if (_status.ok()) { + _status = status; + } + } +} + +void RecordBatchQueue::shutdown() { + _queue.shutdown(); +} + +} diff --git a/be/src/runtime/record_batch_queue.h b/be/src/runtime/record_batch_queue.h new file mode 100644 index 0000000000..ee5cfb771d --- /dev/null +++ b/be/src/runtime/record_batch_queue.h @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_RECORD_BATCH_QUEUE_H +#define DORIS_RECORD_BATCH_QUEUE_H + +#include +#include "common/status.h" +#include "util/blocking_queue.hpp" + +namespace arrow { + +class RecordBatch; +} + +namespace doris { + +// The RecordBatchQueue is created and managed by the ResultQueueMgr to +// cache external query results, as well as query status. Where both +// BlockingGet and BlockingPut operations block if the queue is empty or +// full, respectively. +class RecordBatchQueue { +public: + RecordBatchQueue(u_int32_t max_elements) : _queue(max_elements) {} + + Status status() { + std::lock_guard l(_status_lock); + return _status; + } + + void update_status(const Status& status); + + bool blocking_get(std::shared_ptr* result) { + return _queue.blocking_get(result); + } + + bool blocking_put(const std::shared_ptr& val) { + return _queue.blocking_put(val); + } + + // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. + void shutdown(); + +private: + BlockingQueue> _queue; + SpinLock _status_lock; + Status _status; +}; + +} + +#endif //DORIS_RECORD_BATCH_QUEUE_H diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp index a4c7749bc7..f674826350 100644 --- a/be/src/runtime/result_queue_mgr.cpp +++ b/be/src/runtime/result_queue_mgr.cpp @@ -27,13 +27,13 @@ namespace doris { -ResultQueueMgr::ResultQueueMgr() : _max_sink_batch_count(config::max_memory_sink_batch_count) { +ResultQueueMgr::ResultQueueMgr() { } ResultQueueMgr::~ResultQueueMgr() { } Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std::shared_ptr* result, bool *eos) { - shared_block_queue_t queue; + BlockQueueSharedPtr queue; { std::lock_guard l(_lock); auto iter = _fragment_queue_map.find(fragment_instance_id); @@ -43,6 +43,8 @@ Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std:: return Status::InternalError("fragment_instance_id does not exists"); } } + // check queue status before get result + RETURN_IF_ERROR(queue->status()); bool sucess = queue->blocking_get(result); if (sucess) { // sentinel nullptr indicates scan end @@ -61,14 +63,14 @@ Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std:: return Status::OK(); } -void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, shared_block_queue_t* queue) { +void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, BlockQueueSharedPtr* queue) { std::lock_guard l(_lock); auto iter = _fragment_queue_map.find(fragment_instance_id); if (iter != _fragment_queue_map.end()) { *queue = iter->second; } else { // the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most - shared_block_queue_t tmp(new BlockingQueue>(_max_sink_batch_count)); + BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count)); _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp)); *queue = tmp; } @@ -78,10 +80,24 @@ Status ResultQueueMgr::cancel(const TUniqueId& fragment_instance_id) { std::lock_guard l(_lock); auto iter = _fragment_queue_map.find(fragment_instance_id); if (iter != _fragment_queue_map.end()) { + // first remove RecordBatch from queue + // avoid MemoryScratchSink block on send or close operation + iter->second->shutdown(); // remove this queue from map _fragment_queue_map.erase(fragment_instance_id); } return Status::OK(); } +void ResultQueueMgr::update_queue_status(const TUniqueId& fragment_instance_id, const Status& status) { + if (status.ok()) { + return; + } + std::lock_guard l(_lock); + auto iter = _fragment_queue_map.find(fragment_instance_id); + if (iter != _fragment_queue_map.end()) { + iter->second->update_status(status); + } +} + } diff --git a/be/src/runtime/result_queue_mgr.h b/be/src/runtime/result_queue_mgr.h index 81c146449c..9ceba7922c 100644 --- a/be/src/runtime/result_queue_mgr.h +++ b/be/src/runtime/result_queue_mgr.h @@ -23,10 +23,10 @@ #include #include "common/status.h" -#include "util/blocking_queue.hpp" #include "util/hash_util.hpp" #include "runtime/primitive_type.h" #include "runtime/raw_value.h" +#include "runtime/record_batch_queue.h" namespace arrow { @@ -38,7 +38,8 @@ namespace doris { class TUniqueId; class TScanRowBatch; -typedef std::shared_ptr>> shared_block_queue_t; +class RecordBatchQueue; +typedef std::shared_ptr BlockQueueSharedPtr; class ResultQueueMgr { @@ -48,14 +49,15 @@ public: Status fetch_result(const TUniqueId& fragment_instance_id, std::shared_ptr* result, bool *eos); - void create_queue(const TUniqueId& fragment_instance_id, shared_block_queue_t* queue); + void create_queue(const TUniqueId& fragment_instance_id, BlockQueueSharedPtr* queue); Status cancel(const TUniqueId& fragment_id); + void update_queue_status(const TUniqueId& fragment_id, const Status& status); + private: std::mutex _lock; - u_int32_t _max_sink_batch_count; - std::unordered_map _fragment_queue_map; + std::unordered_map _fragment_queue_map; }; } diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 6d9dbfaae8..8ef958ebe7 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -291,6 +291,7 @@ void BackendService::get_next(TScanBatchResult& result_, const TScanNextBatchPar TUniqueId fragment_instance_id = context->fragment_instance_id; std::shared_ptr record_batch; bool eos; + st = _exec_env->result_queue_mgr()->fetch_result(fragment_instance_id, &record_batch, &eos); if (st.ok()) { result_.__set_eos(eos); @@ -307,6 +308,7 @@ void BackendService::get_next(TScanBatchResult& result_, const TScanNextBatchPar } } } else { + LOG(WARNING) << "fragment_instance_id [" << print_id(fragment_instance_id) << "] fetch result status [" << st.to_string() + "]"; st.to_thrift(&t_status); result_.status = t_status; } diff --git a/be/test/runtime/result_queue_mgr_test.cpp b/be/test/runtime/result_queue_mgr_test.cpp index 85f173781e..f8c0a3e0f5 100644 --- a/be/test/runtime/result_queue_mgr_test.cpp +++ b/be/test/runtime/result_queue_mgr_test.cpp @@ -42,7 +42,7 @@ protected: }; TEST_F(ResultQueueMgrTest, create_normal) { - shared_block_queue_t block_queue_t; + BlockQueueSharedPtr block_queue_t; TUniqueId query_id; query_id.lo = 10; query_id.hi = 100; @@ -57,11 +57,11 @@ TEST_F(ResultQueueMgrTest, create_same_queue) { query_id.lo = 10; query_id.hi = 100; - shared_block_queue_t block_queue_t_1; + BlockQueueSharedPtr block_queue_t_1; queue_mgr.create_queue(query_id, &block_queue_t_1); ASSERT_TRUE(block_queue_t_1 != nullptr); - shared_block_queue_t block_queue_t_2; + BlockQueueSharedPtr block_queue_t_2; queue_mgr.create_queue(query_id, &block_queue_t_2); ASSERT_TRUE(block_queue_t_2 != nullptr); @@ -74,7 +74,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_normal) { query_id.hi = 100; ResultQueueMgr queue_mgr; - shared_block_queue_t block_queue_t; + BlockQueueSharedPtr block_queue_t; queue_mgr.create_queue(query_id, &block_queue_t); ASSERT_TRUE(block_queue_t != nullptr); @@ -110,7 +110,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_end) { query_id.lo = 10; query_id.hi = 100; - shared_block_queue_t block_queue_t; + BlockQueueSharedPtr block_queue_t; queue_mgr.create_queue(query_id, &block_queue_t); ASSERT_TRUE(block_queue_t != nullptr); block_queue_t->blocking_put(nullptr); @@ -127,7 +127,7 @@ TEST_F(ResultQueueMgrTest, normal_cancel) { query_id.lo = 10; query_id.hi = 100; ResultQueueMgr queue_mgr; - shared_block_queue_t block_queue_t; + BlockQueueSharedPtr block_queue_t; queue_mgr.create_queue(query_id, &block_queue_t); ASSERT_TRUE(block_queue_t != nullptr); ASSERT_TRUE(queue_mgr.cancel(query_id).ok()); @@ -138,7 +138,7 @@ TEST_F(ResultQueueMgrTest, cancel_no_block) { query_id.lo = 10; query_id.hi = 100; ResultQueueMgr queue_mgr; - shared_block_queue_t block_queue_t; + BlockQueueSharedPtr block_queue_t; queue_mgr.create_queue(query_id, &block_queue_t); ASSERT_TRUE(block_queue_t != nullptr); ASSERT_TRUE(queue_mgr.cancel(query_id).ok()); @@ -155,4 +155,4 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); doris::CpuInfo::init(); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java index 62d63981de..90baf79933 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -19,6 +19,8 @@ package org.apache.doris.spark.backend; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.exception.ConnectedFailedException; +import org.apache.doris.spark.exception.DorisException; +import org.apache.doris.spark.exception.DorisInternalException; import org.apache.doris.spark.util.ErrorMessages; import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.serialization.Routing; @@ -151,16 +153,17 @@ public class BackendClient { * @return scan batch result * @throws ConnectedFailedException throw if cannot connect to Doris BE */ - public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws ConnectedFailedException { + public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException { logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams); if (!isConnected) { open(); } TException ex = null; + TScanBatchResult result = null; for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to getNext {}.", attempt, routing); try { - TScanBatchResult result = client.get_next(nextBatchParams); + result = client.get_next(nextBatchParams); if (result == null) { logger.warn("GetNext result from {} is null.", routing); continue; @@ -176,6 +179,12 @@ public class BackendClient { ex = e; } } + if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) { + logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(), + result.getStatus().getError_msgs()); + throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(), + result.getStatus().getError_msgs()); + } logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); } diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java new file mode 100644 index 0000000000..f42aceed54 --- /dev/null +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.exception; + +import org.apache.doris.thrift.TStatusCode; + +import java.util.List; + +public class DorisInternalException extends DorisException { + public DorisInternalException(String server, TStatusCode statusCode, List errorMsgs) { + super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs); + } + +} diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java index aff289db12..92a04e91c0 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java @@ -22,4 +22,5 @@ public abstract class ErrorMessages { public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed."; public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is illegal, value is '{}'."; public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here."; + public static final String DORIS_INTERNAL_FAIL_MESSAGE = "Doris server '{}' internal failed, status is '{}', error message is '{}'"; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4f7ac35807..fba6012ad1 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -42,6 +42,7 @@ const i64 DEFAULT_PARTITION_ID = -1; enum TQueryType { SELECT, LOAD, + EXTERNAL } enum TErrorHubType {