[pick](branch-2.1) pick #46266 (#46840)

### What problem does this PR solve?

```
[fix](cancel) Fix BufferControlBlock cancel msg #46266
```
This commit is contained in:
Xinyi Zou
2025-01-13 10:37:11 +08:00
committed by GitHub
parent 842196ace3
commit 9ccfff12ab
2 changed files with 8 additions and 4 deletions

View File

@ -172,7 +172,10 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, Run
}
BufferControlBlock::~BufferControlBlock() {
cancel(Status::Cancelled("Cancelled"));
cancel(Status::Cancelled(
"BufferControlBlock is destructed, this is not the expected path, the correct path is "
"ResultBufferMgr::cancel before the destructor, fragmentId: {}",
print_id(_fragment_id)));
}
Status BufferControlBlock::init() {
@ -447,7 +450,7 @@ void BufferControlBlock::cancel(const Status& reason) {
}
_waiting_rpc.clear();
for (auto& ctx : _waiting_arrow_result_batch_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
ctx->on_failure(reason);
}
_waiting_arrow_result_batch_rpc.clear();
_arrow_flight_result_batch_queue.clear();

View File

@ -216,8 +216,9 @@ void ResultBufferMgr::cancel_thread() {
}
// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
cancel(query_to_cancel[i], Status::TimedOut("Query tiemout"));
for (const auto& id : query_to_cancel) {
cancel(id, Status::Cancelled("Clean up expired BufferControlBlock, queryId: {}",
print_id(id)));
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));