Spark return error to users when spark on doris query failed (#2531)

This commit is contained in:
Youngwb
2019-12-30 21:58:13 +08:00
committed by ZHAO Chun
parent da8c9b4429
commit feda66f99f
15 changed files with 192 additions and 21 deletions

View File

@ -443,7 +443,7 @@ namespace config {
CONF_Bool(auto_recover_index_loading_failure, "false");
// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 10, batch_size's defualt value is 1024 means 10 * 1024 rows will be cached
// default is 20, batch_size's defualt value is 1024 means 20 * 1024 rows will be cached
CONF_Int32(max_memory_sink_batch_count, "20");
// This configuration is used for the context gc thread schedule period

View File

@ -101,6 +101,7 @@ set(RUNTIME_FILES
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
small_file_mgr.cpp
record_batch_queue.cpp
result_queue_mgr.cpp
memory_scratch_sink.cpp
external_scan_context_mgr.cpp

View File

@ -665,6 +665,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, c
TQueryOptions query_options;
query_options.batch_size = params.batch_size;
query_options.query_timeout = params.query_timeout;
query_options.query_type = TQueryType::EXTERNAL;
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(exec_fragment_params).c_str();

View File

@ -80,7 +80,7 @@ private:
const RowDescriptor& _row_desc;
std::shared_ptr<arrow::Schema> _arrow_schema;
shared_block_queue_t _queue;
BlockQueueSharedPtr _queue;
RuntimeProfile* _profile; // Allocated from _pool

View File

@ -34,6 +34,7 @@
#include "runtime/descriptors.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/row_batch.h"
#include "runtime/mem_tracker.h"
#include "util/cpu_info.h"
@ -519,6 +520,10 @@ void PlanFragmentExecutor::update_status(const Status& status) {
_runtime_state->set_mem_limit_exceeded(status.get_error_msg());
}
_status = status;
if (_runtime_state->query_options().query_type == TQueryType::EXTERNAL) {
TUniqueId fragment_instance_id = _runtime_state->fragment_instance_id();
_exec_env->result_queue_mgr()->update_queue_status(fragment_instance_id, status);
}
}
}

View File

@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/record_batch_queue.h"
namespace doris {
void RecordBatchQueue::update_status(const Status& status) {
if (status.ok()) {
return;
}
{
std::lock_guard<SpinLock> l(_status_lock);
if (_status.ok()) {
_status = status;
}
}
}
void RecordBatchQueue::shutdown() {
_queue.shutdown();
}
}

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_RECORD_BATCH_QUEUE_H
#define DORIS_RECORD_BATCH_QUEUE_H
#include <util/spinlock.h>
#include "common/status.h"
#include "util/blocking_queue.hpp"
namespace arrow {
class RecordBatch;
}
namespace doris {
// The RecordBatchQueue is created and managed by the ResultQueueMgr to
// cache external query results, as well as query status. Where both
// BlockingGet and BlockingPut operations block if the queue is empty or
// full, respectively.
class RecordBatchQueue {
public:
RecordBatchQueue(u_int32_t max_elements) : _queue(max_elements) {}
Status status() {
std::lock_guard<SpinLock> l(_status_lock);
return _status;
}
void update_status(const Status& status);
bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
return _queue.blocking_get(result);
}
bool blocking_put(const std::shared_ptr<arrow::RecordBatch>& val) {
return _queue.blocking_put(val);
}
// Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
void shutdown();
private:
BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
SpinLock _status_lock;
Status _status;
};
}
#endif //DORIS_RECORD_BATCH_QUEUE_H

View File

@ -27,13 +27,13 @@
namespace doris {
ResultQueueMgr::ResultQueueMgr() : _max_sink_batch_count(config::max_memory_sink_batch_count) {
ResultQueueMgr::ResultQueueMgr() {
}
ResultQueueMgr::~ResultQueueMgr() {
}
Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std::shared_ptr<arrow::RecordBatch>* result, bool *eos) {
shared_block_queue_t queue;
BlockQueueSharedPtr queue;
{
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
@ -43,6 +43,8 @@ Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std::
return Status::InternalError("fragment_instance_id does not exists");
}
}
// check queue status before get result
RETURN_IF_ERROR(queue->status());
bool sucess = queue->blocking_get(result);
if (sucess) {
// sentinel nullptr indicates scan end
@ -61,14 +63,14 @@ Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std::
return Status::OK();
}
void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, shared_block_queue_t* queue) {
void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, BlockQueueSharedPtr* queue) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
if (iter != _fragment_queue_map.end()) {
*queue = iter->second;
} else {
// the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most
shared_block_queue_t tmp(new BlockingQueue<std::shared_ptr<arrow::RecordBatch>>(_max_sink_batch_count));
BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count));
_fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
*queue = tmp;
}
@ -78,10 +80,24 @@ Status ResultQueueMgr::cancel(const TUniqueId& fragment_instance_id) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
if (iter != _fragment_queue_map.end()) {
// first remove RecordBatch from queue
// avoid MemoryScratchSink block on send or close operation
iter->second->shutdown();
// remove this queue from map
_fragment_queue_map.erase(fragment_instance_id);
}
return Status::OK();
}
void ResultQueueMgr::update_queue_status(const TUniqueId& fragment_instance_id, const Status& status) {
if (status.ok()) {
return;
}
std::lock_guard<std::mutex> l(_lock);
auto iter = _fragment_queue_map.find(fragment_instance_id);
if (iter != _fragment_queue_map.end()) {
iter->second->update_status(status);
}
}
}

View File

@ -23,10 +23,10 @@
#include <unordered_map>
#include "common/status.h"
#include "util/blocking_queue.hpp"
#include "util/hash_util.hpp"
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/record_batch_queue.h"
namespace arrow {
@ -38,7 +38,8 @@ namespace doris {
class TUniqueId;
class TScanRowBatch;
typedef std::shared_ptr<BlockingQueue< std::shared_ptr<arrow::RecordBatch>>> shared_block_queue_t;
class RecordBatchQueue;
typedef std::shared_ptr<RecordBatchQueue> BlockQueueSharedPtr;
class ResultQueueMgr {
@ -48,14 +49,15 @@ public:
Status fetch_result(const TUniqueId& fragment_instance_id, std::shared_ptr<arrow::RecordBatch>* result, bool *eos);
void create_queue(const TUniqueId& fragment_instance_id, shared_block_queue_t* queue);
void create_queue(const TUniqueId& fragment_instance_id, BlockQueueSharedPtr* queue);
Status cancel(const TUniqueId& fragment_id);
void update_queue_status(const TUniqueId& fragment_id, const Status& status);
private:
std::mutex _lock;
u_int32_t _max_sink_batch_count;
std::unordered_map<TUniqueId, shared_block_queue_t> _fragment_queue_map;
std::unordered_map<TUniqueId, BlockQueueSharedPtr> _fragment_queue_map;
};
}

View File

@ -291,6 +291,7 @@ void BackendService::get_next(TScanBatchResult& result_, const TScanNextBatchPar
TUniqueId fragment_instance_id = context->fragment_instance_id;
std::shared_ptr<arrow::RecordBatch> record_batch;
bool eos;
st = _exec_env->result_queue_mgr()->fetch_result(fragment_instance_id, &record_batch, &eos);
if (st.ok()) {
result_.__set_eos(eos);
@ -307,6 +308,7 @@ void BackendService::get_next(TScanBatchResult& result_, const TScanNextBatchPar
}
}
} else {
LOG(WARNING) << "fragment_instance_id [" << print_id(fragment_instance_id) << "] fetch result status [" << st.to_string() + "]";
st.to_thrift(&t_status);
result_.status = t_status;
}

View File

@ -42,7 +42,7 @@ protected:
};
TEST_F(ResultQueueMgrTest, create_normal) {
shared_block_queue_t block_queue_t;
BlockQueueSharedPtr block_queue_t;
TUniqueId query_id;
query_id.lo = 10;
query_id.hi = 100;
@ -57,11 +57,11 @@ TEST_F(ResultQueueMgrTest, create_same_queue) {
query_id.lo = 10;
query_id.hi = 100;
shared_block_queue_t block_queue_t_1;
BlockQueueSharedPtr block_queue_t_1;
queue_mgr.create_queue(query_id, &block_queue_t_1);
ASSERT_TRUE(block_queue_t_1 != nullptr);
shared_block_queue_t block_queue_t_2;
BlockQueueSharedPtr block_queue_t_2;
queue_mgr.create_queue(query_id, &block_queue_t_2);
ASSERT_TRUE(block_queue_t_2 != nullptr);
@ -74,7 +74,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_normal) {
query_id.hi = 100;
ResultQueueMgr queue_mgr;
shared_block_queue_t block_queue_t;
BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
@ -110,7 +110,7 @@ TEST_F(ResultQueueMgrTest, fetch_result_end) {
query_id.lo = 10;
query_id.hi = 100;
shared_block_queue_t block_queue_t;
BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
block_queue_t->blocking_put(nullptr);
@ -127,7 +127,7 @@ TEST_F(ResultQueueMgrTest, normal_cancel) {
query_id.lo = 10;
query_id.hi = 100;
ResultQueueMgr queue_mgr;
shared_block_queue_t block_queue_t;
BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
ASSERT_TRUE(queue_mgr.cancel(query_id).ok());
@ -138,7 +138,7 @@ TEST_F(ResultQueueMgrTest, cancel_no_block) {
query_id.lo = 10;
query_id.hi = 100;
ResultQueueMgr queue_mgr;
shared_block_queue_t block_queue_t;
BlockQueueSharedPtr block_queue_t;
queue_mgr.create_queue(query_id, &block_queue_t);
ASSERT_TRUE(block_queue_t != nullptr);
ASSERT_TRUE(queue_mgr.cancel(query_id).ok());
@ -155,4 +155,4 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
return RUN_ALL_TESTS();
}
}

View File

@ -19,6 +19,8 @@ package org.apache.doris.spark.backend;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.DorisInternalException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.serialization.Routing;
@ -151,16 +153,17 @@ public class BackendClient {
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
*/
public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws ConnectedFailedException {
public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
if (!isConnected) {
open();
}
TException ex = null;
TScanBatchResult result = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
TScanBatchResult result = client.get_next(nextBatchParams);
result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@ -176,6 +179,12 @@ public class BackendClient {
ex = e;
}
}
if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) {
logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(),
result.getStatus().getError_msgs());
throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(),
result.getStatus().getError_msgs());
}
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}

View File

@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.exception;
import org.apache.doris.thrift.TStatusCode;
import java.util.List;
public class DorisInternalException extends DorisException {
public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) {
super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
}
}

View File

@ -22,4 +22,5 @@ public abstract class ErrorMessages {
public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed.";
public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is illegal, value is '{}'.";
public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here.";
public static final String DORIS_INTERNAL_FAIL_MESSAGE = "Doris server '{}' internal failed, status is '{}', error message is '{}'";
}

View File

@ -42,6 +42,7 @@ const i64 DEFAULT_PARTITION_ID = -1;
enum TQueryType {
SELECT,
LOAD,
EXTERNAL
}
enum TErrorHubType {