[pipelineX](fix) Fix use-after-free MultiCastSourceDependency (#30199)

This commit is contained in:
Gabriel
2024-01-23 10:32:06 +08:00
committed by yiguolei
parent 7be9301360
commit 0e5d56fc2e
15 changed files with 170 additions and 28 deletions

View File

@ -63,6 +63,10 @@ public:
: PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state, Status exec_status) override {
_shared_state->release_sink_dep();
return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, exec_status);
}
private:
friend class AnalyticSinkOperatorX;
@ -70,11 +74,11 @@ private:
bool _refresh_need_more_input() {
auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end);
if (need_more_input) {
_shared_state->source_dep->block();
_dependency->set_block_to_read();
_dependency->set_ready();
} else {
_dependency->block();
_shared_state->source_dep->set_ready();
_dependency->set_ready_to_read();
}
return need_more_input;
}

View File

@ -83,9 +83,12 @@ private:
auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end);
if (need_more_input) {
_dependency->block();
_shared_state->sink_dep->set_ready();
_dependency->set_ready_to_write();
if (!_shared_state->sink_released_flag) {
_shared_state->sink_dep->set_ready();
}
} else {
_shared_state->sink_dep->block();
_dependency->set_block_to_write();
_dependency->set_ready();
}
return need_more_input;

View File

@ -152,6 +152,13 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
return Status::OK();
}
Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
_shared_state->multi_cast_data_streamer.released_dependency(
_parent->cast<Parent>()._consumer_id);
RETURN_IF_ERROR(Base::close(state));
return Status::OK();
}
Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {

View File

@ -117,7 +117,7 @@ public:
RETURN_IF_ERROR(_acquire_runtime_filter());
return Status::OK();
}
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }

View File

@ -104,15 +104,37 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
if (_dependencies.empty()) {
return;
}
auto* dep = _dependencies[sender_idx];
DCHECK(dep);
dep->set_ready();
if (_dependencies_release_flag[sender_idx]) {
return;
}
{
std::unique_lock<std::mutex> lc(_release_lock);
if (_dependencies_release_flag[sender_idx]) {
return;
}
auto* dep = _dependencies[sender_idx];
DCHECK(dep);
dep->set_ready();
}
}
void MultiCastDataStreamer::_set_ready_for_read() {
size_t i = 0;
for (auto* dep : _dependencies) {
DCHECK(dep);
dep->set_ready();
if (_dependencies_release_flag[i]) {
i++;
continue;
}
{
std::unique_lock<std::mutex> lc(_release_lock);
if (_dependencies_release_flag[i]) {
i++;
continue;
}
DCHECK(dep);
dep->set_ready();
i++;
}
}
}

View File

@ -38,10 +38,14 @@ public:
bool with_dependencies = false)
: _row_desc(row_desc),
_profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
_cast_sender_count(cast_sender_count) {
_cast_sender_count(cast_sender_count),
_dependencies_release_flag(cast_sender_count) {
_sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
if (with_dependencies) {
_dependencies.resize(cast_sender_count, nullptr);
for (size_t i = 0; i < cast_sender_count; i++) {
_dependencies_release_flag[i] = false;
}
}
_peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
@ -79,6 +83,11 @@ public:
_block_reading(sender_idx);
}
void released_dependency(int sender_idx) {
std::unique_lock<std::mutex> lc(_release_lock);
_dependencies_release_flag[sender_idx] = true;
}
private:
void _set_ready_for_read(int sender_idx);
void _set_ready_for_read();
@ -97,6 +106,8 @@ private:
RuntimeProfile::Counter* _process_rows = nullptr;
RuntimeProfile::Counter* _peak_mem_usage = nullptr;
std::mutex _release_lock;
std::vector<std::atomic<bool>> _dependencies_release_flag;
std::vector<MultiCastSourceDependency*> _dependencies;
};
} // namespace doris::pipeline

View File

@ -219,7 +219,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
} else {
local_state._shared_state->source_dep->set_ready();
local_state._dependency->set_ready_to_read();
}
}

View File

@ -94,7 +94,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
if (_child_quantity == 1) {
local_state._shared_state->source_dep->set_ready();
local_state._dependency->set_ready_to_read();
}
}
}

View File

@ -189,4 +189,13 @@ void LocalExchangeSharedState::sub_running_sink_operators() {
}
}
LocalExchangeSharedState::LocalExchangeSharedState(int num_instances)
: dependencies_release_flag(num_instances) {
source_dependencies.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
for (size_t i = 0; i < num_instances; i++) {
dependencies_release_flag[i] = false;
}
}
} // namespace doris::pipeline

View File

