[fix](pipelineX) Fix unexpected OOM on pipelineX (#29436)
This commit is contained in:
@ -53,7 +53,7 @@ public:
|
||||
~AggSinkDependency() override = default;
|
||||
|
||||
void set_ready() override {
|
||||
if (_is_streaming_agg_state()) {
|
||||
if (_shared_state && _is_streaming_agg_state()) {
|
||||
if (((SharedState*)Dependency::_shared_state.get())
|
||||
->data_queue->has_enough_space_to_push()) {
|
||||
Dependency::set_ready();
|
||||
|
||||
@ -59,7 +59,6 @@ struct BasicSharedState {
|
||||
DependencySPtr source_dep = nullptr;
|
||||
DependencySPtr sink_dep = nullptr;
|
||||
|
||||
virtual Status close(RuntimeState* state) { return Status::OK(); }
|
||||
virtual ~BasicSharedState() = default;
|
||||
};
|
||||
|
||||
@ -90,6 +89,7 @@ public:
|
||||
void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
|
||||
_shared_state = shared_state;
|
||||
}
|
||||
void clear_shared_state() { _shared_state.reset(); }
|
||||
virtual std::string debug_string(int indentation_level = 0);
|
||||
|
||||
// Start the watcher. We use it to count how long this dependency block the current pipeline task.
|
||||
|
||||
@ -376,11 +376,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
|
||||
if (_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (_shared_state) {
|
||||
RETURN_IF_ERROR(_shared_state->close(state));
|
||||
}
|
||||
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
|
||||
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
|
||||
_dependency->clear_shared_state();
|
||||
}
|
||||
if (_rows_returned_counter != nullptr) {
|
||||
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
|
||||
@ -439,11 +437,11 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
|
||||
if (_closed) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (_shared_state) {
|
||||
RETURN_IF_ERROR(_shared_state->close(state));
|
||||
}
|
||||
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
|
||||
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
|
||||
if constexpr (!std::is_same_v<LocalExchangeSinkDependency, DependencyType>) {
|
||||
_dependency->clear_shared_state();
|
||||
}
|
||||
}
|
||||
if (_peak_memory_usage_counter) {
|
||||
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
|
||||
|
||||
Reference in New Issue
Block a user