[pipelineX](join) Support shared hash table (#23876)

This commit is contained in:
Gabriel
2023-09-05 10:14:40 +08:00
committed by GitHub
parent b51d8aa7b7
commit 5853ed385e
9 changed files with 65 additions and 38 deletions

View File

@ -26,32 +26,24 @@
namespace doris {
namespace vectorized {
void SharedHashTableController::set_builder_and_consumers(TUniqueId builder,
const std::vector<TUniqueId>& consumers,
int node_id) {
void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) {
// Only need to set builder and consumers with pipeline engine enabled.
DCHECK(_pipeline_engine_enabled);
std::lock_guard<std::mutex> lock(_mutex);
DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend());
_builder_fragment_ids.insert({node_id, builder});
_ref_fragments[node_id].assign(consumers.cbegin(), consumers.cend());
}
bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _builder_fragment_ids.find(my_node_id);
if (_pipeline_engine_enabled) {
if (it != _builder_fragment_ids.cend()) {
return it->second == fragment_instance_id;
}
return false;
}
if (it == _builder_fragment_ids.cend()) {
_builder_fragment_ids.insert({my_node_id, fragment_instance_id});
return true;
DCHECK(_pipeline_engine_enabled && it != _builder_fragment_ids.cend());
if (it != _builder_fragment_ids.cend()) {
return it->second == fragment_instance_id;
}
throw Exception(ErrorCode::INTERNAL_ERROR,
"Shared hash table for node {} has not been initialized!", my_node_id);
return false;
}