@ -57,9 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
struct BasicSharedState {
DependencySPtr source_dep = nullptr;
DependencySPtr sink_dep = nullptr;
Dependency* source_dep = nullptr;
Dependency* sink_dep = nullptr;
std::atomic_bool source_released_flag {false};
std::atomic_bool sink_released_flag {false};
std::mutex source_release_lock;
std::mutex sink_release_lock;
void release_source_dep() {
std::unique_lock<std::mutex> lc(source_release_lock);
source_released_flag = true;
}
void release_sink_dep() {
std::unique_lock<std::mutex> lc(sink_release_lock);
sink_released_flag = true;
}
virtual ~BasicSharedState() = default;
};
@ -108,9 +121,50 @@ public:
virtual void set_ready();
void set_ready_to_read() {
DCHECK(_is_write_dependency) << debug_string();
if (_shared_state->source_released_flag) {
return;
}
std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
if (_shared_state->source_released_flag) {
return;
}
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->set_ready();
}
void set_block_to_read() {
DCHECK(_is_write_dependency) << debug_string();
if (_shared_state->source_released_flag) {
return;
}
std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
if (_shared_state->source_released_flag) {
return;
}
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->block();
}
void set_ready_to_write() {
if (_shared_state->sink_released_flag) {
return;
}
std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
if (_shared_state->sink_released_flag) {
return;
}
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->set_ready();
}
void set_block_to_write() {
if (_shared_state->sink_released_flag) {
return;
}
std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
if (_shared_state->sink_released_flag) {
return;
}
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->block();
}
// Notify downstream pipeline tasks this dependency is blocked.
virtual void block() { _ready = false; }
@ -610,25 +664,47 @@ class Exchanger;
struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
std::unique_ptr<Exchanger> exchanger {};
std::vector<DependencySPtr> source_dependencies;
std::vector<Dependency*> source_dependencies;
std::vector<std::atomic_bool> dependencies_release_flag;
Dependency* sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void sub_running_sink_operators();
void _set_ready_for_read() {
size_t i = 0;
for (auto& dep : source_dependencies) {
DCHECK(dep);
dep->set_ready();
if (dependencies_release_flag[i]) {
i++;
continue;
}
{
std::unique_lock<std::mutex> lc(source_release_lock);
if (dependencies_release_flag[i]) {
i++;
continue;
}
DCHECK(dep);
dep->set_ready();
i++;
}
}
}
void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
void set_dep_by_channel_id(Dependency* dep, int channel_id) {
source_dependencies[channel_id] = dep;
}
void set_ready_to_read(int channel_id) {
if (dependencies_release_flag[channel_id]) {
return;
}
std::unique_lock<std::mutex> lc(source_release_lock);
if (dependencies_release_flag[channel_id]) {
return;
}
auto& dep = source_dependencies[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();

View File

@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->set_dep_by_channel_id(_dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
@ -61,6 +61,12 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
return fmt::to_string(debug_string_buffer);
}
Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
_shared_state->dependencies_release_flag[_channel_id] = true;
RETURN_IF_ERROR(Base::close(state));
return Status::OK();
}
Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = get_local_state(state);

View File

@ -48,6 +48,7 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::string debug_string(int indentation_level) const override;
Status close(RuntimeState* state) override;
private:
friend class LocalExchangeSourceOperatorX;

View File

@ -338,15 +338,15 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->source_dep = info.dependency;
_shared_state->sink_dep = deps.front();
_shared_state->source_dep = info.dependency.get();
_shared_state->sink_dep = deps.front().get();
} else if constexpr (!is_fake_shared) {
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->source_dep = info.dependency;
_shared_state->sink_dep = deps.front();
_shared_state->source_dep = info.dependency.get();
_shared_state->sink_dep = deps.front().get();
}
}
@ -378,6 +378,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state) {
_shared_state->release_source_dep();
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
_dependency->clear_shared_state();

View File

@ -739,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
is_shuffled_hash_join, shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
auto shared_state = LocalExchangeSharedState::create_shared();
auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
@ -771,8 +771,6 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
return Status::InternalError("Unsupported local exchange type : " +
std::to_string((int)data_distribution.distribution_type));
}
shared_state->source_dependencies.resize(_num_instances, nullptr);
shared_state->mem_trackers.resize(_num_instances, nullptr);
auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, local_exchange_id,
_runtime_state->get_query_ctx());
sink_dep->set_shared_state(shared_state);

View File

@ -3985,7 +3985,9 @@ public class Coordinator implements CoordInterface {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (enablePipelineXEngine) {
ctx.attachPipelineProfileToFragmentProfile();
synchronized (this) {
ctx.attachPipelineProfileToFragmentProfile();
}
} else {
ctx.profileStream()
.forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));