[Profile](runtimefilter) fix merge time of runtime filter (#21654)
This commit is contained in:
@ -137,6 +137,7 @@ public:
|
||||
if (_inited) {
|
||||
return Status::OK();
|
||||
}
|
||||
// TODO: really need the lock?
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (_inited) {
|
||||
return Status::OK();
|
||||
|
||||
@ -311,6 +311,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
|
||||
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
|
||||
int merged_size = 0;
|
||||
int64_t merge_time = 0;
|
||||
{
|
||||
int64_t start_merge = MonotonicMillis();
|
||||
std::lock_guard<std::mutex> guard(_filter_map_mutex);
|
||||
@ -334,9 +335,11 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
// TODO: avoid log when we had acquired a lock
|
||||
VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
|
||||
DCHECK_LE(merged_size, cntVal->producer_size);
|
||||
_merge_timer += (MonotonicMillis() - start_merge);
|
||||
iter->second->merge_time += (MonotonicMillis() - start_merge);
|
||||
if (merged_size < cntVal->producer_size) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
merge_time = iter->second->merge_time;
|
||||
}
|
||||
}
|
||||
|
||||
@ -374,7 +377,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
|
||||
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
|
||||
request->is_pipeline());
|
||||
rpc_contexts[cur]->request.set_merge_time(_merge_timer);
|
||||
rpc_contexts[cur]->request.set_merge_time(merge_time);
|
||||
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
|
||||
if (has_attachment) {
|
||||
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
|
||||
|
||||
@ -139,7 +139,7 @@ public:
|
||||
UniqueId instance_id() const { return _fragment_instance_id; }
|
||||
|
||||
struct RuntimeFilterCntlVal {
|
||||
int64_t create_time;
|
||||
int64_t merge_time;
|
||||
int producer_size;
|
||||
TRuntimeFilterDesc runtime_filter_desc;
|
||||
std::vector<doris::TRuntimeFilterTargetParams> target_info;
|
||||
@ -173,7 +173,6 @@ private:
|
||||
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
|
||||
RuntimeState* _state;
|
||||
bool _opt_remote_rf = true;
|
||||
int64_t _merge_timer = 0;
|
||||
};
|
||||
|
||||
// RuntimeFilterMergeController has a map query-id -> entity
|
||||
|
||||
Reference in New Issue
Block a user