[pipelineX](bug) Fix scan dependency timeout (#27696)

This commit is contained in:
Gabriel
2023-11-28 18:21:11 +08:00
committed by GitHub
parent 91b0edfaa2
commit 38d30f21f1
2 changed files with 34 additions and 2 deletions

View File

@ -57,6 +57,10 @@ public:
~PartitionSortSourceDependency() override = default;
void block() override {
if (_always_ready) {
return;
}
std::unique_lock<std::mutex> lc(_always_done_lock);
if (_always_ready) {
return;
}
@ -64,6 +68,10 @@ public:
}
void set_always_ready() {
if (_always_ready) {
return;
}
std::unique_lock<std::mutex> lc(_always_done_lock);
if (_always_ready) {
return;
}
@ -71,8 +79,16 @@ public:
set_ready();
}
std::string debug_string(int indentation_level = 0) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, _always_ready = {}",
Dependency::debug_string(indentation_level), _always_ready);
return fmt::to_string(debug_string_buffer);
}
private:
std::atomic<bool> _always_ready {false};
bool _always_ready {false};
std::mutex _always_done_lock;
};
class PartitionSortSourceOperatorX;

View File

@ -74,6 +74,10 @@ public:
bool push_to_blocking_queue() const override { return true; }
void block() override {
if (_scanner_done) {
return;
}
std::unique_lock<std::mutex> lc(_always_done_lock);
if (_scanner_done) {
return;
}
@ -81,6 +85,10 @@ public:
}
void set_scanner_done() {
if (_scanner_done) {
return;
}
std::unique_lock<std::mutex> lc(_always_done_lock);
if (_scanner_done) {
return;
}
@ -88,11 +96,19 @@ public:
Dependency::set_ready();
}
std::string debug_string(int indentation_level = 0) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, _scanner_done = {}",
Dependency::debug_string(indentation_level), _scanner_done);
return fmt::to_string(debug_string_buffer);
}
void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) { _scanner_ctx = scanner_ctx; }
private:
vectorized::ScannerContext* _scanner_ctx = nullptr;
std::atomic<bool> _scanner_done {false};
bool _scanner_done {false};
std::mutex _always_done_lock;
};
class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer {