diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index 51d1878114..9a757da6b5 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -55,7 +55,7 @@ Status ResultBufferMgr::init() {
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
                                       std::shared_ptr<BufferControlBlock>* sender,
-                                      bool enable_pipeline) {
+                                      bool enable_pipeline, int query_timeout) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " << query_id;
@@ -73,6 +73,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
     {
         std::lock_guard<std::mutex> l(_lock);
         _buffer_map.insert(std::make_pair(query_id, control_block));
+        // The BufferControlBlock should be destroyed once max_timeout passes:
+        // after max_timeout is exceeded, FE returns a timeout error to the client;
+        // otherwise, in some cases, all fragment handle threads may stay blocked.
+        // For details see https://github.com/apache/doris/issues/16203.
+        // Add an extra 5s to avoid racing the FE-side timeout in corner cases.
+        int64_t max_timeout = time(nullptr) + query_timeout + 5;
+        cancel_at_time(max_timeout, query_id);
     }
     *sender = control_block;
     return Status::OK();
diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h
index c4722bcef7..5402f9a114 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -47,7 +47,8 @@ public:
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
-                         std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline);
+                         std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline,
+                         int query_timeout);
 
     // fetch data, used by RPC
     Status fetch_data(const TUniqueId& fragment_id, TFetchDataResult* result);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index b6f02c2d2c..f9f4b8df04 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -90,6 +90,7 @@ public:
         return _query_options.abort_on_default_limit_exceeded;
     }
     int max_errors() const { return _query_options.max_errors; }
+    int query_timeout() const { return _query_options.query_timeout; }
     int max_io_buffers() const { return _query_options.max_io_buffers; }
     int num_scanner_threads() const { return _query_options.num_scanner_threads; }
     TQueryType::type query_type() const { return _query_options.query_type; }
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 2c8f79de48..5f7e874b09 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -101,7 +101,8 @@ Status VResultFileSink::prepare(RuntimeState* state) {
     if (_is_top_sink) {
         // create sender
        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec()));
+                state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
+                state->query_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
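Note for reviewers: `cancel_at_time` itself is not shown in this patch. As context, here is a minimal sketch of what such a deadline registry could look like, assuming a mutex-guarded `std::map` keyed by expiry second that the clean thread started in `ResultBufferMgr::init()` sweeps periodically. The member names `_timeout_lock` and `_timeout_map` are illustrative, not taken from this diff:

```cpp
// Sketch only: remember that `query_id` should be force-cancelled once
// `cancel_time` has passed. Assumes two hypothetical members:
//   std::mutex _timeout_lock;
//   std::map<time_t, std::vector<TUniqueId>> _timeout_map;
// Deadlines are keyed by expiry second so a periodic sweep can drop all
// expired entries in a single ordered pass.
void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
    std::lock_guard<std::mutex> l(_timeout_lock);
    auto iter = _timeout_map.find(cancel_time);
    if (iter == _timeout_map.end()) {
        iter = _timeout_map.emplace(cancel_time, std::vector<TUniqueId>()).first;
    }
    iter->second.push_back(query_id);
}
```
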
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 467fb6c82c..af71b76280 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -62,7 +62,8 @@ Status VResultSink::prepare(RuntimeState* state) {
 
     // create sender
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-            state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec()));
+            state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
+            state->query_timeout()));
 
     // create writer based on sink type
     switch (_sink_type) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef2e3a9ca8..36ae55b273 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -994,6 +994,10 @@ public class Coordinator {
     // fragment,
     // if any, as well as all plan fragments on remote nodes.
     public void cancel() {
+        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+    }
+
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
         lock();
         try {
             if (!queryStatus.ok()) {
@@ -1003,7 +1007,7 @@ public class Coordinator {
                 queryStatus.setStatus(Status.CANCELLED);
             }
             LOG.warn("cancel execution of query, this is outside invoke");
-            cancelInternal(Types.PPlanFragmentCancelReason.USER_CANCEL);
+            cancelInternal(cancelReason);
         } finally {
             unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 95e69783ff..556792bd9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -107,6 +107,7 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -1286,6 +1287,11 @@ public class StmtExecutor implements ProfileWriter {
             context.getState().setEof();
             plannerProfile.setQueryFetchResultFinishTime();
         } catch (Exception e) {
+            // Notify all BEs to cancel the running fragments; otherwise, in some
+            // cases, all fragment handle threads may stay blocked.
+            // For details see https://github.com/apache/doris/issues/16203.
+            LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage());
+            coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
             fetchResultSpan.recordException(e);
             throw e;
         } finally {
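The deadline registry only helps if something sweeps it. A hedged sketch of the periodic pass the clean thread might run, under the same assumptions as above (`_timeout_lock`, `_timeout_map`, and the method name `cancel_timeout_queries` are hypothetical; `cancel(query_id)` refers to the existing ResultBufferMgr cancel path mentioned in the header comment):

```cpp
// Sketch only: called periodically (e.g. once per second) by the clean
// thread. Collects every query whose deadline has passed, then cancels
// them outside the lock; cancelling releases the BufferControlBlock and
// unblocks any fragment handle thread still waiting on it.
void ResultBufferMgr::cancel_timeout_queries() {
    std::vector<TUniqueId> expired;
    {
        std::lock_guard<std::mutex> l(_timeout_lock);
        // All map keys <= now have expired; the map is ordered by deadline.
        auto end = _timeout_map.upper_bound(time(nullptr));
        for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
            expired.insert(expired.end(), iter->second.begin(), iter->second.end());
        }
        _timeout_map.erase(_timeout_map.begin(), end);
    }
    for (const auto& query_id : expired) {
        cancel(query_id); // drops the control block from _buffer_map
    }
}
```

One design note on the extra 5 seconds in `create_sender`: it keeps the BE-side deadline strictly behind FE's own `query_timeout` handling. In the normal case FE times the query out first and tears the buffer down itself; the BE deadline only fires when that FE-side path never arrives, which is the hang described in issue #16203.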