@ -38,6 +38,7 @@
|
||||
#include "olap/storage_engine.h"
|
||||
#include "olap/tablet_manager.h"
|
||||
#include "util/time.h"
|
||||
#include "util/trace.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -113,6 +114,7 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi
|
||||
std::vector<RowsetSharedPtr> to_delete;
|
||||
{
|
||||
std::unique_lock wlock(tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
tablet->modify_rowsets(to_add, to_delete);
|
||||
tablet->save_meta();
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@
|
||||
#include "olap/tablet_meta.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "util/thread.h"
|
||||
#include "util/trace.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
namespace doris {
|
||||
@ -83,6 +84,7 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
UniqueId cooldown_meta_id = UniqueId::gen_uid();
|
||||
{
|
||||
std::lock_guard wlock(_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
// Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`.
|
||||
_tablet->delete_rowsets(_input_rowsets, false);
|
||||
_tablet->add_rowsets({_output_rowset});
|
||||
|
||||
@ -535,6 +535,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
|
||||
{
|
||||
std::lock_guard<std::mutex> wrlock_(_tablet->get_rowset_update_lock());
|
||||
std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
|
||||
// Convert the delete bitmap of the input rowsets to output rowset for
|
||||
// incremental data.
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
|
||||
#include "common/config.h"
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "util/trace.h"
|
||||
#include "vec/columns/column_array.h"
|
||||
#include "vec/columns/column_dictionary.h"
|
||||
#include "vec/columns/column_map.h"
|
||||
|
||||
@ -63,6 +63,7 @@
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/defer_op.h"
|
||||
#include "util/trace.h"
|
||||
#include "vec/aggregate_functions/aggregate_function.h"
|
||||
#include "vec/aggregate_functions/aggregate_function_reader.h"
|
||||
#include "vec/columns/column.h"
|
||||
@ -747,6 +748,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
|
||||
std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
|
||||
std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
|
||||
std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock());
|
||||
|
||||
do {
|
||||
@ -947,6 +949,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
|
||||
// step 3
|
||||
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
|
||||
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
int64_t new_max_version = new_tablet->max_version().second;
|
||||
rowsets.clear();
|
||||
if (max_version < new_max_version) {
|
||||
@ -977,6 +980,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
|
||||
} else {
|
||||
// set state to ready
|
||||
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
|
||||
if (!res) {
|
||||
break;
|
||||
@ -1053,6 +1057,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
|
||||
{
|
||||
// save tablet meta here because rowset meta is not saved during add rowset
|
||||
std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
sc_params.new_tablet->save_meta();
|
||||
}
|
||||
if (res) {
|
||||
|
||||
@ -127,12 +127,15 @@ class Block;
|
||||
} // namespace vectorized
|
||||
|
||||
using namespace ErrorCode;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using std::pair;
|
||||
using std::string;
|
||||
using std::vector;
|
||||
using io::FileSystemSPtr;
|
||||
|
||||
// Duration (10 seconds) handed to SCOPED_SIMPLE_TRACE_IF_TIMEOUT at the tablet
// lock sites; a traced scope that runs at least this long gets logged.
const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD {10};
|
||||
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
|
||||
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS);
|
||||
|
||||
@ -379,7 +382,7 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
|
||||
}
|
||||
|
||||
RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
|
||||
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
|
||||
std::shared_lock rdlock(_meta_lock);
|
||||
for (auto& version_rowset : _rs_version_map) {
|
||||
if (version_rowset.second->rowset_id() == rowset_id) {
|
||||
return version_rowset.second;
|
||||
@ -396,6 +399,7 @@ RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
|
||||
Status Tablet::add_rowset(RowsetSharedPtr rowset) {
|
||||
DCHECK(rowset != nullptr);
|
||||
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
// If the rowset already exists, just return directly. The rowset_id is a unique id,
|
||||
// we can use it to check this situation.
|
||||
if (_contains_rowset(rowset->rowset_id())) {
|
||||
@ -669,6 +673,7 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) {
|
||||
void Tablet::delete_expired_stale_rowset() {
|
||||
int64_t now = UnixSeconds();
|
||||
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
// Compute the cutoff time for deleting rowsets: an expired rowset whose creation time is earlier than this cutoff will be deleted.
|
||||
double expired_stale_sweep_endtime =
|
||||
::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
|
||||
@ -1137,6 +1142,7 @@ void Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, V
|
||||
|
||||
void Tablet::calculate_cumulative_point() {
|
||||
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
int64_t ret_cumulative_point;
|
||||
_cumulative_compaction_policy->calculate_cumulative_point(
|
||||
this, _tablet_meta->all_rs_metas(), _cumulative_point, &ret_cumulative_point);
|
||||
@ -1929,6 +1935,7 @@ Status Tablet::_cooldown_data() {
|
||||
erase_pending_remote_rowset(new_rowset_id.to_string());
|
||||
{
|
||||
std::unique_lock meta_rlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
save_meta();
|
||||
}
|
||||
// upload cooldowned rowset meta to remote fs
|
||||
@ -2051,6 +2058,7 @@ Status Tablet::_follow_cooldowned_data() {
|
||||
|
||||
{
|
||||
std::lock_guard wlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
if (tablet_state() != TABLET_RUNNING) {
|
||||
return Status::InternalError("tablet not running");
|
||||
}
|
||||
@ -2099,6 +2107,7 @@ Status Tablet::_follow_cooldowned_data() {
|
||||
}
|
||||
{
|
||||
std::lock_guard rlock(_meta_lock);
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
save_meta();
|
||||
}
|
||||
|
||||
|
||||
@ -85,6 +85,8 @@ using TabletSharedPtr = std::shared_ptr<Tablet>;
|
||||
|
||||
enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL };
|
||||
|
||||
// Timeout threshold passed to SCOPED_SIMPLE_TRACE_IF_TIMEOUT at tablet lock
// sites. Declared here only; the definition lives in a source file.
extern const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD;
|
||||
|
||||
class Tablet : public BaseTablet {
|
||||
public:
|
||||
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
|
||||
|
||||
@ -480,6 +480,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId repl
|
||||
if (!keep_files) {
|
||||
// drop tablet will update tablet meta, should lock
|
||||
std::lock_guard<std::shared_mutex> wrlock(to_drop_tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
|
||||
<< "tablet_id=" << tablet_id << ", tablet_path=" << to_drop_tablet->tablet_path();
|
||||
// NOTE: has to update tablet here, but must not update tablet meta directly.
|
||||
@ -1107,20 +1108,17 @@ void TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
|
||||
size_t* tablet_count) {
|
||||
DCHECK(tablet_count);
|
||||
*tablet_count = 0;
|
||||
for (const auto& tablets_shard : _tablets_shards) {
|
||||
std::shared_lock rdlock(tablets_shard.lock);
|
||||
for (const auto& item : tablets_shard.tablet_map) {
|
||||
TabletSharedPtr tablet = item.second;
|
||||
++(*tablet_count);
|
||||
auto iter = path_map->find(tablet->data_dir()->path());
|
||||
if (iter == path_map->end()) {
|
||||
continue;
|
||||
}
|
||||
if (iter->second.is_used) {
|
||||
iter->second.local_used_capacity += tablet->tablet_local_size();
|
||||
iter->second.remote_used_capacity += tablet->tablet_remote_size();
|
||||
}
|
||||
}
|
||||
auto filter = [path_map, tablet_count](Tablet* t) -> bool {
|
||||
++(*tablet_count);
|
||||
auto iter = path_map->find(t->data_dir()->path());
|
||||
return iter != path_map->end() && iter->second.is_used;
|
||||
};
|
||||
|
||||
auto tablets = get_all_tablet(filter);
|
||||
for (const auto& tablet : tablets) {
|
||||
auto& data_dir_info = (*path_map)[tablet->data_dir()->path()];
|
||||
data_dir_info.local_used_capacity += tablet->tablet_local_size();
|
||||
data_dir_info.remote_used_capacity += tablet->tablet_remote_size();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,6 +63,7 @@
|
||||
#include "util/network_util.h"
|
||||
#include "util/stopwatch.hpp"
|
||||
#include "util/thrift_rpc_helper.h"
|
||||
#include "util/trace.h"
|
||||
|
||||
using std::set;
|
||||
using std::stringstream;
|
||||
@ -582,6 +583,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
|
||||
std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
|
||||
std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
|
||||
std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
|
||||
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
|
||||
if (is_incremental_clone) {
|
||||
status = _finish_incremental_clone(tablet, cloned_tablet_meta, committed_version);
|
||||
} else {
|
||||
|
||||
@ -21,15 +21,18 @@
|
||||
#include <rapidjson/writer.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <iosfwd>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "gutil/ref_counted.h"
|
||||
#include "gutil/strings/stringpiece.h"
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "gutil/threading/thread_collision_warner.h"
|
||||
#include "util/scoped_cleanup.h"
|
||||
#include "util/spinlock.h"
|
||||
#include "util/time.h"
|
||||
#include "util/trace_metrics.h"
|
||||
@ -113,6 +116,32 @@ class Trace;
|
||||
} \
|
||||
}()
|
||||
|
||||
// Logs a WARNING line when the enclosing scope takes at least `timeout` to
// finish. Only the elapsed time, in microseconds, is reported.
// `timeout` is a std::chrono duration, e.g. 5ms or 100 * 1s.
// Thin wrapper over SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT with the stream
// fixed to LOG(WARNING).
#define SCOPED_SIMPLE_TRACE_IF_TIMEOUT(timeout) \
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
|
||||
|
||||
// Writes a one-line timing record to `stream` when the enclosing scope takes
// at least `timeout` to finish; otherwise nothing is written.
//
// The start time is sampled where the macro appears. The check itself is
// wrapped in SCOPED_CLEANUP (defined elsewhere), which is expected to run it
// when the enclosing scope exits.
//
// `timeout` is a std::chrono duration, e.g. 5ms or 100 * 1s. The macro pulls
// std::chrono_literals into the current scope, so such literals can be written
// directly in the argument.
//
// Example:
//
//   std::string tag = "[foo]";
//   SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(5s, LOG(INFO) << tag);
//
#define SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, stream) \
    using namespace std::chrono_literals; \
    const auto VARNAME_LINENUM(simple_trace_begin_us) = doris::MonotonicMicros(); \
    SCOPED_CLEANUP({ \
        const auto VARNAME_LINENUM(simple_trace_limit_us) = \
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
        const auto VARNAME_LINENUM(simple_trace_elapsed_us) = \
                doris::MonotonicMicros() - VARNAME_LINENUM(simple_trace_begin_us); \
        if (VARNAME_LINENUM(simple_trace_limit_us) <= VARNAME_LINENUM(simple_trace_elapsed_us)) { \
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(simple_trace_elapsed_us); \
        } \
    })
|
||||
|
||||
namespace doris {
|
||||
|
||||
struct TraceEntry;
|
||||
|
||||
@ -1784,7 +1784,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
|
||||
if (!ok || !countDownLatch.getStatus().ok()) {
|
||||
errMsg = "Failed to create partition[" + partitionName + "]. Timeout:" + timeout + " seconds.";
|
||||
errMsg = "Failed to create partition[" + partitionName + "]. Timeout:" + (timeout / 1000) + " seconds.";
|
||||
// clear tasks
|
||||
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user