[refactor](scan) change scan reschedule into scan context (#27766)
* [refactor](scan) change scan reschedule into scan context
This commit is contained in:
@ -59,10 +59,6 @@ bool ScanOperator::can_read() {
|
||||
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
|
||||
return true;
|
||||
} else {
|
||||
if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
|
||||
_node->_scanner_ctx->should_be_scheduled()) {
|
||||
_node->_scanner_ctx->reschedule_scanner_ctx();
|
||||
}
|
||||
return _node->ready_to_read(); // there are some blocks to process
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,17 +63,6 @@ public:
|
||||
ScanDependency(int id, int node_id, QueryContext* query_ctx)
|
||||
: Dependency(id, node_id, "ScanDependency", query_ctx), _scanner_ctx(nullptr) {}
|
||||
|
||||
// TODO(gabriel):
|
||||
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
|
||||
if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
|
||||
_scanner_ctx->should_be_scheduled()) {
|
||||
_scanner_ctx->reschedule_scanner_ctx();
|
||||
}
|
||||
return Dependency::is_blocked_by(task);
|
||||
}
|
||||
|
||||
bool push_to_blocking_queue() const override { return true; }
|
||||
|
||||
void block() override {
|
||||
if (_scanner_done) {
|
||||
return;
|
||||
|
||||
@ -76,11 +76,15 @@ public:
|
||||
*block = std::move(_blocks_queues[id].front());
|
||||
_blocks_queues[id].pop_front();
|
||||
|
||||
if (_blocks_queues[id].empty() && _dependency) {
|
||||
_dependency->block();
|
||||
if (_blocks_queues[id].empty()) {
|
||||
this->reschedule_scanner_ctx();
|
||||
if (_dependency) {
|
||||
_dependency->block();
|
||||
}
|
||||
}
|
||||
}
|
||||
_current_used_bytes -= (*block)->allocated_bytes();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user