[Feature][ThreadPool]Add Web Page to display thread's stats (#4110)

This CL mainly includes:
- add some methods to get thread's stats from Linux's system file in
env.
- support get thread's stats by http method.
- register page handle in BE to show thread's stats to help developer
position some thread relate problem.
This commit is contained in:
WangCong
2020-07-23 21:08:36 +08:00
committed by GitHub
parent 2334f5d997
commit 443b8f100b
13 changed files with 570 additions and 143 deletions

63
be/src/env/env.h vendored
View File

@ -9,8 +9,8 @@
#pragma once
#include <string>
#include <memory>
#include <string>
#include "common/status.h"
#include "util/slice.h"
@ -35,15 +35,15 @@ public:
// CREATE_OR_OPEN | opens | creates
// MUST_CREATE | fails | creates
// MUST_EXIST | opens | fails
enum OpenMode {
CREATE_OR_OPEN_WITH_TRUNCATE,
CREATE_OR_OPEN,
MUST_CREATE,
MUST_EXIST
enum OpenMode {
CREATE_OR_OPEN_WITH_TRUNCATE,
CREATE_OR_OPEN,
MUST_CREATE,
MUST_EXIST
};
Env() { }
virtual ~Env() { }
Env() {}
virtual ~Env() {}
// Return a default environment suitable for the current operating
// system. Sophisticated users may wish to provide their own Env
@ -85,8 +85,7 @@ public:
// Like the previous new_writable_file, but allows options to be
// specified.
virtual Status new_writable_file(const WritableFileOptions& opts,
const std::string& fname,
virtual Status new_writable_file(const WritableFileOptions& opts, const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Creates a new readable and writable file. If a file with the same name
@ -98,8 +97,7 @@ public:
std::unique_ptr<RandomRWFile>* result) = 0;
// Like the previous new_random_rw_file, but allows options to be specified.
virtual Status new_random_rw_file(const RandomRWFileOptions& opts,
const std::string& fname,
virtual Status new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname,
std::unique_ptr<RandomRWFile>* result) = 0;
// Returns OK if the path exists.
@ -116,8 +114,7 @@ public:
// NotFound if "dir" does not exist, the calling process does not have
// permission to access "dir", or if "dir" is invalid.
// IOError if an IO Error was encountered
virtual Status get_children(const std::string& dir,
std::vector<std::string>* result) = 0;
virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;
// Iterate the specified directory and call given callback function with child's
// name. This function continues execution until all children have been iterated
@ -168,19 +165,16 @@ public:
virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0;
// Store the last modification time of fname in *file_mtime.
virtual Status get_file_modified_time(const std::string& fname,
uint64_t* file_mtime) = 0;
virtual Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) = 0;
// Rename file src to target.
virtual Status rename_file(const std::string& src,
const std::string& target) = 0;
virtual Status rename_file(const std::string& src, const std::string& target) = 0;
// create a hard-link
virtual Status link_file(const std::string& /*old_path*/,
const std::string& /*new_path*/) = 0;
virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;
};
struct RandomAccessFileOptions {
RandomAccessFileOptions() { }
RandomAccessFileOptions() {}
};
// Creation-time options for WritableFile
@ -202,8 +196,8 @@ struct RandomRWFileOptions {
// A file abstraction for reading sequentially through a file
class SequentialFile {
public:
SequentialFile() { }
virtual ~SequentialFile() { }
SequentialFile() {}
virtual ~SequentialFile() {}
// Read up to "result.size" bytes from the file.
// Sets "result.data" to the data that was read.
@ -229,8 +223,8 @@ public:
class RandomAccessFile {
public:
RandomAccessFile() { }
virtual ~RandomAccessFile() { }
RandomAccessFile() {}
virtual ~RandomAccessFile() {}
// Read "result.size" bytes from the file starting at "offset".
// Copies the resulting data into "result.data".
@ -271,13 +265,13 @@ public:
// one of Append or PositionedAppend. We support only Append here.
class WritableFile {
public:
enum FlushMode {
FLUSH_SYNC,
FLUSH_ASYNC
enum FlushMode {
FLUSH_SYNC,
FLUSH_ASYNC
};
WritableFile() { }
virtual ~WritableFile() { }
WritableFile() {}
virtual ~WritableFile() {}
// Append data to the end of the file
virtual Status append(const Slice& data) = 0;
@ -325,12 +319,9 @@ private:
// A file abstraction for random reading and writing.
class RandomRWFile {
public:
enum FlushMode {
FLUSH_SYNC,
FLUSH_ASYNC
};
enum FlushMode { FLUSH_SYNC, FLUSH_ASYNC };
RandomRWFile() {}
virtual ~RandomRWFile() { }
virtual ~RandomRWFile() {}
virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
@ -350,4 +341,4 @@ public:
virtual const std::string& filename() const = 0;
};
}
} // namespace doris

View File

@ -18,6 +18,7 @@
#include "env/env_util.h"
#include "env/env.h"
#include "util/faststring.h"
using std::shared_ptr;
using std::string;
@ -30,21 +31,73 @@ Status open_file_for_write(Env* env, const string& path, shared_ptr<WritableFile
return open_file_for_write(WritableFileOptions(), env, path, file);
}
Status open_file_for_write(const WritableFileOptions& opts,
Env *env, const string &path,
shared_ptr<WritableFile> *file) {
Status open_file_for_write(const WritableFileOptions& opts, Env* env, const string& path,
shared_ptr<WritableFile>* file) {
unique_ptr<WritableFile> w;
RETURN_IF_ERROR(env->new_writable_file(opts, path, &w));
file->reset(w.release());
return Status::OK();
}
Status open_file_for_random(Env *env, const string &path, shared_ptr<RandomAccessFile> *file) {
Status open_file_for_random(Env* env, const string& path, shared_ptr<RandomAccessFile>* file) {
unique_ptr<RandomAccessFile> r;
RETURN_IF_ERROR(env->new_random_access_file(path, &r));
file->reset(r.release());
return Status::OK();
}
static Status do_write_string_to_file(Env* env, const Slice& data, const std::string& fname,
bool should_sync) {
unique_ptr<WritableFile> file;
Status s = env->new_writable_file(fname, &file);
if (!s.ok()) {
return s;
}
s = file->append(data);
if (s.ok() && should_sync) {
s = file->sync();
}
if (s.ok()) {
s = file->close();
}
file.reset(); // Will auto-close if we did not close above
if (!s.ok()) {
RETURN_NOT_OK_STATUS_WITH_WARN(env->delete_file(fname),
"Failed to delete partially-written file " + fname);
}
return s;
}
Status write_string_to_file(Env* env, const Slice& data, const std::string& fname) {
return do_write_string_to_file(env, data, fname, false);
}
Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname) {
return do_write_string_to_file(env, data, fname, true);
}
Status read_file_to_string(Env* env, const std::string& fname, faststring* data) {
data->clear();
unique_ptr<SequentialFile> file;
Status s = env->new_sequential_file(fname, &file);
if (!s.ok()) {
return s;
}
static const int kBufferSize = 8192;
unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]);
while (true) {
Slice fragment(scratch.get(), kBufferSize);
s = file->read(&fragment);
if (!s.ok()) {
break;
}
data->append(fragment.get_data(), fragment.get_size());
if (fragment.empty()) {
break;
}
}
return s;
}
} // namespace env_util
} // namespace doris

