This reverts commit 925da90480f60afc0e5333a536d41e004234874e.
This commit is contained in:
@ -39,7 +39,6 @@
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/brpc_client_cache.h"
|
||||
#include "util/spinlock.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -209,7 +208,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
|
||||
const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
|
||||
const int producer_size) {
|
||||
std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
|
||||
std::lock_guard<std::mutex> guard(_filter_map_mutex);
|
||||
std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>();
|
||||
// runtime_filter_desc and target will be released,
|
||||
// so we need to copy to cntVal
|
||||
@ -220,10 +219,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
cntVal->filter =
|
||||
cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
|
||||
|
||||
auto filter_id = runtime_filter_desc->filter_id;
|
||||
std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
|
||||
// LOG(INFO) << "entity filter id:" << filter_id;
|
||||
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false);
|
||||
_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
|
||||
_filter_map.emplace(filter_id, cntVal);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -231,7 +230,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
|
||||
const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
|
||||
const int producer_size) {
|
||||
std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
|
||||
std::lock_guard<std::mutex> guard(_filter_map_mutex);
|
||||
std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>();
|
||||
// runtime_filter_desc and target will be released,
|
||||
// so we need to copy to cntVal
|
||||
@ -242,10 +241,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
cntVal->filter =
|
||||
cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
|
||||
|
||||
auto filter_id = runtime_filter_desc->filter_id;
|
||||
std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
|
||||
// LOG(INFO) << "entity filter id:" << filter_id;
|
||||
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options);
|
||||
_filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
|
||||
_filter_map.emplace(filter_id, cntVal);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -313,37 +312,34 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
|
||||
int merged_size = 0;
|
||||
int64_t merge_time = 0;
|
||||
int64_t start_merge = MonotonicMillis();
|
||||
auto filter_id = request->filter_id();
|
||||
std::map<int, CntlValwithLock>::iterator iter;
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
|
||||
iter = _filter_map.find(filter_id);
|
||||
int64_t start_merge = MonotonicMillis();
|
||||
std::lock_guard<std::mutex> guard(_filter_map_mutex);
|
||||
auto iter = _filter_map.find(std::to_string(request->filter_id()));
|
||||
VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
|
||||
if (iter == _filter_map.end()) {
|
||||
return Status::InvalidArgument("unknown filter id {}",
|
||||
std::to_string(request->filter_id()));
|
||||
}
|
||||
}
|
||||
// iter->second = pair{CntlVal,SpinLock}
|
||||
cntVal = iter->second.first;
|
||||
{
|
||||
std::lock_guard<SpinLock> l(*iter->second.second);
|
||||
cntVal = iter->second;
|
||||
if (auto bf = cntVal->filter->get_bloomfilter()) {
|
||||
RETURN_IF_ERROR(bf->init_with_fixed_length());
|
||||
}
|
||||
MergeRuntimeFilterParams params(request, attach_data);
|
||||
ObjectPool* pool = cntVal->pool.get();
|
||||
ObjectPool* pool = iter->second->pool.get();
|
||||
RuntimeFilterWrapperHolder holder;
|
||||
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, ¶ms, pool, holder.getHandle()));
|
||||
RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
|
||||
cntVal->arrive_id.insert(UniqueId(request->fragment_id()));
|
||||
cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string());
|
||||
merged_size = cntVal->arrive_id.size();
|
||||
// TODO: avoid log when we had acquired a lock
|
||||
VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
|
||||
DCHECK_LE(merged_size, cntVal->producer_size);
|
||||
cntVal->merge_time += (MonotonicMillis() - start_merge);
|
||||
iter->second->merge_time += (MonotonicMillis() - start_merge);
|
||||
if (merged_size < cntVal->producer_size) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
merge_time = cntVal->merge_time;
|
||||
merge_time = iter->second->merge_time;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -26,7 +26,6 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
@ -146,12 +145,12 @@ public:
|
||||
std::vector<doris::TRuntimeFilterTargetParams> target_info;
|
||||
std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
|
||||
IRuntimeFilter* filter;
|
||||
std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ?
|
||||
std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
|
||||
std::shared_ptr<ObjectPool> pool;
|
||||
};
|
||||
|
||||
public:
|
||||
RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[id].first.get(); }
|
||||
RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[std::to_string(id)].get(); }
|
||||
|
||||
private:
|
||||
Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
|
||||
@ -167,11 +166,11 @@ private:
|
||||
UniqueId _query_id;
|
||||
UniqueId _fragment_instance_id;
|
||||
// protect _filter_map
|
||||
std::shared_mutex _filter_map_mutex;
|
||||
std::mutex _filter_map_mutex;
|
||||
std::shared_ptr<MemTracker> _mem_tracker;
|
||||
using CntlValwithLock =
|
||||
std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<SpinLock>>;
|
||||
std::map<int, CntlValwithLock> _filter_map;
|
||||
// TODO: convert filter id to i32
|
||||
// filter-id -> val
|
||||
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
|
||||
RuntimeState* _state;
|
||||
bool _opt_remote_rf = true;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user