[improvement](spinlock) remove some potential bad spinlock usage (#27904)
* [improvement](spinlock) remove some potential spinlock usage --------- Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
@ -116,7 +116,7 @@ QueryStatistics::~QueryStatistics() {
|
||||
}
|
||||
|
||||
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (!_query_statistics.contains(sender_id)) {
|
||||
_query_statistics[sender_id] = std::make_shared<QueryStatistics>();
|
||||
}
|
||||
@ -126,12 +126,12 @@ void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender
|
||||
void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) {
|
||||
if (!statistics->collected()) return;
|
||||
if (_query_statistics.contains(sender_id)) return;
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_query_statistics[sender_id] = statistics;
|
||||
}
|
||||
|
||||
QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto it = _query_statistics.find(sender_id);
|
||||
if (it != _query_statistics.end()) {
|
||||
return it->second;
|
||||
|
||||
@ -157,14 +157,14 @@ private:
|
||||
friend class QueryStatistics;
|
||||
|
||||
void merge(QueryStatistics* statistics) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (auto& pair : _query_statistics) {
|
||||
statistics->merge(*(pair.second));
|
||||
}
|
||||
}
|
||||
|
||||
std::map<int, QueryStatisticsPtr> _query_statistics;
|
||||
SpinLock _lock;
|
||||
std::mutex _lock;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -313,7 +313,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
|
||||
std::to_string(request->filter_id()));
|
||||
}
|
||||
}
|
||||
// iter->second = pair{CntlVal,SpinLock}
|
||||
cntVal = iter->second.first;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(*iter->second.second);
|
||||
|
||||
@ -91,7 +91,6 @@ struct UserFunctionCacheEntry {
|
||||
// used to lookup a symbol
|
||||
void* lib_handle = nullptr;
|
||||
|
||||
SpinLock map_lock;
|
||||
// from symbol_name to function pointer
|
||||
std::unordered_map<std::string, void*> fptr_map;
|
||||
|
||||
|
||||
@ -123,7 +123,7 @@ std::string Metric::to_prometheus(const std::string& display_name, const Labels&
|
||||
std::map<std::string, double> HistogramMetric::_s_output_percentiles = {
|
||||
{"0.50", 50.0}, {"0.75", 75.0}, {"0.90", 90.0}, {"0.95", 95.0}, {"0.99", 99.0}};
|
||||
void HistogramMetric::clear() {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_stats.clear();
|
||||
}
|
||||
|
||||
@ -136,12 +136,12 @@ void HistogramMetric::add(const uint64_t& value) {
|
||||
}
|
||||
|
||||
void HistogramMetric::merge(const HistogramMetric& other) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_stats.merge(other._stats);
|
||||
}
|
||||
|
||||
void HistogramMetric::set_histogram(const HistogramStat& stats) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_stats.clear();
|
||||
_stats.merge(stats);
|
||||
}
|
||||
@ -228,7 +228,7 @@ std::string MetricPrototype::to_prometheus(const std::string& registry_name) con
|
||||
}
|
||||
|
||||
void MetricEntity::deregister_metric(const MetricPrototype* metric_type) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto metric = _metrics.find(metric_type);
|
||||
if (metric != _metrics.end()) {
|
||||
delete metric->second;
|
||||
@ -238,7 +238,7 @@ void MetricEntity::deregister_metric(const MetricPrototype* metric_type) {
|
||||
|
||||
Metric* MetricEntity::get_metric(const std::string& name, const std::string& group_name) const {
|
||||
MetricPrototype dummy(MetricType::UNTYPED, MetricUnit::NOUNIT, name, "", group_name);
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto it = _metrics.find(&dummy);
|
||||
if (it == _metrics.end()) {
|
||||
return nullptr;
|
||||
@ -247,7 +247,7 @@ Metric* MetricEntity::get_metric(const std::string& name, const std::string& gro
|
||||
}
|
||||
|
||||
void MetricEntity::register_hook(const std::string& name, const std::function<void()>& hook) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
#ifndef BE_TEST
|
||||
DCHECK(_hooks.find(name) == _hooks.end()) << "hook is already exist! " << _name << ":" << name;
|
||||
#endif
|
||||
@ -255,7 +255,7 @@ void MetricEntity::register_hook(const std::string& name, const std::function<vo
|
||||
}
|
||||
|
||||
void MetricEntity::deregister_hook(const std::string& name) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_hooks.erase(name);
|
||||
}
|
||||
|
||||
@ -276,7 +276,7 @@ std::shared_ptr<MetricEntity> MetricRegistry::register_entity(const std::string&
|
||||
const Labels& labels,
|
||||
MetricEntityType type) {
|
||||
std::shared_ptr<MetricEntity> entity = std::make_shared<MetricEntity>(type, name, labels);
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto inserted_entity = _entities.insert(std::make_pair(entity, 1));
|
||||
if (!inserted_entity.second) {
|
||||
// If exist, increase the registered count
|
||||
@ -286,7 +286,7 @@ std::shared_ptr<MetricEntity> MetricRegistry::register_entity(const std::string&
|
||||
}
|
||||
|
||||
void MetricRegistry::deregister_entity(const std::shared_ptr<MetricEntity>& entity) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto found_entity = _entities.find(entity);
|
||||
if (found_entity != _entities.end()) {
|
||||
// Decrease the registered count
|
||||
@ -303,7 +303,7 @@ std::shared_ptr<MetricEntity> MetricRegistry::get_entity(const std::string& name
|
||||
MetricEntityType type) {
|
||||
std::shared_ptr<MetricEntity> dummy = std::make_shared<MetricEntity>(type, name, labels);
|
||||
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto entity = _entities.find(dummy);
|
||||
if (entity == _entities.end()) {
|
||||
return std::shared_ptr<MetricEntity>();
|
||||
@ -312,9 +312,9 @@ std::shared_ptr<MetricEntity> MetricRegistry::get_entity(const std::string& name
|
||||
}
|
||||
|
||||
void MetricRegistry::trigger_all_hooks(bool force) const {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (const auto& entity : _entities) {
|
||||
std::lock_guard<SpinLock> l(entity.first->_lock);
|
||||
std::lock_guard<std::mutex> l(entity.first->_lock);
|
||||
entity.first->trigger_hook_unlocked(force);
|
||||
}
|
||||
}
|
||||
@ -322,12 +322,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const {
|
||||
std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const {
|
||||
// Reorder by MetricPrototype
|
||||
EntityMetricsByType entity_metrics_by_types;
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (const auto& entity : _entities) {
|
||||
if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) {
|
||||
continue;
|
||||
}
|
||||
std::lock_guard<SpinLock> l(entity.first->_lock);
|
||||
std::lock_guard<std::mutex> l(entity.first->_lock);
|
||||
entity.first->trigger_hook_unlocked(false);
|
||||
for (const auto& metric : entity.first->_metrics) {
|
||||
std::pair<MetricEntity*, Metric*> new_elem =
|
||||
@ -365,12 +365,12 @@ std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const {
|
||||
std::string MetricRegistry::to_json(bool with_tablet_metrics) const {
|
||||
rj::Document doc {rj::kArrayType};
|
||||
rj::Document::AllocatorType& allocator = doc.GetAllocator();
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (const auto& entity : _entities) {
|
||||
if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) {
|
||||
continue;
|
||||
}
|
||||
std::lock_guard<SpinLock> l(entity.first->_lock);
|
||||
std::lock_guard<std::mutex> l(entity.first->_lock);
|
||||
entity.first->trigger_hook_unlocked(false);
|
||||
for (const auto& metric : entity.first->_metrics) {
|
||||
rj::Value metric_obj(rj::kObjectType);
|
||||
@ -406,9 +406,9 @@ std::string MetricRegistry::to_json(bool with_tablet_metrics) const {
|
||||
|
||||
std::string MetricRegistry::to_core_string() const {
|
||||
std::stringstream ss;
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
for (const auto& entity : _entities) {
|
||||
std::lock_guard<SpinLock> l(entity.first->_lock);
|
||||
std::lock_guard<std::mutex> l(entity.first->_lock);
|
||||
entity.first->trigger_hook_unlocked(false);
|
||||
for (const auto& metric : entity.first->_metrics) {
|
||||
if (metric.first->is_core_metric) {
|
||||
|
||||
@ -35,7 +35,6 @@
|
||||
|
||||
#include "util/core_local.h"
|
||||
#include "util/histogram.h"
|
||||
#include "util/spinlock.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -111,17 +110,17 @@ public:
|
||||
std::string to_string() const override { return std::to_string(value()); }
|
||||
|
||||
T value() const {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
return _value;
|
||||
}
|
||||
|
||||
void increment(const T& delta) {
|
||||
std::lock_guard<SpinLock> l(this->_lock);
|
||||
std::lock_guard<std::mutex> l(this->_lock);
|
||||
_value += delta;
|
||||
}
|
||||
|
||||
void set_value(const T& value) {
|
||||
std::lock_guard<SpinLock> l(this->_lock);
|
||||
std::lock_guard<std::mutex> l(this->_lock);
|
||||
_value = value;
|
||||
}
|
||||
|
||||
@ -130,14 +129,14 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
// We use spinlock instead of std::atomic is because atomic don't support
|
||||
// We use std::mutex instead of std::atomic is because atomic don't support
|
||||
// double's fetch_add
|
||||
// TODO(zc): If this is atomic is bottleneck, we change to thread local.
|
||||
// performance: on Intel(R) Xeon(R) CPU E5-2450 int64_t
|
||||
// original type: 2ns/op
|
||||
// single thread spinlock: 26ns/op
|
||||
// multiple thread(8) spinlock: 2500ns/op
|
||||
mutable SpinLock _lock;
|
||||
// single thread std::mutex: 26ns/op
|
||||
// multiple thread(8) std::mutex: 2500ns/op
|
||||
mutable std::mutex _lock;
|
||||
T _value;
|
||||
};
|
||||
|
||||
@ -202,7 +201,7 @@ public:
|
||||
|
||||
protected:
|
||||
static std::map<std::string, double> _s_output_percentiles;
|
||||
mutable SpinLock _lock;
|
||||
mutable std::mutex _lock;
|
||||
HistogramStat _stats;
|
||||
};
|
||||
|
||||
@ -351,7 +350,7 @@ public:
|
||||
|
||||
template <typename T>
|
||||
Metric* register_metric(const MetricPrototype* metric_type) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
auto inserted_metric = _metrics.insert(std::make_pair(metric_type, nullptr));
|
||||
if (inserted_metric.second) {
|
||||
// If not exist, make a new metric pointer
|
||||
@ -377,7 +376,7 @@ private:
|
||||
std::string _name;
|
||||
Labels _labels;
|
||||
|
||||
mutable SpinLock _lock;
|
||||
mutable std::mutex _lock;
|
||||
MetricMap _metrics;
|
||||
std::map<std::string, std::function<void()>> _hooks;
|
||||
};
|
||||
@ -421,7 +420,7 @@ public:
|
||||
private:
|
||||
const std::string _name;
|
||||
|
||||
mutable SpinLock _lock;
|
||||
mutable std::mutex _lock;
|
||||
// MetricEntity -> register count
|
||||
std::unordered_map<std::shared_ptr<MetricEntity>, int32_t, MetricEntityHash,
|
||||
MetricEntityEqualTo>
|
||||
|
||||
@ -1,150 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include "util/spinlock.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
/// A fixed-size sampler to collect samples over time. AddSample should be
|
||||
/// called periodically with the sampled value. Samples are added at the max
|
||||
/// resolution possible. When the sample buffer is full, the current samples
|
||||
/// are collapsed and the collection period is doubled.
|
||||
/// The input period and the streaming sampler period do not need to match, the
|
||||
/// streaming sampler will average values.
|
||||
/// T is the type of the sample and must be a native numerical type (e.g. int or float).
|
||||
template <typename T, int MAX_SAMPLES>
|
||||
class StreamingSampler {
|
||||
public:
|
||||
StreamingSampler(int initial_period = 500)
|
||||
: samples_collected_(0),
|
||||
period_(initial_period),
|
||||
current_sample_sum_(0),
|
||||
current_sample_count_(0),
|
||||
current_sample_total_time_(0) {}
|
||||
|
||||
/// Initialize the sampler with values.
|
||||
StreamingSampler(int period, const std::vector<T>& initial_samples)
|
||||
: samples_collected_(initial_samples.size()),
|
||||
period_(period),
|
||||
current_sample_sum_(0),
|
||||
current_sample_count_(0),
|
||||
current_sample_total_time_(0) {
|
||||
DCHECK_LE(samples_collected_, MAX_SAMPLES);
|
||||
memcpy(samples_, &initial_samples[0], sizeof(T) * samples_collected_);
|
||||
}
|
||||
|
||||
/// Add a sample to the sampler. 'ms' is the time elapsed since the last time this
|
||||
/// was called.
|
||||
/// The input value is accumulated into current_*. If the total time elapsed
|
||||
/// in current_sample_total_time_ is higher than the storage period, the value is
|
||||
/// stored. 'sample' should be interpreted as a representative sample from
|
||||
/// (now - ms, now].
|
||||
/// TODO: we can make this more complex by taking a weighted average of samples
|
||||
/// accumulated in a period.
|
||||
void AddSample(T sample, int ms) {
|
||||
std::lock_guard<SpinLock> l(lock_);
|
||||
++current_sample_count_;
|
||||
current_sample_sum_ += sample;
|
||||
current_sample_total_time_ += ms;
|
||||
|
||||
if (current_sample_total_time_ >= period_) {
|
||||
samples_[samples_collected_++] = current_sample_sum_ / current_sample_count_;
|
||||
current_sample_count_ = 0;
|
||||
current_sample_sum_ = 0;
|
||||
current_sample_total_time_ = 0;
|
||||
|
||||
if (samples_collected_ == MAX_SAMPLES) {
|
||||
/// collapse the samples in half by averaging them and doubling the storage period
|
||||
period_ *= 2;
|
||||
for (int i = 0; i < MAX_SAMPLES / 2; ++i) {
|
||||
samples_[i] = (samples_[i * 2] + samples_[i * 2 + 1]) / 2;
|
||||
}
|
||||
samples_collected_ /= 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the samples collected. Returns the number of samples and
|
||||
/// the period they were collected at.
|
||||
/// If lock is non-null, the lock will be taken before returning. The caller
|
||||
/// must unlock it.
|
||||
const T* GetSamples(int* num_samples, int* period, SpinLock** lock = nullptr) const {
|
||||
if (lock != nullptr) {
|
||||
lock_.lock();
|
||||
*lock = &lock_;
|
||||
}
|
||||
*num_samples = samples_collected_;
|
||||
*period = period_;
|
||||
return samples_;
|
||||
}
|
||||
|
||||
/// Set the underlying data to period/samples
|
||||
void SetSamples(int period, const std::vector<T>& samples) {
|
||||
DCHECK_LE(samples.size(), MAX_SAMPLES);
|
||||
|
||||
std::lock_guard<SpinLock> l(lock_);
|
||||
period_ = period;
|
||||
samples_collected_ = samples.size();
|
||||
memcpy(samples_, &samples[0], sizeof(T) * samples_collected_);
|
||||
current_sample_sum_ = 0;
|
||||
current_sample_count_ = 0;
|
||||
current_sample_total_time_ = 0;
|
||||
}
|
||||
|
||||
std::string DebugString(const std::string& prefix = "") const {
|
||||
std::lock_guard<SpinLock> l(lock_);
|
||||
std::stringstream ss;
|
||||
ss << prefix << "Period = " << period_ << std::endl
|
||||
<< prefix << "Num = " << samples_collected_ << std::endl
|
||||
<< prefix << "Samples = {";
|
||||
for (int i = 0; i < samples_collected_; ++i) {
|
||||
ss << samples_[i] << ", ";
|
||||
}
|
||||
ss << prefix << "}" << std::endl;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
private:
|
||||
mutable SpinLock lock_;
|
||||
|
||||
/// Aggregated samples collected. Note: this is not all the input samples from
|
||||
/// AddSample(), as logically, those samples get resampled and aggregated.
|
||||
T samples_[MAX_SAMPLES];
|
||||
|
||||
/// Number of samples collected <= MAX_SAMPLES.
|
||||
int samples_collected_;
|
||||
|
||||
/// Storage period in ms.
|
||||
int period_;
|
||||
|
||||
/// The sum of input samples that makes up the next stored sample.
|
||||
T current_sample_sum_;
|
||||
|
||||
/// The number of input samples that contribute to current_sample_sum_.
|
||||
int current_sample_count_;
|
||||
|
||||
/// The total time that current_sample_sum_ represents
|
||||
int current_sample_total_time_;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
Reference in New Issue
Block a user