[fix](vresultsink) BufferControlBlock may block all fragment handle threads (#16231)
A BufferControlBlock may block all fragment handle threads, leaving them unable to do any work. Changes in this commit:
- cancel the BufferControlBlock after the maximum timeout, so that once the timeout is exceeded the FE returns a timeout error to the client;
- make StmtExecutor notify the BEs to cancel the running fragments when an unexpected exception occurs.
For more details see issue #16203.
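To make the failure mode concrete, here is a minimal, self-contained sketch (not the real BufferControlBlock API; all class and member names are illustrative) of how a bounded result buffer can park producer threads on a condition variable indefinitely when the client never fetches, and how a cancel call unblocks them:

```cpp
// Simplified model of the problem: a bounded result buffer whose producers
// (fragment handle threads) wait on a condition variable when the buffer is
// full. If the consumer never fetches and nobody cancels, the producers stay
// blocked forever. Names are illustrative, not the real BufferControlBlock.
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

class ToyResultBuffer {
public:
    explicit ToyResultBuffer(size_t max_rows) : _max_rows(max_rows) {}

    // Called by a fragment handle thread. Blocks while the buffer is full
    // and the query has not been cancelled.
    bool add_row(std::string row) {
        std::unique_lock<std::mutex> lock(_mutex);
        _not_full.wait(lock, [&] { return _rows.size() < _max_rows || _cancelled; });
        if (_cancelled) {
            return false;  // unblocked by cancellation, give up
        }
        _rows.push_back(std::move(row));
        return true;
    }

    // Called when the query is cancelled (e.g. after the timeout deadline):
    // wakes every blocked producer so the fragment threads can finish.
    void cancel() {
        std::lock_guard<std::mutex> lock(_mutex);
        _cancelled = true;
        _not_full.notify_all();
    }

private:
    const size_t _max_rows;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::deque<std::string> _rows;
    bool _cancelled = false;
};
```

Without such a cancel path, every fragment handle thread that hits a full buffer waits forever; the change below bounds that wait by scheduling a cancellation at query_timeout + 5 seconds.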
@@ -55,7 +55,7 @@ Status ResultBufferMgr::init() {
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
                                       std::shared_ptr<BufferControlBlock>* sender,
-                                      bool enable_pipeline) {
+                                      bool enable_pipeline, int query_timeout) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " << query_id;
@@ -73,6 +73,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
     {
         std::lock_guard<std::mutex> l(_lock);
         _buffer_map.insert(std::make_pair(query_id, control_block));
+        // The BufferControlBlock should be destroyed after max_timeout.
+        // Once max_timeout is exceeded, the FE returns a timeout error to the client;
+        // otherwise, in some cases this may block all fragment handle threads.
+        // For details see issue https://github.com/apache/doris/issues/16203
+        // Add an extra 5s to avoid corner cases.
+        int64_t max_timeout = time(nullptr) + query_timeout + 5;
+        cancel_at_time(max_timeout, query_id);
     }
     *sender = control_block;
     return Status::OK();
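The hunk above only shows the call site of cancel_at_time; its implementation is not part of this diff. Assuming it records a per-query deadline that a background thread sweeps periodically, a plausible sketch could look like the following (the names QueryId, _timeout_lock, _timeout_map, and cancel_expired are assumptions for illustration, not the actual ResultBufferMgr members):

```cpp
// Sketch of a deadline registry: create_sender records "cancel query X at
// time T", and a periodic clean-up pass cancels every query whose deadline
// has passed. All names are illustrative.
#include <ctime>
#include <map>
#include <mutex>
#include <vector>

struct QueryId { long hi = 0, lo = 0; };  // stand-in for TUniqueId

class ToyResultBufferMgr {
public:
    // Register a deadline; called from create_sender with
    // time(nullptr) + query_timeout + 5.
    void cancel_at_time(time_t cancel_time, const QueryId& query_id) {
        std::lock_guard<std::mutex> lock(_timeout_lock);
        _timeout_map[cancel_time].push_back(query_id);
    }

    // A background thread would call this periodically (e.g. once a second).
    void cancel_expired(time_t now) {
        std::vector<QueryId> expired;
        {
            std::lock_guard<std::mutex> lock(_timeout_lock);
            auto end = _timeout_map.upper_bound(now);
            for (auto it = _timeout_map.begin(); it != end; ++it) {
                expired.insert(expired.end(), it->second.begin(), it->second.end());
            }
            _timeout_map.erase(_timeout_map.begin(), end);
        }
        for (const auto& id : expired) {
            cancel(id);  // wake blocked producers and release the control block
        }
    }

private:
    void cancel(const QueryId& /*query_id*/) { /* erase from buffer map, notify waiters */ }

    std::mutex _timeout_lock;
    std::map<time_t, std::vector<QueryId>> _timeout_map;  // deadline -> queries
};
```

The extra 5 seconds added to the deadline in create_sender presumably gives the FE time to report the timeout to the client before the BE tears the buffer down.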
@@ -47,7 +47,8 @@ public:
    // the returned sender do not need release
    // sender is not used when call cancel or unregister
    Status create_sender(const TUniqueId& query_id, int buffer_size,
-                        std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline);
+                        std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline,
+                        int query_timeout);

    // fetch data, used by RPC
    Status fetch_data(const TUniqueId& fragment_id, TFetchDataResult* result);
@@ -90,6 +90,7 @@ public:
        return _query_options.abort_on_default_limit_exceeded;
    }
    int max_errors() const { return _query_options.max_errors; }
+   int query_timeout() const { return _query_options.query_timeout; }
    int max_io_buffers() const { return _query_options.max_io_buffers; }
    int num_scanner_threads() const { return _query_options.num_scanner_threads; }
    TQueryType::type query_type() const { return _query_options.query_type; }
@@ -101,7 +101,8 @@ Status VResultFileSink::prepare(RuntimeState* state) {
     if (_is_top_sink) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec()));
+                state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
+                state->query_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
@@ -62,7 +62,8 @@ Status VResultSink::prepare(RuntimeState* state) {

    // create sender
    RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-           state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec()));
+           state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(),
+           state->query_timeout()));

    // create writer based on sink type
    switch (_sink_type) {
@@ -994,6 +994,10 @@ public class Coordinator {
    // fragment,
    // if any, as well as all plan fragments on remote nodes.
    public void cancel() {
+       cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+   }
+
+   public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
        lock();
        try {
            if (!queryStatus.ok()) {
@@ -1003,7 +1007,7 @@ public class Coordinator {
                queryStatus.setStatus(Status.CANCELLED);
            }
            LOG.warn("cancel execution of query, this is outside invoke");
-           cancelInternal(Types.PPlanFragmentCancelReason.USER_CANCEL);
+           cancelInternal(cancelReason);
        } finally {
            unlock();
        }
@@ -107,6 +107,7 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -1286,6 +1287,11 @@ public class StmtExecutor implements ProfileWriter {
            context.getState().setEof();
            plannerProfile.setQueryFetchResultFinishTime();
        } catch (Exception e) {
+           // notify all BEs to cancel the running fragments;
+           // otherwise in some cases they may block all fragment handle threads.
+           // For details see issue https://github.com/apache/doris/issues/16203
+           LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage());
+           coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
            fetchResultSpan.recordException(e);
            throw e;
        } finally {