20
be/src/env/env_util.h vendored
View File

@ -20,6 +20,7 @@
#include <string>
#include "common/status.h"
#include "env.h"
namespace doris {
@ -30,14 +31,21 @@ struct WritableFileOptions;
namespace env_util {
Status open_file_for_write(Env *env, const std::string& path, std::shared_ptr<WritableFile> *file);
Status open_file_for_write(Env* env, const std::string& path, std::shared_ptr<WritableFile>* file);
Status open_file_for_write(const WritableFileOptions& opts, Env *env,
const std::string& path, std::shared_ptr<WritableFile> *file);
Status open_file_for_write(const WritableFileOptions& opts, Env* env, const std::string& path,
std::shared_ptr<WritableFile>* file);
Status open_file_for_random(Env *env, const std::string& path,
std::shared_ptr<RandomAccessFile> *file);
Status open_file_for_random(Env* env, const std::string& path,
std::shared_ptr<RandomAccessFile>* file);
// A utility routine: write "data" to the named file.
Status write_string_to_file(Env* env, const Slice& data, const std::string& fname);
// Like above but also fsyncs the new file.
Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname);
// A utility routine: read contents of named file into *data
Status read_file_to_string(Env* env, const std::string& fname, faststring* data);
} // namespace env_util
} // namespace doris

View File

@ -18,20 +18,17 @@
#include "http/default_path_handlers.h"
#include <gperftools/malloc_extension.h>
#include <sys/stat.h>
#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
#include <fstream>
#include <sstream>
#include "common/configbase.h"
#include "common/logging.h"
#include "http/web_page_handler.h"
#include "runtime/mem_tracker.h"
#include "util/debug_util.h"
#include "util/logging.h"
#include "util/pretty_printer.h"
#include "util/thread.h"
namespace doris {
@ -109,6 +106,7 @@ void add_default_path_handlers(WebPageHandler* web_page_handler, MemTracker* pro
web_page_handler->register_page("/varz", "Configs", config_handler, true /* is_on_nav_bar */);
web_page_handler->register_page("/memz", "Memory",
boost::bind<void>(&mem_usage_handler, process_mem_tracker, _1, _2), true /* is_on_nav_bar */);
register_thread_display_page(web_page_handler);
}
} // namespace doris

View File

@ -17,8 +17,7 @@
#include "http/web_page_handler.h"
#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>
#include <functional>
#include "common/config.h"
#include "env/env.h"
@ -51,7 +50,7 @@ WebPageHandler::WebPageHandler(EvHttpServer* server) : _http_server(server) {
_http_server->register_static_file_handler(this);
TemplatePageHandlerCallback root_callback =
boost::bind<void>(boost::mem_fn(&WebPageHandler::root_handler), this, _1, _2);
std::bind<void>(std::mem_fn(&WebPageHandler::root_handler), this, std::placeholders::_1, std::placeholders::_2);
register_template_page("/", "Home", root_callback, false /* is_on_nav_bar */);
}

View File

@ -70,7 +70,8 @@ set(UTIL_FILES
null_load_error_hub.cpp
time.cpp
os_info.cpp
# coding_util.cpp
os_util.cpp
# coding_util.cpp
cidr.cpp
core_local.cpp
uid_util.cpp

165
be/src/util/os_util.cpp Normal file
View File

@ -0,0 +1,165 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/os_util.h"
#include <fcntl.h>
#include <sys/resource.h>
#include <unistd.h>
#include <cstddef>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "env/env_util.h"
#include "gutil/macros.h"
#include "gutil/strings/numbers.h"
#include "gutil/strings/split.h"
#include "gutil/strings/stringpiece.h"
#include "gutil/strings/substitute.h"
#include "gutil/strings/util.h"
#include "util/faststring.h"
using std::string;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace doris {
// Ensure that Impala compiles on earlier kernels. If the target kernel does not support
// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1.
#ifndef _SC_CLK_TCK
#define _SC_CLK_TCK 2
#endif
static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK);
// Offsets into the ../stat file array of per-thread statistics.
//
// They are themselves offset by two because the pid and comm fields of the
// file are parsed separately.
static const int64_t kUserTicks = 13 - 2;
static const int64_t kKernelTicks = 14 - 2;
static const int64_t kIoWait = 41 - 2;
// Largest offset we are interested in, to check we get a well formed stat file.
static const int64_t kMaxOffset = kIoWait;
Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats) {
DCHECK(stats != nullptr);
// The thread name should be the only field with parentheses. But the name
// itself may contain parentheses.
size_t open_paren = buffer.find('(');
size_t close_paren = buffer.rfind(')');
if (open_paren == string::npos || // '(' must exist
close_paren == string::npos || // ')' must exist
open_paren >= close_paren || // '(' must come before ')'
close_paren + 2 == buffer.size()) { // there must be at least two chars after ')'
return Status::IOError("Unrecognised /proc format");
}
string extracted_name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1));
string rest = buffer.substr(close_paren + 2);
vector<string> splits = Split(rest, " ", strings::SkipEmpty());
if (splits.size() < kMaxOffset) {
return Status::IOError("Unrecognised /proc format");
}
int64_t tmp;
if (safe_strto64(splits[kUserTicks], &tmp)) {
stats->user_ns = tmp * (1e9 / kTicksPerSec);
}
if (safe_strto64(splits[kKernelTicks], &tmp)) {
stats->kernel_ns = tmp * (1e9 / kTicksPerSec);
}
if (safe_strto64(splits[kIoWait], &tmp)) {
stats->iowait_ns = tmp * (1e9 / kTicksPerSec);
}
if (name != nullptr) {
*name = extracted_name;
}
return Status::OK();
}
Status get_thread_stats(int64_t tid, ThreadStats* stats) {
DCHECK(stats != nullptr);
if (kTicksPerSec <= 0) {
return Status::NotSupported("ThreadStats not supported");
}
faststring buf;
RETURN_IF_ERROR(env_util::read_file_to_string(
Env::Default(), Substitute("/proc/self/task/$0/stat", tid), &buf));
return parse_stat(buf.ToString(), nullptr, stats);
}
void disable_core_dumps() {
struct rlimit lim;
PCHECK(getrlimit(RLIMIT_CORE, &lim) == 0);
lim.rlim_cur = 0;
PCHECK(setrlimit(RLIMIT_CORE, &lim) == 0);
// Set coredump_filter to not dump any parts of the address space.
// Although the above disables core dumps to files, if core_pattern
// is set to a pipe rather than a file, it's not sufficient. Setting
// this pattern results in piping a very minimal dump into the core
// processor (eg abrtd), thus speeding up the crash.
int f;
RETRY_ON_EINTR(f, open("/proc/self/coredump_filter", O_WRONLY));
if (f >= 0) {
ssize_t ret;
RETRY_ON_EINTR(ret, write(f, "00000000", 8));
int close_ret;
RETRY_ON_EINTR(close_ret, close(f));
}
}
bool is_being_debugged() {
#ifndef __linux__
return false;
#else
// Look for the TracerPid line in /proc/self/status.
// If this is non-zero, we are being ptraced, which is indicative of gdb or strace
// being attached.
faststring buf;
Status s = env_util::read_file_to_string(Env::Default(), "/proc/self/status", &buf);
if (!s.ok()) {
LOG(WARNING) << "could not read /proc/self/status: " << s.to_string();
return false;
}
StringPiece buf_sp(reinterpret_cast<const char*>(buf.data()), buf.size());
vector<StringPiece> lines = Split(buf_sp, "\n");
for (const auto& l : lines) {
if (!HasPrefixString(l, "TracerPid:")) continue;
std::pair<StringPiece, StringPiece> key_val = Split(l, "\t");
int64_t tracer_pid = -1;
if (!safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) {
LOG(WARNING) << "Invalid line in /proc/self/status: " << l;
return false;
}
return tracer_pid != 0;
}
LOG(WARNING) << "Could not find TracerPid line in /proc/self/status";
return false;
#endif // __linux__
}
} // namespace doris

68
be/src/util/os_util.h Normal file
View File

@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_UTIL_OS_UTIL_H
#define DORIS_BE_UTIL_OS_UTIL_H
#include <cstdint>
#include <string>
#include <type_traits>
#include "common/status.h"
#include "env/env.h"
namespace doris {
// Utility methods to read interesting values from /proc.
// TODO: Get stats for parent process.
// Container struct for statistics read from the /proc filesystem for a thread.
struct ThreadStats {
int64_t user_ns;
int64_t kernel_ns;
int64_t iowait_ns;
// Default constructor zeroes all members in case structure can't be filled by
// GetThreadStats.
ThreadStats() : user_ns(0), kernel_ns(0), iowait_ns(0) {}
};
// Populates ThreadStats object using a given buffer. The buffer is expected to
// conform to /proc/<pid>/task/<tid>/stat layout; an error will be returned otherwise.
//
// If 'name' is supplied, the extracted thread name will be written to it.
Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats);
// Populates ThreadStats object for a given thread by reading from
// /proc/<pid>/task/<tid>/stat. Returns OK unless the file cannot be read or is in an
// unrecognised format, or if the kernel version is not modern enough.
Status get_thread_stats(int64_t tid, ThreadStats* stats);
// Disable core dumps for this process.
//
// This is useful particularly in tests where we have injected failures and don't
// want to generate a core dump from an "expected" crash.
void disable_core_dumps();
// Return true if this process appears to be running under a debugger or strace.
//
// This may return false on unsupported (non-Linux) platforms.
bool is_being_debugged();
} // namespace doris
#endif

View File

@ -17,23 +17,29 @@
#include "thread.h"
#include <sys/prctl.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <sys/prctl.h>
#include <sys/types.h>
#include <functional>
#include "common/logging.h"
#include "gutil/atomicops.h"
#include "gutil/once.h"
#include "gutil/dynamic_annotations.h"
#include "gutil/map-util.h"
#include "gutil/once.h"
#include "gutil/strings/substitute.h"
#include "olap/olap_define.h"
#include "util/easy_json.h"
#include "util/mutex.h"
#include "util/os_util.h"
#include "util/scoped_cleanup.h"
#include "util/url_coding.h"
namespace doris {
@ -55,9 +61,7 @@ static GoogleOnceType once = GOOGLE_ONCE_INIT;
// auditing. Used only by Thread.
class ThreadMgr {
public:
ThreadMgr()
: _threads_started_metric(0),
_threads_running_metric(0) {}
ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}
~ThreadMgr() {
MutexLock lock(&_lock);
@ -74,18 +78,17 @@ public:
// already been removed, this is a no-op.
void remove_thread(const pthread_t& pthread_id, const std::string& category);
private:
void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const;
private:
// Container class for any details we want to capture about a thread
// TODO: Add start-time.
// TODO: Track fragment ID.
class ThreadDescriptor {
public:
ThreadDescriptor() { }
ThreadDescriptor() {}
ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
: _name(std::move(name)),
_category(std::move(category)),
_thread_id(thread_id) {}
: _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {}
const std::string& name() const { return _name; }
const std::string& category() const { return _category; }
@ -97,6 +100,8 @@ private:
int64_t _thread_id;
};
void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const;
// A ThreadCategory is a set of threads that are logically related.
// TODO: unordered_map is incompatible with pthread_t, but would be more
// efficient here.
@ -106,7 +111,7 @@ private:
typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
// Protects _thread_categories and thread metrics.
Mutex _lock;
mutable Mutex _lock;
// All thread categorys that ever contained a thread, even if empty
ThreadCategoryMap _thread_categories;
@ -121,7 +126,7 @@ private:
void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
if (tid == getpid()) {
return ;
return;
}
int err = prctl(PR_SET_NAME, name.c_str());
if (err < 0 && errno != EPERM) {
@ -169,6 +174,81 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca
ANNOTATE_IGNORE_READS_AND_WRITES_END();
}
void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const {
const auto* category_name = FindOrNull(args, "group");
if (category_name) {
bool requested_all = (*category_name == "all");
ej->Set("requested_thread_group", EasyJson::kObject);
(*ej)["group_name"] = escape_for_html_to_string(*category_name);
(*ej)["requested_all"] = requested_all;
// The critical section is as short as possible so as to minimize the delay
// imposed on new threads that acquire the lock in write mode.
vector<ThreadDescriptor> descriptors_to_print;
if (!requested_all) {
MutexLock l(&_lock);
const auto* category = FindOrNull(_thread_categories, *category_name);
if (!category) {
return;
}
for (const auto& elem : *category) {
descriptors_to_print.emplace_back(elem.second);
}
} else {
MutexLock l(&_lock);
for (const auto& category : _thread_categories) {
for (const auto& elem : category.second) {
descriptors_to_print.emplace_back(elem.second);
}
}
}
EasyJson found = (*ej).Set("found", EasyJson::kObject);
EasyJson threads = found.Set("threads", EasyJson::kArray);
for (const auto& desc : descriptors_to_print) {
summarize_thread_descriptor(desc, &threads);
}
} else {
// List all thread groups and the number of threads running in each.
vector<pair<string, uint64_t>> thread_categories_info;
uint64_t running;
{
MutexLock l(&_lock);
running = _threads_running_metric;
thread_categories_info.reserve(_thread_categories.size());
for (const auto& category : _thread_categories) {
thread_categories_info.emplace_back(category.first, category.second.size());
}
(*ej)["total_threads_running"] = running;
EasyJson groups = ej->Set("groups", EasyJson::kArray);
for (const auto& elem : thread_categories_info) {
string category_arg;
url_encode(elem.first, &category_arg);
EasyJson group = groups.PushBack(EasyJson::kObject);
group["encoded_group_name"] = category_arg;
group["group_name"] = elem.first;
group["threads_running"] = elem.second;
}
}
}
}
void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc,
EasyJson* ej) const {
ThreadStats stats;
Status status = get_thread_stats(desc.thread_id(), &stats);
if (!status.ok()) {
LOG(WARNING) << "Could not get per-thread statistics: " << status.to_string();
}
EasyJson thread = ej->PushBack(EasyJson::kObject);
thread["thread_name"] = desc.name();
thread["user_sec"] = static_cast<double>(stats.user_ns) / 1e9;
thread["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9;
thread["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9;
}
Thread::~Thread() {
if (_joinable) {
int ret = pthread_detach(_thread);
@ -201,7 +281,8 @@ const std::string& Thread::category() const {
}
std::string Thread::to_string() const {
return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name,
_category);
}
Thread* Thread::current_thread() {
@ -210,7 +291,7 @@ Thread* Thread::current_thread() {
int64_t Thread::unique_thread_id() {
return static_cast<int64_t>(pthread_self());
}
}
int64_t Thread::current_thread_id() {
return syscall(SYS_gettid);
@ -268,7 +349,7 @@ Status Thread::start_thread(const std::string& category, const std::string& name
t->_joinable = true;
cleanup.cancel();
VLOG(3) << "Started thread " << t->tid()<< " - " << category << ":" << name;
VLOG(3) << "Started thread " << t->tid() << " - " << category << ":" << name;
return Status::OK();
}
@ -331,10 +412,10 @@ void Thread::init_threadmgr() {
}
ThreadJoiner::ThreadJoiner(Thread* thr)
: _thread(CHECK_NOTNULL(thr)),
_warn_after_ms(kDefaultWarnAfterMs),
_warn_every_ms(kDefaultWarnEveryMs),
_give_up_after_ms(kDefaultGiveUpAfterMs) {}
: _thread(CHECK_NOTNULL(thr)),
_warn_after_ms(kDefaultWarnAfterMs),
_warn_every_ms(kDefaultWarnEveryMs),
_give_up_after_ms(kDefaultGiveUpAfterMs) {}
ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
_warn_after_ms = ms;
@ -352,8 +433,7 @@ ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
}
Status ThreadJoiner::join() {
if (Thread::current_thread() &&
Thread::current_thread()->tid() == _thread->tid()) {
if (Thread::current_thread() && Thread::current_thread()->tid() == _thread->tid()) {
return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name);
}
@ -397,8 +477,15 @@ Status ThreadJoiner::join() {
}
waited_ms += wait_for;
}
return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1",
waited_ms, _thread->_name));
return Status::Aborted(
strings::Substitute("Timed out after $0ms joining on $1", waited_ms, _thread->_name));
}
void register_thread_display_page(WebPageHandler* web_page_handler) {
web_page_handler->register_template_page(
"/threadz", "Threads",
std::bind(&ThreadMgr::display_thread_callback, thread_manager.get(),
std::placeholders::_1, std::placeholders::_2),
true);
}
} // namespace doris

View File

@ -18,27 +18,25 @@
#ifndef DORIS_BE_SRC_UTIL_THREAD_H
#define DORIS_BE_SRC_UTIL_THREAD_H
#include <atomic>
#include <pthread.h>
#include <syscall.h>
#include <atomic>
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "http/web_page_handler.h"
#include "util/countdown_latch.h"
namespace doris {
class Thread : public RefCountedThreadSafe<Thread> {
public:
enum CreateFlags {
NO_FLAGS = 0,
NO_STACK_WATCHDOG = 1
};
enum CreateFlags { NO_FLAGS = 0, NO_STACK_WATCHDOG = 1 };
template <class F>
static Status create_with_flags(const std::string& category, const std::string& name,
const F& f, uint64_t flags,
scoped_refptr<Thread>* holder) {
const F& f, uint64_t flags, scoped_refptr<Thread>* holder) {
return start_thread(category, name, f, flags, holder);
}
@ -145,17 +143,15 @@ private:
};
// User function to be executed by this thread.
typedef std::function<void ()> ThreadFunctor;
typedef std::function<void()> ThreadFunctor;
Thread(const std::string& category, const std::string& name, ThreadFunctor functor)
: _thread(0),
_tid(INVALID_TID),
_functor(std::move(functor)),
_category(std::move(category)),
_name(std::move(name)),
_done(1),
_joinable(false)
{}
: _thread(0),
_tid(INVALID_TID),
_functor(std::move(functor)),
_category(std::move(category)),
_name(std::move(name)),
_done(1),
_joinable(false) {}
// Library-specific thread ID.
pthread_t _thread;
@ -172,7 +168,7 @@ private:
int64_t _tid;
const ThreadFunctor _functor;
const std::string _category;
const std::string _name;
@ -188,7 +184,7 @@ private:
// Thread local pointer to the current thread of execution. Will be NULL if the current
// thread is not a Thread.
static __thread Thread* _tls;
// Wait for the running thread to publish its tid.
int64_t wait_for_tid() const;
@ -280,6 +276,8 @@ private:
DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
};
// Registers /threadz with the debug webserver.
void register_thread_display_page(WebPageHandler* web_page_handler);
} //namespace doris

View File

@ -78,7 +78,7 @@ bool url_decode(const std::string& in, std::string* out) {
} else {
return false;
}
} else if (in[i] == '+') {
} else if (in[i] == '+') {
(*out) += ' ';
} else {
(*out) += in[i];
@ -122,7 +122,7 @@ static void encode_base64_internal(const std::string& in, std::string* out,
out->assign((char*)buf.get(), d - buf.get());
}
void base64url_encode(const std::string& in, std::string *out) {
void base64url_encode(const std::string& in, std::string* out) {
static unsigned char basis64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
encode_base64_internal(in, out, basis64, false);
@ -130,51 +130,39 @@ void base64url_encode(const std::string& in, std::string *out) {
void base64_encode(const std::string& in, std::string* out) {
static unsigned char basis64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
encode_base64_internal(in, out, basis64, true);
}
static char encoding_table[] = {
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '+', '/'
};
static char encoding_table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
static const char base64_pad = '=';
static short decoding_table[256] = {
-2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62, -2, -2, -2, 63,
52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2,
-2, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -2, -2, -2, -2, -2,
-2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2
};
-2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62,
-2, -2, -2, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2, -2, 0,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
23, 24, 25, -2, -2, -2, -2, -2, -2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
-2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2};
static int mod_table[] = {0, 2, 1};
size_t base64_encode(const unsigned char *data,
size_t length,
unsigned char *encoded_data) {
size_t output_length = (size_t) (4.0 * ceil((double) length / 3.0));
size_t base64_encode(const unsigned char* data, size_t length, unsigned char* encoded_data) {
size_t output_length = (size_t)(4.0 * ceil((double)length / 3.0));
if (encoded_data == NULL) {
return 0;
return 0;
}
for (uint32_t i = 0, j = 0; i < length;) {
@ -196,11 +184,8 @@ size_t base64_encode(const unsigned char *data,
return output_length;
}
static inline int64_t base64_decode(
const char *data,
size_t length,
char *decoded_data) {
const char *current = data;
static inline int64_t base64_decode(const char* data, size_t length, char* decoded_data) {
const char* current = data;
int ch = 0;
int i = 0;
int j = 0;
@ -232,7 +217,7 @@ static inline int64_t base64_decode(
decoded_data[j] = (ch & 0x0f) << 4;
break;
case 2:
decoded_data[j++] |= ch >>2;
decoded_data[j++] |= ch >> 2;
decoded_data[j] = (ch & 0x03) << 6;
break;
case 3:
@ -279,7 +264,7 @@ bool base64_decode(const std::string& in, std::string* out) {
}
void escape_for_html(const std::string& in, std::stringstream* out) {
for (auto& c: in) {
for (auto& c : in) {
switch (c) {
case '<':
(*out) << "&lt;";
@ -298,5 +283,9 @@ void escape_for_html(const std::string& in, std::stringstream* out) {
}
}
}
std::string escape_for_html_to_string(const std::string& in) {
std::stringstream str;
escape_for_html(in, &str);
return str.str();
}
}

View File

@ -18,9 +18,9 @@
#ifndef DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H
#define DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H
#include <boost/cstdint.hpp>
#include <string>
#include <vector>
#include <boost/cstdint.hpp>
namespace doris {
@ -39,8 +39,8 @@ void url_encode(const std::vector<uint8_t>& in, std::string* out);
// certain characters like ' '.
bool url_decode(const std::string& in, std::string* out);
void base64url_encode(const std::string& in, std::string *out);
void base64_encode(const std::string& in, std::string *out);
void base64url_encode(const std::string& in, std::string* out);
void base64_encode(const std::string& in, std::string* out);
// Utility method to decode base64 encoded strings. Also not extremely
// performant.
@ -54,6 +54,8 @@ bool base64_decode(const std::string& in, std::string* out);
// judiciously.
void escape_for_html(const std::string& in, std::stringstream* out);
// Same as above, but returns a string.
std::string escape_for_html_to_string(const std::string& in);
}
#endif

View File

@ -0,0 +1,68 @@
{{!
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
}}
{{#requested_thread_group}}
<h2>Thread Group: {{group_name}}</h2>
{{#requested_all}}<h3>All Threads : </h3>{{/requested_all}}
{{#found}}
<table class='table table-hover' data-sort-name='name' data-toggle='table'>
<thead>
<tr>
<th data-field='name' data-sortable='true' data-sorter='stringsSorter'>Thread name</th>
<th data-sortable='true' data-sorter='floatsSorter'>Cumulative User CPU (s)</th>
<th data-sortable='true' data-sorter='floatsSorter'>Cumulative Kernel CPU (s)</th>
<th data-sortable='true' data-sorter='floatsSorter'>Cumulative IO-wait (s)</th>
</tr>
</thead>
<tbody>
{{#threads}}
<tr>
<td>{{thread_name}}</td>
<td>{{user_sec}}</td>
<td>{{kernel_sec}}</td>
<td>{{iowait_sec}}</td>
</tr>
{{/threads}}
</tbody>
</table>
{{/found}}
{{^found}}Thread group {{group_name}} not found{{/found}}
{{/requested_thread_group}}
{{^requested_thread_group}}
<h2>Thread Groups</h2>
<h4>{{total_threads_running}} thread(s) running</h4>
<a href='{{base_url}}/threadz?group=all'><h3>All Threads</h3></a>
<table class='table table-hover' data-sort-name='group' data-toggle='table'>
<thead>
<tr>
<th data-field='group' data-sortable='true' data-sorter='stringsSorter'>Group</th>
<th data-sortable='true' data-sorter='numericStringsSorter'>Threads running</th>
</tr>
</thead>
<tbody>
{{#groups}}
<tr>
<td><a href='{{base_url}}/threadz?group={{encoded_group_name}}'>{{group_name}}</a></td>
<td>{{threads_running}}</td>
</tr>
{{/groups}}
</tbody>
</table>
{{/requested_thread_group}}