[4.2.3 patch 4.3.1] [FEAT MERGE] support syslog compression

This commit is contained in:
dimstars
2024-04-15 13:06:14 +00:00
committed by ob-robot
parent b1f89ef7db
commit 027d9cf8fe
15 changed files with 1385 additions and 13 deletions

View File

@ -261,6 +261,7 @@ ob_set_subtarget(oblib_lib oblog
oblog/ob_base_log_writer.cpp oblog/ob_base_log_writer.cpp
oblog/ob_easy_log.cpp oblog/ob_easy_log.cpp
oblog/ob_log.cpp oblog/ob_log.cpp
oblog/ob_log_compressor.cpp
oblog/ob_log_module.ipp oblog/ob_log_module.ipp
oblog/ob_log_time_fmt.cpp oblog/ob_log_time_fmt.cpp
oblog/ob_trace_log.cpp oblog/ob_trace_log.cpp

View File

@ -77,6 +77,12 @@ const char *const perf_compress_funcs[] =
"zstd_1.3.8", "zstd_1.3.8",
}; };
const char *const syslog_compress_funcs[] =
{
"none",
"zstd_1.0",
"zstd_1.3.8",
};
} /* namespace common */ } /* namespace common */
} /* namespace oceanbase */ } /* namespace oceanbase */

View File

@ -18,6 +18,7 @@
#include <libgen.h> #include <libgen.h>
#include <sys/prctl.h> #include <sys/prctl.h>
#include <linux/prctl.h> #include <linux/prctl.h>
#include <regex.h>
#include "lib/oblog/ob_warning_buffer.h" #include "lib/oblog/ob_warning_buffer.h"
#include "lib/ob_errno.h" #include "lib/ob_errno.h"
#include "lib/profile/ob_trace_id.h" #include "lib/profile/ob_trace_id.h"
@ -31,6 +32,7 @@
#include "lib/allocator/ob_vslice_alloc.h" #include "lib/allocator/ob_vslice_alloc.h"
#include "lib/allocator/ob_fifo_allocator.h" #include "lib/allocator/ob_fifo_allocator.h"
#include "common/ob_smart_var.h" #include "common/ob_smart_var.h"
#include "lib/oblog/ob_log_compressor.h"
using namespace oceanbase::lib; using namespace oceanbase::lib;
@ -478,8 +480,8 @@ ObLogger::ObLogger()
enable_wf_flag_(false), rec_old_file_flag_(false), can_print_(true), enable_wf_flag_(false), rec_old_file_flag_(false), can_print_(true),
enable_async_log_(true), use_multi_flush_(false), stop_append_log_(false), enable_perf_mode_(false), enable_async_log_(true), use_multi_flush_(false), stop_append_log_(false), enable_perf_mode_(false),
last_async_flush_count_per_sec_(0), log_mem_limiter_(nullptr), last_async_flush_count_per_sec_(0), log_mem_limiter_(nullptr),
allocator_(nullptr), error_allocator_(nullptr), enable_log_limit_(true), is_arb_replica_(false), allocator_(nullptr), error_allocator_(nullptr), log_compressor_(nullptr), enable_log_limit_(true),
new_file_info_(nullptr), info_as_wdiag_(true) is_arb_replica_(false), new_file_info_(nullptr), info_as_wdiag_(true)
{ {
id_level_map_.set_level(OB_LOG_LEVEL_DBA_ERROR); id_level_map_.set_level(OB_LOG_LEVEL_DBA_ERROR);
@ -795,7 +797,7 @@ void ObLogger::rotate_log(const char *filename,
if (file_list.size() >= max_file_index_) { if (file_list.size() >= max_file_index_) {
std::string oldFile = file_list.front(); std::string oldFile = file_list.front();
file_list.pop_front(); file_list.pop_front();
unlink(oldFile.c_str()); unlink_if_need(oldFile.c_str());
} }
file_list.push_back(old_log_file); file_list.push_back(old_log_file);
(void)pthread_mutex_unlock(&file_index_mutex_); (void)pthread_mutex_unlock(&file_index_mutex_);
@ -848,6 +850,10 @@ void ObLogger::rotate_log(const char *filename,
} }
} }
} }
// awake log compressor when creating new log files
if (OB_NOT_NULL(log_compressor_)) {
log_compressor_->awake();
}
} }
} }
UNUSED(ret); UNUSED(ret);
@ -935,6 +941,17 @@ int ObLogger::set_record_old_log_file(bool rec_old_file_flag)
return ret; return ret;
} }
int ObLogger::set_log_compressor(ObLogCompressor *log_compressor)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(log_compressor)) {
ret = OB_INVALID_ARGUMENT;
} else {
log_compressor_ = log_compressor;
}
return ret;
}
//@brief string copy with dst's length and src's length checking and src trim. //@brief string copy with dst's length and src's length checking and src trim.
int64_t str_copy_trim(char *dst, int64_t str_copy_trim(char *dst,
const int64_t dst_length, const int64_t dst_length,
@ -1259,6 +1276,7 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
char *dirc = NULL; char *dirc = NULL;
char *basec = NULL; char *basec = NULL;
regex_t uncompressed_regex;
if (OB_ISNULL(files) || OB_ISNULL(wf_files)) { if (OB_ISNULL(files) || OB_ISNULL(wf_files)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "Input should not be NULL", K(files), K(wf_files), K(ret)); OB_LOG(WARN, "Input should not be NULL", K(files), K(wf_files), K(ret));
@ -1271,6 +1289,8 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f
} else if (NULL == (basec = strdup(filename))) { } else if (NULL == (basec = strdup(filename))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(ERROR, "strdup filename error", K(ret)); OB_LOG(ERROR, "strdup filename error", K(ret));
} else if (OB_FAIL(regcomp(&uncompressed_regex, OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) {
OB_LOG(ERROR, "failed to compile regex pattern", K(ret));
} else { } else {
ObIArray<FileName> *files_arr = static_cast<ObIArray<FileName> *>(files); ObIArray<FileName> *files_arr = static_cast<ObIArray<FileName> *>(files);
ObIArray<FileName> *wf_files_arr = static_cast<ObIArray<FileName> *>(wf_files); ObIArray<FileName> *wf_files_arr = static_cast<ObIArray<FileName> *>(wf_files);
@ -1296,6 +1316,10 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f
FileName tmp_file; FileName tmp_file;
struct dirent *dir_entry = NULL;//dir_entry is from dir_pointer stream, need not to be freed. struct dirent *dir_entry = NULL;//dir_entry is from dir_pointer stream, need not to be freed.
int64_t print_len = 0; int64_t print_len = 0;
bool enable_delete_compressed_file = true;
if (OB_NOT_NULL(log_compressor_) && log_compressor_->is_enable_compress()) {
enable_delete_compressed_file = false;
}
while (OB_SUCC(ret) && (dir_entry = readdir(dir_pointer)) != NULL) { while (OB_SUCC(ret) && (dir_entry = readdir(dir_pointer)) != NULL) {
if (DT_DIR != dir_entry->d_type) { if (DT_DIR != dir_entry->d_type) {
if (prefix_match(wf_file_prefix, dir_entry->d_name)) { if (prefix_match(wf_file_prefix, dir_entry->d_name)) {
@ -1307,7 +1331,8 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f
} else { }//do nothing } else { }//do nothing
} else if (prefix_match(wf_file, dir_entry->d_name)) { } else if (prefix_match(wf_file, dir_entry->d_name)) {
//.wf file, do nothing. //.wf file, do nothing.
} else if (prefix_match(file_prefix, dir_entry->d_name)) { } else if (prefix_match(file_prefix, dir_entry->d_name)
&& (enable_delete_compressed_file || regexec(&uncompressed_regex, dir_entry->d_name, 0, NULL, 0) == 0)) {
print_len = snprintf(tmp_file.file_name_, ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE, "%s/%s", dir_name, dir_entry->d_name); print_len = snprintf(tmp_file.file_name_, ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE, "%s/%s", dir_name, dir_entry->d_name);
if (OB_UNLIKELY(print_len <0) || OB_UNLIKELY(print_len >= ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE)) { if (OB_UNLIKELY(print_len <0) || OB_UNLIKELY(print_len >= ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE)) {
//do nothing //do nothing
@ -1322,6 +1347,7 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f
OB_LOG(WARN, "Close dir error", K(ret)); OB_LOG(WARN, "Close dir error", K(ret));
} }
} }
regfree(&uncompressed_regex);
} }
if (NULL != dirc) { if (NULL != dirc) {
free(dirc); free(dirc);
@ -1382,7 +1408,7 @@ int ObLogger::add_files_to_list(void *files,
if (file_list.size() >= max_file_index_) { if (file_list.size() >= max_file_index_) {
oldFile = file_list.front(); oldFile = file_list.front();
file_list.pop_front(); file_list.pop_front();
unlink(oldFile.c_str()); unlink_if_need(oldFile.c_str());
} }
file_list.push_back(files_arr->at(i).file_name_); file_list.push_back(files_arr->at(i).file_name_);
} }
@ -1882,6 +1908,13 @@ int ObLogger::log_new_file_info(const ObPLogFileStruct &log_file)
return ret; return ret;
} }
void ObLogger::unlink_if_need(const char *file)
{
if (OB_ISNULL(log_compressor_) || !log_compressor_->is_enable_compress()) {
unlink(file);
}
}
void ObLogger::issue_dba_error(const int errcode, const char *file, const int line, const char *info_str) void ObLogger::issue_dba_error(const int errcode, const char *file, const int line, const char *info_str)
{ {
const char *base_file_name = strrchr(file, '/'); const char *base_file_name = strrchr(file, '/');

View File

@ -62,6 +62,7 @@ class ObVSliceAlloc;
class ObBlockAllocMgr; class ObBlockAllocMgr;
class ObFIFOAllocator; class ObFIFOAllocator;
class ObPLogItem; class ObPLogItem;
class ObLogCompressor;
extern void allow_next_syslog(int64_t count = 1); extern void allow_next_syslog(int64_t count = 1);
extern int logdata_vprintf(char *buf, const int64_t buf_len, int64_t &pos, const char *fmt, va_list args); extern int logdata_vprintf(char *buf, const int64_t buf_len, int64_t &pos, const char *fmt, va_list args);
@ -556,6 +557,8 @@ public:
//@brief Set whether record old log file. If this flag and max_file_index set, //@brief Set whether record old log file. If this flag and max_file_index set,
//will record log files in the directory for log file //will record log files in the directory for log file
int set_record_old_log_file(bool rec_old_file_flag = false); int set_record_old_log_file(bool rec_old_file_flag = false);
int set_log_compressor(ObLogCompressor *log_compressor);
int64_t get_max_file_index() const { return max_file_index_; }
//@brief Get the process-only ObLogger. //@brief Get the process-only ObLogger.
static ObLogger &get_logger(); static ObLogger &get_logger();
@ -747,6 +750,7 @@ private:
int log_new_file_info(const ObPLogFileStruct &log_file); int log_new_file_info(const ObPLogFileStruct &log_file);
void drop_log_items(ObIBaseLogItem **items, const int64_t item_cnt) override; void drop_log_items(ObIBaseLogItem **items, const int64_t item_cnt) override;
void unlink_if_need(const char *file);
private: private:
static const char *const errstr_[]; static const char *const errstr_[];
// default log rate limiter if there's no tl_log_limiger // default log rate limiter if there's no tl_log_limiger
@ -806,6 +810,7 @@ private:
ObBlockAllocMgr* log_mem_limiter_; ObBlockAllocMgr* log_mem_limiter_;
ObVSliceAlloc* allocator_; ObVSliceAlloc* allocator_;
ObFIFOAllocator* error_allocator_; ObFIFOAllocator* error_allocator_;
ObLogCompressor* log_compressor_;
// juse use it for test promise log print // juse use it for test promise log print
bool enable_log_limit_; bool enable_log_limit_;
RLOCAL_STATIC(ByteBuf<LOCAL_BUF_SIZE>, local_buf_); RLOCAL_STATIC(ByteBuf<LOCAL_BUF_SIZE>, local_buf_);

View File

@ -0,0 +1,543 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX LIB
#include <dirent.h>
#include <regex.h>
#include <utime.h>
#include <sys/statfs.h>
#include "lib/oblog/ob_log_compressor.h"
#include "lib/thread/thread_mgr.h"
#include "lib/compress/ob_compressor_pool.h"
using namespace oceanbase::lib;
namespace oceanbase {
namespace common {
ObLogCompressor::ObLogCompressor() :
is_inited_(false), stopped_(true), loop_interval_(OB_SYSLOG_COMPRESS_LOOP_INTERVAL),
max_disk_size_(0), min_uncompressed_count_(0), compress_func_(NONE_COMPRESSOR),
compressor_(NULL), next_compressor_(NULL), oldest_files_(cmp_, NULL)
{}
ObLogCompressor::~ObLogCompressor()
{
if (is_inited_) {
destroy();
}
}
int ObLogCompressor::init()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_ERROR("the ObLogCompressor has been inited", K(ret));
} else if (OB_FAIL(log_compress_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to init ObThreadCond", K(ret));
} else {
strncpy(syslog_dir_, OB_SYSLOG_DIR, strlen(OB_SYSLOG_DIR));
stopped_ = false;
if (OB_FAIL(TG_SET_RUNNABLE_AND_START(TGDefIDs::SYSLOG_COMPRESS, *this))) {
LOG_ERROR("failed to start log compression thread", K(ret));
} else {
is_inited_ = true;
}
if (OB_FAIL(ret)) {
LOG_ERROR("syslog compressor init failed ", K(ret));
stopped_ = true;
log_compress_cond_.destroy();
}
}
LOG_INFO("syslog compressor init finish ", K(ret));
return ret;
}
void ObLogCompressor::stop()
{
if (is_inited_) {
stopped_ = true;
TG_STOP(TGDefIDs::SYSLOG_COMPRESS);
}
}
void ObLogCompressor::wait()
{
if (is_inited_) {
TG_WAIT(TGDefIDs::SYSLOG_COMPRESS);
}
}
void ObLogCompressor::awake()
{
ObThreadCondGuard guard(log_compress_cond_);
log_compress_cond_.signal();
}
void ObLogCompressor::destroy()
{
if (is_inited_) {
{
ObThreadCondGuard guard(log_compress_cond_);
stopped_ = true;
log_compress_cond_.signal();
}
stop();
wait();
log_compress_cond_.destroy();
max_disk_size_ = 0;
compress_func_ = NONE_COMPRESSOR;
min_uncompressed_count_ = 0;
compressor_ = NULL;
next_compressor_ = NULL;
is_inited_ = false;
LOG_INFO("syslog compressor destroyed");
}
}
int ObLogCompressor::set_max_disk_size(int64_t max_disk_size)
{
int ret = OB_SUCCESS;
if (max_disk_size < 0) {
ret = OB_INVALID_ARGUMENT;
} else {
if (max_disk_size > 0 && max_disk_size < OB_MIN_SYSLOG_DISK_SIZE) {
max_disk_size = OB_MIN_SYSLOG_DISK_SIZE;
}
if ((max_disk_size_ == 0 && max_disk_size > 0)
|| (max_disk_size_ > 0 && max_disk_size_ > max_disk_size)) {
max_disk_size_ = max_disk_size;
awake();
} else {
max_disk_size_ = max_disk_size;
}
}
return ret;
}
int ObLogCompressor::set_compress_func(const char *compress_func_ptr)
{
int ret = OB_SUCCESS;
ObCompressorType new_compress_func;
if (OB_ISNULL(compress_func_ptr)) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(compress_func_ptr, new_compress_func))) {
// do nothing
} else if (new_compress_func == NONE_COMPRESSOR
|| new_compress_func == ZSTD_COMPRESSOR
|| new_compress_func == ZSTD_1_3_8_COMPRESSOR) {
// to do: support ZLIB_COMPRESSOR
if (new_compress_func != compress_func_) {
LOG_INFO("modify log compress func", K(compress_func_), K(new_compress_func));
if (OB_FAIL(set_next_compressor_(new_compress_func))) {
LOG_ERROR("fail to modify log compress func", K(ret), K(compress_func_), K(new_compress_func));
} else if (compress_func_ == NONE_COMPRESSOR && new_compress_func != NONE_COMPRESSOR) {
compress_func_ = new_compress_func;
// from disable to enanble, awake compress thread
awake();
} else {
compress_func_ = new_compress_func;
}
}
} else {
ret = OB_INVALID_ARGUMENT;
}
return ret;
}
int ObLogCompressor::set_min_uncompressed_count(int64_t min_uncompressed_count)
{
int ret = OB_SUCCESS;
if (min_uncompressed_count < 0) {
ret = OB_INVALID_ARGUMENT;
} else {
min_uncompressed_count_ = min_uncompressed_count;
}
return ret;
}
bool ObLogCompressor::is_compressed_file(const char *file)
{
int ret = OB_SUCCESS;
bool is_compressed = false;
regex_t regex;
if (OB_ISNULL(file)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argumet for file is null", K(ret));
} else if (OB_FAIL(regcomp(&regex, OB_COMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) {
LOG_ERROR("failed to compile regex pattern", K(ret));
} else {
if (regexec(&regex, file, 0, NULL, 0) == 0) {
is_compressed = true;
}
regfree(&regex);
}
return is_compressed;
}
void ObLogCompressor::run1()
{
lib::set_thread_name("SyslogCompress");
LOG_INFO("syslog compress thread start");
log_compress_loop_();
LOG_INFO("syslog compress thread finish");
}
void ObLogCompressor::log_compress_loop_()
{
int ret = OB_SUCCESS;
int log_type = OB_SYSLOG_COMPRESS_TYPE_COUNT;
const int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE;
const int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE;
char *src_buf = (char *)ob_malloc(src_size + dest_size, "SyslogCompress");
char *dest_buf = src_buf + src_size;
regex_t regex_archive;
regex_t regex_uncompressed;
if (OB_ISNULL(src_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("failed to ob_malloc", K(ret));
} else if (OB_FAIL(regcomp(&regex_archive, OB_ARCHIVED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to compile archive regex pattern", K(ret));
} else if (OB_FAIL(regcomp(&regex_uncompressed, OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) {
regfree(&regex_archive);
ret = OB_ERR_SYS;
LOG_ERROR("failed to compile uncompressed regex pattern", K(ret));
} else {
ObSyslogFile syslog_file;
char compress_files[OB_SYSLOG_COMPRESS_TYPE_COUNT][OB_MAX_SYSLOG_FILE_NAME_SIZE] = {{0}};
int64_t log_file_count[OB_SYSLOG_COMPRESS_TYPE_COUNT] = {0};
int64_t log_min_time[OB_SYSLOG_COMPRESS_TYPE_COUNT] = {0};
int64_t compressed_file_count = 0;
int64_t deleted_file_count = 0;
while (!stopped_) {
// wait until stoped or needing to work
{
common::ObThreadCondGuard guard(log_compress_cond_);
while (!stopped_ && !is_enable_compress() && max_disk_size_ <= 0 && OB_LOGGER.get_max_file_index() <= 0) {
log_compress_cond_.wait_us(loop_interval_);
}
}
if (!stopped_) {
// record start time
int64_t start_time = ObClockGenerator::getClock();
bool enable_delete_file = max_disk_size_ > 0 || OB_LOGGER.get_max_file_index() > 0;
// check whether need to compress or delete file
int64_t total_size = 0;
struct dirent* entry;
struct stat stat_info;
DIR* dir = NULL;
compressed_file_count = 0;
deleted_file_count = 0;
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
log_file_count[i] = 0;
log_min_time[i] = INT64_MAX;
}
oldest_files_.reset();
if (OB_ISNULL(dir = opendir(syslog_dir_))) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to open syslog directory", K(ret), K(errno), K(syslog_dir_));
} else {
while (OB_NOT_NULL(entry = readdir(dir))) {
if (strncmp(entry->d_name, ".", 1) == 0 || strncmp(entry->d_name, "..", 2) == 0) {
continue;
}
snprintf(syslog_file.file_name_, OB_MAX_SYSLOG_FILE_NAME_SIZE, "%s/%s", syslog_dir_, entry->d_name);
if (stat(syslog_file.file_name_, &stat_info) == -1) {
ret = OB_ERR_SYS;
LOG_WARN("failed to get file info", K(ret), K(errno), K(syslog_file.file_name_));
continue;
}
if (S_ISREG(stat_info.st_mode)) {
total_size += stat_info.st_size;
int64_t tmp_time = stat_info.st_mtim.tv_sec * 1000000000L + stat_info.st_mtim.tv_nsec;
syslog_file.mtime_ = tmp_time;
if (enable_delete_file
&& regexec(&regex_archive, entry->d_name, 0, NULL, 0) == 0
&& OB_FAIL(oldest_files_.push(syslog_file))) {
LOG_ERROR("failed to put file into array", K(ret), K(syslog_file.file_name_), K(tmp_time));
}
if (regexec(&regex_uncompressed, entry->d_name, 0, NULL, 0) == 0) {
int log_type = get_log_type_(syslog_file.file_name_);
if (log_type >= 0 && log_type < OB_SYSLOG_COMPRESS_TYPE_COUNT) {
log_file_count[log_type]++;
if (tmp_time < log_min_time[log_type]) {
strncpy(compress_files[log_type], syslog_file.file_name_, OB_MAX_SYSLOG_FILE_NAME_SIZE);
log_min_time[log_type] = tmp_time;
}
}
}
}
}
}
if (OB_NOT_NULL(dir)) {
closedir(dir);
}
// get disk remaining size
int64_t disk_remaining_size = get_disk_remaining_size_();
disk_remaining_size = disk_remaining_size >=0 ? disk_remaining_size : INT64_MAX;
if (max_disk_size_ > 0 && max_disk_size_ - total_size < disk_remaining_size) {
disk_remaining_size = max_disk_size_ - total_size;
}
// compress syslog file if necessary
if (!stopped_ && is_enable_compress() && disk_remaining_size < OB_SYSLOG_COMPRESS_RESERVE_SIZE) {
if (compressor_ != next_compressor_) {
compressor_ = next_compressor_;
}
if (OB_ISNULL(compressor_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected error, compressor is null", K(ret));
} else {
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT && is_enable_compress(); i++) {
if (log_file_count[i] > min_uncompressed_count_) {
int64_t file_size = get_file_size_(compress_files[i]);
if (OB_FAIL(compress_single_file_(compress_files[i], src_buf, dest_buf))) {
LOG_ERROR("failed to compress file", K(ret), K(compress_files[i]));
} else {
// estimated value
total_size -= file_size;
compressed_file_count++;
}
log_file_count[i]--;
}
}
}
}
// delete oldest syslog file if necessary
enable_delete_file = enable_delete_file && (max_disk_size_ > 0 || OB_LOGGER.get_max_file_index() > 0);
if (!stopped_ && enable_delete_file && disk_remaining_size < OB_SYSLOG_DELETE_RESERVE_SIZE) {
int array_size = oldest_files_.count();
const char *delete_file = NULL;
const ObSyslogFile *syslog_ptr = NULL;
int64_t need_delete_size = OB_SYSLOG_DELETE_RESERVE_SIZE - disk_remaining_size;
for (int i = 0; i < array_size && need_delete_size > 0 && OB_SUCC(ret); i++) {
if (OB_FAIL(oldest_files_.top(syslog_ptr))) {
break;
} else if (OB_NOT_NULL(syslog_ptr)) {
delete_file = syslog_ptr->file_name_;
int64_t delete_file_size = get_file_size_(delete_file);
if (delete_file_size >= 0) {
LOG_DEBUG("log compressor unlink file", K(delete_file), K(need_delete_size), K(delete_file_size));
unlink(delete_file);
need_delete_size = need_delete_size - delete_file_size;
disk_remaining_size += delete_file_size;
deleted_file_count++;
}
delete_file = NULL;
ret = oldest_files_.pop();
}
}
}
// record cost time, sleep
int64_t cost_time = ObClockGenerator::getClock() - start_time;
LOG_INFO("log compressor cycles once. ", K(ret), K(cost_time),
K(compressed_file_count), K(deleted_file_count), K(disk_remaining_size));
cost_time = cost_time >= 0 ? cost_time:0;
if (!stopped_ && cost_time < loop_interval_) {
usleep(loop_interval_ - cost_time);
}
} // if (!stopped_)
} // while (!stopped_)
regfree(&regex_archive);
regfree(&regex_uncompressed);
}
if (OB_NOT_NULL(src_buf)) {
ob_free(src_buf);
}
}
int ObLogCompressor::get_compressed_file_name_(const char *file_name, char compressed_file_name[])
{
int ret = OB_SUCCESS;
if (OB_ISNULL(file_name)) {
ret = OB_INVALID_ARGUMENT;
} else {
int size = strnlen(file_name, OB_MAX_SYSLOG_FILE_NAME_SIZE);
int suffix_size = strlen(OB_SYSLOG_COMPRESS_ZSTD_SUFFIX);
if (size <= 0 || size + suffix_size + 1 > OB_MAX_SYSLOG_FILE_NAME_SIZE) {
ret = OB_INVALID_ARGUMENT;
} else {
strncpy(compressed_file_name, file_name, size);
strncpy(compressed_file_name + size, OB_SYSLOG_COMPRESS_ZSTD_SUFFIX, suffix_size);
*(compressed_file_name + size + suffix_size) = '\0';
}
}
return ret;
}
int ObLogCompressor::compress_single_block_(
char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size)
{
int ret = OB_SUCCESS;
int64_t size = -1;
if (OB_FAIL(((ObCompressor *)compressor_)->compress(src, src_size, dest, dest_size, size))) {
LOG_ERROR("failed to compress single block", K(ret));
} else {
return_size = size;
}
return ret;
}
int ObLogCompressor::compress_single_file_(const char *file_name, char *src_buf, char *dest_buf)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(file_name) || OB_ISNULL(src_buf) || OB_ISNULL(dest_buf)) {
ret = OB_INVALID_ARGUMENT;
} else {
static const int sleep_us = 50 * 1000; // 50ms
int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE;
int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE;
char compressed_file_name[OB_MAX_SYSLOG_FILE_NAME_SIZE];
FILE *input_file = NULL;
FILE *output_file = NULL;
// record file modify time
struct stat st;
stat(file_name, &st);
time_t last_modified_time = st.st_mtime;
if (OB_FAIL(get_compressed_file_name_(file_name, compressed_file_name))) {
LOG_ERROR("failed to get compressed file name", K(ret), K(file_name));
} else if (strlen(compressed_file_name) < strlen(file_name)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("failed to get compressed file name", K(ret), K(file_name));
} else if (NULL == (input_file = fopen(file_name, "r"))) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to open file", K(ret), K(errno), K(file_name));
} else if (NULL == (output_file = fopen(compressed_file_name, "w"))) {
ret = OB_ERR_SYS;
fclose(input_file);
LOG_ERROR("failed to open file", K(ret), K(errno), K(compressed_file_name));
} else {
LOG_DEBUG("log compressor compress file ", K(file_name), K(compressed_file_name));
size_t read_size = 0;
size_t write_size = 0;
while (OB_SUCC(ret) && !feof(input_file)) {
if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) {
if (OB_FAIL(compress_single_block_(dest_buf, dest_size, src_buf, read_size, write_size))) {
LOG_ERROR("failed to compress syslog block", K(ret));
} else if (write_size != fwrite(dest_buf, 1, write_size, output_file)) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to write file", K(ret), K(errno), K(compressed_file_name));
}
}
usleep(sleep_us);
}
fclose(input_file);
fclose(output_file);
if (OB_FAIL(ret)) {
unlink(compressed_file_name);
} else {
unlink(file_name);
(void)set_last_modify_time_(compressed_file_name, last_modified_time);
}
}
}
return ret;
}
int ObLogCompressor::set_last_modify_time_(const char *file_name, const time_t &newTime) {
int ret = OB_SUCCESS;
struct utimbuf newTimes;
newTimes.actime = newTime;
newTimes.modtime = newTime;
if (OB_ISNULL(file_name)) {
ret = OB_INVALID_ARGUMENT;
} else if (utime(file_name, &newTimes) != 0) {
ret = OB_ERR_SYS;
LOG_ERROR("failed to set the file's last modified time", K(ret), K(file_name), K(newTime));
}
return ret;
}
int ObLogCompressor::set_next_compressor_(ObCompressorType compress_func) {
int ret = OB_SUCCESS;
ObCompressor *compressor = NULL;
if (compress_func == NONE_COMPRESSOR) {
next_compressor_ = NULL;
} else if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compress_func, compressor))) {
LOG_ERROR("Fail to get_compressor", K(ret), K(compress_func));
} else {
next_compressor_ = compressor;
if (OB_ISNULL(compressor_)) {
compressor_ = compressor;
}
}
return ret;
}
int ObLogCompressor::get_log_type_(const char *file_name) {
int type = OB_SYSLOG_COMPRESS_TYPE_COUNT;
int name_len = strnlen(file_name, OB_MAX_SYSLOG_FILE_NAME_SIZE);
if (name_len >= strlen("trace.log")) {
int dir_len = strlen(syslog_dir_);
if (name_len > dir_len + 1
&& 0 == strncmp(file_name, syslog_dir_, dir_len)
&& file_name[dir_len] == '/') {
file_name = file_name + dir_len + 1;
}
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
if (0 == strncmp(file_name, OB_SYSLOG_FILE_PREFIX[i], strlen(OB_SYSLOG_FILE_PREFIX[i]))) {
type = i;
break;
}
}
}
return type;
}
int64_t ObLogCompressor::get_file_size_(const char *file_name)
{
int64_t size = -1;
struct stat st;
if (stat(file_name, &st) == 0) {
size = st.st_size;
}
return size;
}
int64_t ObLogCompressor::get_disk_remaining_size_()
{
int ret = OB_SUCCESS;
int64_t remaining_size = 0;
struct statfs file_system;
if (statfs(syslog_dir_, &file_system) == -1) {
remaining_size = -1;
ret = OB_ERR_SYS;
LOG_ERROR("fail to get disk remaining size", K(ret), K(strerror(errno)), K(syslog_dir_));
} else {
remaining_size = file_system.f_bsize * file_system.f_bavail;
}
return remaining_size;
}
} // namespace common
} // namespace oceanbase

View File

@ -0,0 +1,196 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_LOG_COMPRESSOR_H_
#define OB_LOG_COMPRESSOR_H_
#include "lib/container/ob_heap.h"
#include "lib/lock/ob_thread_cond.h"
#include "lib/thread/thread_mgr_interface.h"
namespace oceanbase {
namespace common {
class ObCompressor;
/* Log files are divided into blocks and then compressed. */
static const int32_t OB_SYSLOG_COMPRESS_BLOCK_SIZE = OB_MALLOC_BIG_BLOCK_SIZE;
/* To prevent extreme cases where the files become larger after compression,
* the size of the decompression buffer needs to be larger than the original data.
* This size can refer to the ZSTD code implementation.
*/
static const int32_t OB_SYSLOG_COMPRESS_BUFFER_SIZE =
OB_SYSLOG_COMPRESS_BLOCK_SIZE + OB_SYSLOG_COMPRESS_BLOCK_SIZE / 128 + 512 + 19;
static const int32_t OB_MAX_SYSLOG_FILE_NAME_SIZE = ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE;
static const int32_t OB_SYSLOG_COMPRESS_TYPE_COUNT = FD_TRACE_FILE + 1;
/* by default, syslog file size = 256MB, and most of the logs are observer.log
* compressed file size ≈ 13MB, 13MB * 20 = 260MB, so OB_SYSLOG_DELETE_ARRAY_SIZE = 20 + 1
* an extra position is reserved for newly inserted file name.
*/
static const int32_t OB_SYSLOG_DELETE_ARRAY_SIZE = 21;
static const int64_t OB_MIN_SYSLOG_DISK_SIZE = 2 * (1LL << 30); // 2GB
static const int64_t OB_SYSLOG_COMPRESS_RESERVE_SIZE = 4 * (1LL << 30); // 4GB
static const int64_t OB_SYSLOG_DELETE_RESERVE_SIZE = 2 * (1LL << 30); // 2GB
static const int64_t OB_SYSLOG_COMPRESS_LOOP_INTERVAL = 5000000; // 5s
static const char OB_SYSLOG_DIR[] = "log"; // same as LOG_DIR in src/observer/main.cpp
static const char OB_SYSLOG_COMPRESS_ZSTD_SUFFIX[] = ".zst"; // name suffix of file compressed by zstd
static const char OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+$"; // only uncompressed files
static const char OB_COMPRESSED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+\\.[a-z0-9]+$"; // only compressed files
static const char OB_ARCHIVED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+[.a-z0-9]*$"; // all syslog files (excluding wf logs)
static const char *OB_SYSLOG_FILE_PREFIX[OB_SYSLOG_COMPRESS_TYPE_COUNT] =
{
"observer.log", // FD_SVR_FILE
"rootservice.log", // FD_RS_FILE
"election.log", // FD_ELEC_FILE
"trace.log", // FD_TRACE_FILE
};
STATIC_ASSERT(MAX_FD_FILE == 5, "if you add a new log type, add it's prefix here !!!");
#define OB_LOG_COMPRESSOR ::oceanbase::common::ObLogCompressor::get_log_compressor()
struct ObSyslogFile
{
ObSyslogFile() { reset(); }
void reset()
{
mtime_ = INT64_MAX;
memset(file_name_, 0, OB_MAX_SYSLOG_FILE_NAME_SIZE);
}
TO_STRING_KV(K_(mtime), K_(file_name));
int64_t mtime_;
char file_name_[OB_MAX_SYSLOG_FILE_NAME_SIZE];
};
struct ObSyslogCompareFunctor {
bool operator()(const ObSyslogFile &l, const ObSyslogFile &r) {
return l.mtime_ > r.mtime_;
}
int get_error_code() { return OB_SUCCESS; }
};
/* A priority array, order by modify time of files. */
class ObSyslogPriorityArray : public ObBinaryHeap<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>
{
public:
ObSyslogPriorityArray(ObSyslogCompareFunctor &cmp, common::ObIAllocator *allocator = NULL)
: ObBinaryHeap<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>(cmp, allocator)
{}
virtual ~ObSyslogPriorityArray() {}
int push(const ObSyslogFile &element)
{
int ret = OB_SUCCESS;
if (OB_FAIL(array_.push_back(element))) {
} else if (OB_FAIL(upheap(array_.count() - 1))) {
} else if (array_.count() > OB_SYSLOG_DELETE_ARRAY_SIZE - 1) {
ret = pop_back();
}
return ret;
}
private:
int pop_back()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(array_.empty())) {
ret = OB_EMPTY_RESULT;
} else {
int index = 0;
const ObSyslogFile *max_element = &array_.at(0);
for (int i = 1; i < array_.count(); i++) {
if (cmp_(array_.at(i), *max_element)) {
index = i;
max_element = &array_.at(i);
}
}
reset_root_cmp_cache();
array_.at(index) = array_.at(array_.count() - 1);
array_.pop_back();
if (index < array_.count()) {
if (index == get_root()) {
ret = downheap(index);
} else {
int64_t parent = get_parent(index);
if (cmp_(array_.at(parent), array_.at(index))) {
ret = upheap(index);
} else {
ret = downheap(index);
}
}
}
}
return ret;
}
private:
using ObBinaryHeapBase<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::array_;
using ObBinaryHeapBase<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::cmp_;
using ObBinaryHeapBase<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::get_root;
using ObBinaryHeapBase<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::get_parent;
using ObBinaryHeapBase<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::reset_root_cmp_cache;
using ObBinaryHeap<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::upheap;
using ObBinaryHeap<ObSyslogFile, ObSyslogCompareFunctor, OB_SYSLOG_DELETE_ARRAY_SIZE>::downheap;
};
class ObLogCompressor final : public lib::TGRunnable {
public:
ObLogCompressor();
virtual ~ObLogCompressor();
int init();
void stop();
void wait();
void awake();
void destroy();
int set_max_disk_size(int64_t max_disk_size);
int set_compress_func(const char *compress_func_ptr);
int set_min_uncompressed_count(int64_t min_uncompressed_count);
inline bool is_enable_compress() { return NONE_COMPRESSOR != compress_func_; }
static bool is_compressed_file(const char *file);
static inline ObLogCompressor& get_log_compressor()
{
static ObLogCompressor log_compressor;
return log_compressor;
}
TO_STRING_KV(K_(is_inited), K_(stopped), K_(loop_interval), K_(max_disk_size),
K_(min_uncompressed_count), K_(compress_func));
private:
void run1() override;
void log_compress_loop_();
int get_compressed_file_name_(const char *file_name, char compressed_file_name[]);
int compress_single_block_(char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size);
int compress_single_file_(const char *file_name, char *src_buf, char *dest_buf);
int set_last_modify_time_(const char *file_name, const time_t& newTime);
int set_next_compressor_(ObCompressorType compress_func);
int get_log_type_(const char *file_name);
int64_t get_file_size_(const char *file_name);
int64_t get_disk_remaining_size_();
private:
bool is_inited_;
bool stopped_;
int64_t loop_interval_; // don't modify
int64_t max_disk_size_;
int64_t min_uncompressed_count_;
char syslog_dir_[64];
ObCompressorType compress_func_;
ObCompressor *compressor_;
ObCompressor *next_compressor_;
ObThreadCond log_compress_cond_;
ObSyslogCompareFunctor cmp_;
ObSyslogPriorityArray oldest_files_;
};
} // namespace common
} // namespace oceanbase
#endif /* OB_LOG_COMPRESSOR_H_ */

View File

@ -26,4 +26,5 @@ TG_DEF(MEMORY_DUMP, memDump, THREAD_POOL, 1)
TG_DEF(SchemaRefTask, SchemaRefTask, DEDUP_QUEUE, 1, 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu") TG_DEF(SchemaRefTask, SchemaRefTask, DEDUP_QUEUE, 1, 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu")
TG_DEF(ReqMemEvict, ReqMemEvict, TIMER) TG_DEF(ReqMemEvict, ReqMemEvict, TIMER)
TG_DEF(replica_control, replica_control, THREAD_POOL, 1) TG_DEF(replica_control, replica_control, THREAD_POOL, 1)
TG_DEF(SYSLOG_COMPRESS, SyslogCompress, THREAD_POOL, 1)
#endif #endif

View File

@ -74,6 +74,7 @@ oblib_addtest(net/test_ob_net_util.cpp)
#oblib_addtest(number/test_number_v2.cpp) #oblib_addtest(number/test_number_v2.cpp)
#oblib_addtest(oblog/test_base_log_buffer.cpp) #oblib_addtest(oblog/test_base_log_buffer.cpp)
oblib_addtest(oblog/test_base_log_writer.cpp) oblib_addtest(oblog/test_base_log_writer.cpp)
oblib_addtest(oblog/test_ob_log_compressor.cpp)
oblib_addtest(oblog/test_ob_log_obj.cpp) oblib_addtest(oblog/test_ob_log_obj.cpp)
oblib_addtest(oblog/test_ob_log_performance.cpp) oblib_addtest(oblog/test_ob_log_performance.cpp)
oblib_addtest(profile/test_ob_trace_id.cpp) oblib_addtest(profile/test_ob_trace_id.cpp)

View File

@ -0,0 +1,447 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX LIB
#include <gtest/gtest.h>
#include <utime.h>
#include <string>
#include <fstream>
#include <dirent.h>
#include <regex.h>
#define protected public
#define private public
#include "lib/oblog/ob_log_compressor.h"
#include "lib/oblog/ob_log.h"
#include "lib/compress/ob_compressor_pool.h"
#include "common/ob_clock_generator.h"
using namespace oceanbase::lib;
namespace oceanbase {
namespace common {
const char *TEST_DIR = "testoblogcompressdir";
const char *RM_COMMAND = "rm -rf testoblogcompressdir";
#define CREATE_FILES(file_count, modify_time, timestamp) \
for (int i = 0; i < file_count; i++) { \
snprintf(file_name, sizeof(file_name), "%s/%s.%ld", TEST_DIR, \
OB_SYSLOG_FILE_PREFIX[i%OB_SYSLOG_COMPRESS_TYPE_COUNT], timestamp + i); \
FILE *file =fopen(file_name, "w"); \
ASSERT_EQ(true, OB_NOT_NULL(file)); \
ASSERT_EQ(data_size, fwrite(data, 1, data_size, file)); \
fclose(file); \
OB_LOG_COMPRESSOR.set_last_modify_time_(file_name, modify_time + i*60); \
}
int get_file_count_by_regex(const char *dirname, const char *pattern) {
int count = 0;
regex_t regex;
struct dirent *entry;
DIR *dir = opendir(dirname);
if (OB_ISNULL(dir)) {
count = -1;
} else if (OB_NOT_NULL(pattern) && regcomp(&regex, pattern, REG_EXTENDED | REG_NOSUB) != 0) {
closedir(dir);
count = -1;
} else {
while ((entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, ".", 1) == 0 || strncmp(entry->d_name, "..", 2) == 0) {
continue;
}
if (OB_ISNULL(pattern) || regexec(&regex, entry->d_name, 0, NULL, 0) == 0) {
count++;
}
}
closedir(dir);
if (OB_NOT_NULL(pattern)) {
regfree(&regex);
}
}
return count;
}
int compress_file_by_zstd(ObCompressor *compressor, const char *file_name_in, const char *file_name_out)
{
int ret = OB_SUCCESS;
int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE;
int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE;
char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_COMPRESSOR);
char *dest_buf = src_buf + src_size;
size_t read_size = 0;
int64_t w_size = 0;
int sleep_10us = 10 * 1000;
FILE *input_file = fopen(file_name_in, "r");
FILE *output_file = fopen(file_name_out, "w");
fwrite(dest_buf, 1, w_size, output_file);
while (OB_SUCC(ret) && !feof(input_file)) {
if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) {
if (OB_FAIL(compressor->compress(src_buf, read_size, dest_buf, dest_size, w_size))) {
printf("Failed to log_compress_block, err_code=%d.\n", ret);
} else if (w_size != fwrite(dest_buf, 1, w_size, output_file)) {
ret = OB_ERR_SYS;
printf("Failed to fwrite, err_code=%d.\n", errno);
}
}
}
fclose(input_file);
fclose(output_file);
// clear environment
ob_free(src_buf);
return ret;
}
TEST(ObLogCompressor, log_priority_array_test)
{
int ret = OB_SUCCESS;
const int MAX_FILE_COUNT = OB_SYSLOG_DELETE_ARRAY_SIZE - 1;
ObSyslogFile syslog;
const ObSyslogFile *file_name_out;
ObSyslogCompareFunctor cmp_;
ObSyslogPriorityArray priority_array(cmp_);
for (int i = 0; i < MAX_FILE_COUNT; i++) {
syslog.mtime_ = 1500 + i;
snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1500 + i);
ASSERT_EQ(OB_SUCCESS, priority_array.push(syslog));
}
ASSERT_EQ(MAX_FILE_COUNT, priority_array.count());
for (int i = 0; i < MAX_FILE_COUNT/2; i++) {
syslog.mtime_ = 1200 + i;
snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1200 + i);
ASSERT_EQ(OB_SUCCESS, priority_array.push(syslog));
}
ASSERT_EQ(MAX_FILE_COUNT, priority_array.count());
for (int i = 0; i < MAX_FILE_COUNT/2; i++) {
file_name_out = NULL;
snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1200 + i);
ASSERT_EQ(OB_SUCCESS, priority_array.top(file_name_out));
ASSERT_EQ(true, OB_NOT_NULL(file_name_out));
ASSERT_EQ(0, strncmp(syslog.file_name_, file_name_out->file_name_, strlen(syslog.file_name_)));
ASSERT_EQ(OB_SUCCESS, priority_array.pop());
}
for (int i = 0; i < MAX_FILE_COUNT/2; i++) {
file_name_out = NULL;
snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1500 + i);
ASSERT_EQ(OB_SUCCESS, priority_array.top(file_name_out));
ASSERT_EQ(true, OB_NOT_NULL(file_name_out));
ASSERT_EQ(0, strncmp(syslog.file_name_, file_name_out->file_name_, strlen(syslog.file_name_)));
ASSERT_EQ(OB_SUCCESS, priority_array.pop());
}
}
TEST(ObLogCompressor, base_compressor_test)
{
int ret = OB_SUCCESS;
int test_count = 1000;
int test_size = test_count * sizeof(int);
ObCompressor *compressor_zstd;
// get compressor
ASSERT_EQ(OB_SUCCESS, ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, compressor_zstd));
// prepare file
const char *file_name = "test_ob_log_compressor_file";
const char *file_name_zstd = "test_ob_log_compressor_file.zst";
std::string file_name_str = std::string(file_name);
std::string decompress_file_name_zstd = file_name_str + "_zstd";
unlink(file_name_str.c_str());
unlink(file_name_zstd);
unlink(decompress_file_name_zstd.c_str());
FILE *input_file = fopen(file_name, "w");
ASSERT_EQ(true, NULL != input_file);
int data[test_count];
for (int i = 0; i < test_count; i++) {
data[i] = i;
}
ASSERT_EQ(test_size, fwrite(data, 1, test_size, input_file));
fclose(input_file);
// sleep 1s
sleep(1);
struct stat st, st_zstd;
ASSERT_EQ(0, stat(file_name, &st));
// compress
ASSERT_EQ(OB_SUCCESS, compress_file_by_zstd(compressor_zstd, file_name, file_name_zstd));
ASSERT_EQ(0, stat(file_name_zstd, &st_zstd));
ASSERT_EQ(true, st.st_mtime != st_zstd.st_mtime); // not equal
// modify time
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_last_modify_time_(file_name_zstd, st.st_mtime));
ASSERT_EQ(0, stat(file_name_zstd, &st_zstd));
ASSERT_EQ(true, st.st_mtime == st_zstd.st_mtime); // equal
// decompress, make sure your machine can exec zstd command
std::string decompress_zstd_command = "zstd -d " + std::string(file_name_zstd) + " -o " + decompress_file_name_zstd;
ASSERT_EQ(0, std::system(decompress_zstd_command.c_str()));
// check
std::string compare_command = "diff -q " + file_name_str + " " + decompress_file_name_zstd + " > /dev/null";
ASSERT_EQ(0, system(compare_command.c_str()));
unlink(file_name_str.c_str());
unlink(file_name_zstd);
unlink(decompress_file_name_zstd.c_str());
}
TEST(ObLogCompressor, syslog_compressor_base_test)
{
int ret = OB_SUCCESS;
// prepare
int data_count = 1000;
int loop_count = 2000;
int data_size = data_count * sizeof(int);
int data[data_count];
const char *file_name = "testcompressfile.log.1234";
const char *file_name_2 = "testcompressfile2.log.1234";
const char *file_name_zstd = "testcompressfile.log.1234.zst";
ASSERT_EQ(false, ObLogCompressor::is_compressed_file(file_name));
ASSERT_EQ(true, ObLogCompressor::is_compressed_file(file_name_zstd));
std::string file_name_str = std::string(file_name);
std::string file_name_str_2 = std::string(file_name_2);
std::string decompress_file_name_zstd = file_name_str + "_zstd";
unlink(file_name_str.c_str());
unlink(file_name_str_2.c_str());
unlink(file_name_zstd);
unlink(decompress_file_name_zstd.c_str());
// function test
ASSERT_EQ(0, system(RM_COMMAND));
ASSERT_EQ(0, mkdir(TEST_DIR, 0777));
strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR));
int64_t free_size = OB_LOG_COMPRESSOR.get_disk_remaining_size_();
printf("free_size:%ld\n", free_size);
ASSERT_GT(free_size, 0);
ASSERT_EQ(0, system(RM_COMMAND));
// prepare file data
FILE *input_file = fopen(file_name, "w");
ASSERT_EQ(true, NULL != input_file);
for (int i = 0; i < data_count; i++) {
data[i] = i;
}
for (int j = 0; j < loop_count; j++) {
ASSERT_EQ(data_size, fwrite(data, 1, data_size, input_file));
}
fclose(input_file);
std::string cp_file_command = "cp " + std::string(file_name) + " " + std::string(file_name_2);
ASSERT_EQ(0, std::system(cp_file_command.c_str()));
// record last modify time
struct stat st, st_zstd;
ASSERT_EQ(0, stat(file_name, &st));
// compress
ObCompressor *compressor_zstd;
ASSERT_EQ(OB_SUCCESS, ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, compressor_zstd));
LOG_INFO("start to init syslog compressor ");
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.init());
strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR));
OB_LOG_COMPRESSOR.compressor_ = compressor_zstd;
ASSERT_EQ(true, OB_LOG_COMPRESSOR.is_inited_);
ASSERT_EQ(false, OB_LOG_COMPRESSOR.stopped_);
usleep(100000); // wait register pm in farm
int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE;
int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE;
char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_LOG);
ASSERT_EQ(true, OB_NOT_NULL(src_buf));
char *dest_buf = src_buf + src_size;
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.compress_single_file_(file_name, src_buf, dest_buf));
ASSERT_EQ(0, stat(file_name_zstd, &st_zstd));
ASSERT_EQ(true, st.st_mtime == st_zstd.st_mtime); // equal
// decompress
std::string decompress_zstd_command = "zstd -d " + std::string(file_name_zstd) + " -o " + decompress_file_name_zstd;
ASSERT_EQ(0, std::system(decompress_zstd_command.c_str()));
std::string compare_command = "diff -q " + file_name_str_2 + " " + decompress_file_name_zstd + " > /dev/null";
ASSERT_EQ(0, std::system(compare_command.c_str()));
// error test
ASSERT_EQ(OB_INVALID_ARGUMENT, OB_LOG_COMPRESSOR.set_compress_func("zlib_1.0"));
ASSERT_EQ(OB_INVALID_ARGUMENT, OB_LOG_COMPRESSOR.set_compress_func("lz4_1.9.1"));
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("none"));
// clear
LOG_INFO("start to destroy syslog compressor ");
OB_LOG_COMPRESSOR.destroy();
ob_free(src_buf);
unlink(file_name_str.c_str());
unlink(file_name_str_2.c_str());
unlink(file_name_zstd);
unlink(decompress_file_name_zstd.c_str());
}
TEST(ObLogCompressor, syslog_compressor_thread_test)
{
int ret = OB_SUCCESS;
// init log compressor
ASSERT_EQ(OB_SUCCESS, ObClockGenerator::get_instance().init());
// loop faster
OB_LOG_COMPRESSOR.loop_interval_ = 100000;
LOG_INFO("start to init syslog compressor ");
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.init());
ASSERT_EQ(true, OB_LOG_COMPRESSOR.is_inited_);
strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR));
// prepare syslog file
const int MAX_SYSLOG_COUNT = 8;
const int min_uncompressed_count = 2;
char file_name[OB_MAX_SYSLOG_FILE_NAME_SIZE];
char * file_name_out;
const char *data = "base_data.base_data.base_data.base_data.base_dat50base_data.base_data.base_data.base_data.base_da100"
"base_data.base_data.base_data.base_data.base_da150base_data.base_data.base_data.base_data.base_da200";
int data_size = strlen(data);
int file_size = 200;
const char *pattern_uncompressed[OB_SYSLOG_COMPRESS_TYPE_COUNT] =
{
"^observer\\.log\\.[0-9]+$",
"^rootservice\\.log\\.[0-9]+$",
"^election\\.log\\.[0-9]+$",
"^trace\\.log\\.[0-9]+$",
};
const char *pattern_compressed[OB_SYSLOG_COMPRESS_TYPE_COUNT] =
{
"^observer\\.log\\.[0-9]+\\.[a-z]+$",
"^rootservice\\.log\\.[0-9]+\\.[a-z]+$",
"^election\\.log\\.[0-9]+\\.[a-z]+$",
"^trace\\.log\\.[0-9]+\\.[a-z]+$",
};
// get modify time
time_t last_modified_time = time(NULL);
last_modified_time -= 1000;
ASSERT_EQ(0, system(RM_COMMAND));
ASSERT_EQ(0, mkdir(TEST_DIR, 0777));
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
snprintf(file_name, sizeof(file_name), "%s/%s", TEST_DIR, OB_SYSLOG_FILE_PREFIX[i%OB_SYSLOG_COMPRESS_TYPE_COUNT]);
FILE *file =fopen(file_name, "w");
ASSERT_EQ(true, OB_NOT_NULL(file));
ASSERT_EQ(data_size, fwrite(data, 1, data_size, file));
fclose(file);
OB_LOG_COMPRESSOR.set_last_modify_time_(file_name, last_modified_time + i*60);
}
last_modified_time -= 50000;
CREATE_FILES(MAX_SYSLOG_COUNT * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311241000L)
// set parameter
ASSERT_EQ(OB_SUCCESS, OB_LOGGER.set_max_file_index(MAX_SYSLOG_COUNT));
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_min_uncompressed_count(min_uncompressed_count));
int total_count = get_file_count_by_regex(TEST_DIR, NULL);
int last_total_count = 0;
ASSERT_EQ((MAX_SYSLOG_COUNT + 1) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]);
ASSERT_EQ(MAX_SYSLOG_COUNT, uncompressed_count);
int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]);
ASSERT_EQ(0, compressed_count);
}
// enable compress
int64_t max_disk_size = OB_SYSLOG_COMPRESS_RESERVE_SIZE + file_size * ((MAX_SYSLOG_COUNT + 1 - 2)* OB_SYSLOG_COMPRESS_TYPE_COUNT);
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size));
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("zstd_1.3.8"));
sleep(2); // 2s
total_count = get_file_count_by_regex(TEST_DIR, NULL);
ASSERT_EQ((MAX_SYSLOG_COUNT + 1) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]);
ASSERT_LE(uncompressed_count, MAX_SYSLOG_COUNT - 2);
ASSERT_GE(uncompressed_count, min_uncompressed_count);
int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]);
ASSERT_GE(compressed_count, 2);
}
// add more syslog file
last_modified_time += 10000;
CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311242000L)
sleep(1);
total_count = get_file_count_by_regex(TEST_DIR, NULL);
ASSERT_EQ((MAX_SYSLOG_COUNT + 3) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]);
ASSERT_LE(uncompressed_count, MAX_SYSLOG_COUNT - 2);
ASSERT_GE(uncompressed_count, min_uncompressed_count);
int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]);
ASSERT_GE(compressed_count, 4);
}
// make syslog_disk_size smaller, begin to delete file
max_disk_size = OB_SYSLOG_DELETE_RESERVE_SIZE + file_size * (4 * OB_SYSLOG_COMPRESS_TYPE_COUNT);
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size));
sleep(1);
total_count = get_file_count_by_regex(TEST_DIR, NULL);
ASSERT_LT(total_count, (MAX_SYSLOG_COUNT + 3) * OB_SYSLOG_COMPRESS_TYPE_COUNT);
ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]);
ASSERT_GE(uncompressed_count, min_uncompressed_count);
}
last_total_count = total_count;
// add more syslog file
last_modified_time += 5000;
CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311243000L)
sleep(1);
total_count = get_file_count_by_regex(TEST_DIR, NULL);
ASSERT_LE(total_count, last_total_count);
ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]);
ASSERT_GE(uncompressed_count, min_uncompressed_count);
}
// disable compress
ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("none"));
last_modified_time += 5000;
CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311244000L)
sleep(1);
total_count = get_file_count_by_regex(TEST_DIR, NULL);
ASSERT_LT(total_count, 5 * OB_SYSLOG_COMPRESS_TYPE_COUNT);
ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT);
for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) {
int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]);
ASSERT_EQ(compressed_count, 0);
}
// clear
ASSERT_EQ(0, system(RM_COMMAND));
LOG_INFO("start to destroy syslog compressor ");
OB_LOG_COMPRESSOR.destroy();
}
} // namespace common
} // namespace oceanbase
int main(int argc, char **argv)
{
oceanbase::ObLogger &logger = oceanbase::ObLogger::get_logger();
logger.set_file_name("test_ob_log_compressor.log", true);
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
OB_LOG_COMPRESSOR.destroy();
return ret;
}

View File

@ -23,6 +23,7 @@
#include "lib/lock/ob_latch.h" #include "lib/lock/ob_latch.h"
#include "lib/net/ob_net_util.h" #include "lib/net/ob_net_util.h"
#include "lib/oblog/ob_base_log_buffer.h" #include "lib/oblog/ob_base_log_buffer.h"
#include "lib/oblog/ob_log_compressor.h"
#include "lib/ob_running_mode.h" #include "lib/ob_running_mode.h"
#include "lib/profile/ob_active_resource_list.h" #include "lib/profile/ob_active_resource_list.h"
#include "lib/profile/ob_profile_log.h" #include "lib/profile/ob_profile_log.h"
@ -278,6 +279,10 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
if (FAILEDx(OB_LOGGER.init(log_cfg, true))) { if (FAILEDx(OB_LOGGER.init(log_cfg, true))) {
LOG_ERROR("async log init error.", KR(ret)); LOG_ERROR("async log init error.", KR(ret));
ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; ret = OB_ELECTION_ASYNC_LOG_WARN_INIT;
} else if (OB_FAIL(OB_LOG_COMPRESSOR.init())) {
LOG_ERROR("log compressor init error.", KR(ret));
} else if (OB_FAIL(OB_LOGGER.set_log_compressor(&OB_LOG_COMPRESSOR))) {
LOG_ERROR("set log compressor error.", KR(ret));
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(init_pre_setting())) { } else if (OB_FAIL(init_pre_setting())) {
@ -296,6 +301,10 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
if (FAILEDx(OB_LOGGER.init(log_cfg, false))) { if (FAILEDx(OB_LOGGER.init(log_cfg, false))) {
LOG_ERROR("async log init error.", KR(ret)); LOG_ERROR("async log init error.", KR(ret));
ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; ret = OB_ELECTION_ASYNC_LOG_WARN_INIT;
} else if (OB_FAIL(OB_LOG_COMPRESSOR.init())) {
LOG_ERROR("log compressor init error.", KR(ret));
} else if (OB_FAIL(OB_LOGGER.set_log_compressor(&OB_LOG_COMPRESSOR))) {
LOG_ERROR("set log compressor error.", KR(ret));
} else if (OB_FAIL(init_tz_info_mgr())) { } else if (OB_FAIL(init_tz_info_mgr())) {
LOG_ERROR("init tz_info_mgr failed", KR(ret)); LOG_ERROR("init tz_info_mgr failed", KR(ret));
} else if (OB_FAIL(ObSqlTaskFactory::get_instance().init())) { } else if (OB_FAIL(ObSqlTaskFactory::get_instance().init())) {
@ -552,6 +561,10 @@ void ObServer::destroy()
OB_LOGGER.destroy(); OB_LOGGER.destroy();
FLOG_INFO("OB_LOGGER destroyed"); FLOG_INFO("OB_LOGGER destroyed");
FLOG_INFO("begin to destroy OB_LOG_COMPRESSOR");
OB_LOG_COMPRESSOR.destroy();
FLOG_INFO("OB_LOG_COMPRESSOR destroyed");
FLOG_INFO("begin to destroy task controller"); FLOG_INFO("begin to destroy task controller");
ObTaskController::get().destroy(); ObTaskController::get().destroy();
FLOG_INFO("task controller destroyed"); FLOG_INFO("task controller destroyed");
@ -1201,6 +1214,10 @@ int ObServer::stop()
OB_LOGGER.stop(); OB_LOGGER.stop();
FLOG_INFO("stop OB_LOGGER success"); FLOG_INFO("stop OB_LOGGER success");
FLOG_INFO("begin to stop OB_LOG_COMPRESSOR");
OB_LOG_COMPRESSOR.stop();
FLOG_INFO("stop OB_LOG_COMPRESSOR success");
FLOG_INFO("begin to stop task controller"); FLOG_INFO("begin to stop task controller");
ObTaskController::get().stop(); ObTaskController::get().stop();
FLOG_INFO("stop task controller success"); FLOG_INFO("stop task controller success");
@ -1547,6 +1564,10 @@ int ObServer::wait()
OB_LOGGER.wait(); OB_LOGGER.wait();
FLOG_INFO("wait OB_LOGGER success"); FLOG_INFO("wait OB_LOGGER success");
FLOG_INFO("begin to wait OB_LOG_COMPRESSOR");
OB_LOG_COMPRESSOR.wait();
FLOG_INFO("wait OB_LOG_COMPRESSOR success");
FLOG_INFO("begin to wait task controller"); FLOG_INFO("begin to wait task controller");
ObTaskController::get().wait(); ObTaskController::get().wait();
FLOG_INFO("wait task controller success"); FLOG_INFO("wait task controller success");
@ -2093,13 +2114,21 @@ int ObServer::init_pre_setting()
const bool record_old_log_file = config_.enable_syslog_recycle; const bool record_old_log_file = config_.enable_syslog_recycle;
const bool log_warn = config_.enable_syslog_wf; const bool log_warn = config_.enable_syslog_wf;
const bool enable_async_syslog = config_.enable_async_syslog; const bool enable_async_syslog = config_.enable_async_syslog;
const int64_t max_disk_size = config_.syslog_disk_size;
const int64_t min_uncompressed_count = config_.syslog_file_uncompressed_count;
const char *compress_func_ptr = config_.syslog_compress_func.str();
OB_LOGGER.set_max_file_index(max_log_cnt); OB_LOGGER.set_max_file_index(max_log_cnt);
OB_LOGGER.set_record_old_log_file(record_old_log_file); OB_LOGGER.set_record_old_log_file(record_old_log_file);
LOG_INFO("Whether record old log file", K(record_old_log_file)); LOG_INFO("Whether record old log file", K(record_old_log_file));
OB_LOGGER.set_log_warn(log_warn); OB_LOGGER.set_log_warn(log_warn);
LOG_INFO("Whether log warn", K(log_warn)); LOG_INFO("Whether log warn", K(log_warn));
OB_LOGGER.set_enable_async_log(enable_async_syslog); OB_LOGGER.set_enable_async_log(enable_async_syslog);
LOG_INFO("init log config", K(record_old_log_file), K(log_warn), K(enable_async_syslog)); OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size);
LOG_INFO("Whether compress syslog file", K(compress_func_ptr));
OB_LOG_COMPRESSOR.set_compress_func(compress_func_ptr);
OB_LOG_COMPRESSOR.set_min_uncompressed_count(min_uncompressed_count);
LOG_INFO("init log config", K(record_old_log_file), K(log_warn), K(enable_async_syslog),
K(max_disk_size), K(compress_func_ptr), K(min_uncompressed_count));
if (0 == max_log_cnt) { if (0 == max_log_cnt) {
LOG_INFO("won't recycle log file"); LOG_INFO("won't recycle log file");
} else { } else {
@ -3794,6 +3823,10 @@ int ObServer::stop_server_in_arb_mode()
OB_LOGGER.stop(); OB_LOGGER.stop();
FLOG_INFO("stop OB_LOGGER success"); FLOG_INFO("stop OB_LOGGER success");
FLOG_INFO("begin to stop OB_LOG_COMPRESSOR");
OB_LOG_COMPRESSOR.stop();
FLOG_INFO("stop OB_LOG_COMPRESSOR success");
FLOG_INFO("begin to stop task controller"); FLOG_INFO("begin to stop task controller");
ObTaskController::get().stop(); ObTaskController::get().stop();
FLOG_INFO("stop task controller success"); FLOG_INFO("stop task controller success");
@ -3830,6 +3863,10 @@ int ObServer::wait_server_in_arb_mode()
OB_LOGGER.wait(); OB_LOGGER.wait();
FLOG_INFO("wait OB_LOGGER success"); FLOG_INFO("wait OB_LOGGER success");
FLOG_INFO("begin to wait OB_LOG_COMPRESSOR");
OB_LOG_COMPRESSOR.wait();
FLOG_INFO("wait OB_LOG_COMPRESSOR success");
FLOG_INFO("begin to wait task controller"); FLOG_INFO("begin to wait task controller");
ObTaskController::get().wait(); ObTaskController::get().wait();
FLOG_INFO("wait task controller success"); FLOG_INFO("wait task controller success");
@ -3867,6 +3904,7 @@ int ObServer::destroy_server_in_arb_mode()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
OB_LOGGER.destroy(); OB_LOGGER.destroy();
OB_LOG_COMPRESSOR.destroy();
ObTaskController::get().destroy(); ObTaskController::get().destroy();
sig_worker_->destroy(); sig_worker_->destroy();
signal_handle_->destroy(); signal_handle_->destroy();

View File

@ -405,6 +405,47 @@ bool ObConfigCompressOptionChecker::check(const ObConfigItem &t) const
return is_valid; return is_valid;
} }
bool ObConfigMaxSyslogFileCountChecker::check(const ObConfigItem &t) const
{
bool is_valid = false;
int64_t max_count = ObConfigIntParser::get(t.str(), is_valid);
if (is_valid) {
int64_t uncompressed_count = GCONF.syslog_file_uncompressed_count;
if (max_count == 0 || max_count >= uncompressed_count) {
is_valid = true;
} else {
is_valid = false;
}
}
return is_valid;
}
bool ObConfigSyslogCompressFuncChecker::check(const ObConfigItem &t) const
{
bool is_valid = false;
for (int i = 0; i < ARRAYSIZEOF(common::syslog_compress_funcs) && !is_valid; ++i) {
if (0 == ObString::make_string(syslog_compress_funcs[i]).case_compare(t.str())) {
is_valid = true;
}
}
return is_valid;
}
bool ObConfigSyslogFileUncompressedCountChecker::check(const ObConfigItem &t) const
{
bool is_valid = false;
int64_t uncompressed_count = ObConfigIntParser::get(t.str(), is_valid);
if (is_valid) {
int64_t max_count = GCONF.max_syslog_file_count;
if (uncompressed_count >= 0 && (max_count == 0 || uncompressed_count <= max_count)) {
is_valid = true;
} else {
is_valid = false;
}
}
return is_valid;
}
bool ObConfigLogLevelChecker::check(const ObConfigItem &t) const bool ObConfigLogLevelChecker::check(const ObConfigItem &t) const
{ {
const ObString tmp_str(t.str()); const ObString tmp_str(t.str());

View File

@ -268,6 +268,39 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObConfigCompressOptionChecker); DISALLOW_COPY_AND_ASSIGN(ObConfigCompressOptionChecker);
}; };
class ObConfigMaxSyslogFileCountChecker
: public ObConfigChecker
{
public:
ObConfigMaxSyslogFileCountChecker() {}
virtual ~ObConfigMaxSyslogFileCountChecker() {}
bool check(const ObConfigItem &t) const;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfigMaxSyslogFileCountChecker);
};
class ObConfigSyslogCompressFuncChecker
: public ObConfigChecker
{
public:
ObConfigSyslogCompressFuncChecker() {}
virtual ~ObConfigSyslogCompressFuncChecker() {}
bool check(const ObConfigItem &t) const;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfigSyslogCompressFuncChecker);
};
class ObConfigSyslogFileUncompressedCountChecker
: public ObConfigChecker
{
public:
ObConfigSyslogFileUncompressedCountChecker() {}
virtual ~ObConfigSyslogFileUncompressedCountChecker() {}
bool check(const ObConfigItem &t) const;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfigSyslogFileUncompressedCountChecker);
};
class ObConfigUseLargePagesChecker class ObConfigUseLargePagesChecker
: public ObConfigChecker : public ObConfigChecker
{ {

View File

@ -11,7 +11,7 @@
*/ */
#include "share/config/ob_reload_config.h" #include "share/config/ob_reload_config.h"
#include "lib/oblog/ob_log_compressor.h"
namespace oceanbase namespace oceanbase
{ {
@ -35,6 +35,15 @@ int ObReloadConfig::reload_ob_logger_set()
} else if (OB_FAIL(OB_LOGGER.set_record_old_log_file(conf_->enable_syslog_recycle))) { } else if (OB_FAIL(OB_LOGGER.set_record_old_log_file(conf_->enable_syslog_recycle))) {
OB_LOG(ERROR, "fail to set_record_old_log_file", OB_LOG(ERROR, "fail to set_record_old_log_file",
K(conf_->enable_syslog_recycle.str()), K(ret)); K(conf_->enable_syslog_recycle.str()), K(ret));
} else if (OB_FAIL(OB_LOG_COMPRESSOR.set_max_disk_size(conf_->syslog_disk_size))) {
OB_LOG(ERROR, "fail to set_max_disk_size",
K(conf_->syslog_disk_size.str()), KR(ret));
} else if (OB_FAIL(OB_LOG_COMPRESSOR.set_compress_func(conf_->syslog_compress_func.str()))) {
OB_LOG(ERROR, "fail to set_compress_func",
K(conf_->syslog_compress_func.str()), KR(ret));
} else if (OB_FAIL(OB_LOG_COMPRESSOR.set_min_uncompressed_count(conf_->syslog_file_uncompressed_count))) {
OB_LOG(ERROR, "fail to set_min_uncompressed_count",
K(conf_->syslog_file_uncompressed_count.str()), KR(ret));
} else { } else {
OB_LOGGER.set_log_warn(conf_->enable_syslog_wf); OB_LOGGER.set_log_warn(conf_->enable_syslog_wf);
OB_LOGGER.set_enable_async_log(conf_->enable_async_syslog); OB_LOGGER.set_enable_async_log(conf_->enable_async_syslog);

View File

@ -195,12 +195,13 @@ DEF_CAP(syslog_io_bandwidth_limit, OB_CLUSTER_PARAMETER, "30MB",
DEF_INT(diag_syslog_per_error_limit, OB_CLUSTER_PARAMETER, "200", "[0,]", DEF_INT(diag_syslog_per_error_limit, OB_CLUSTER_PARAMETER, "200", "[0,]",
"DIAG syslog limitation for each error per second, exceeding syslog would be truncated", "DIAG syslog limitation for each error per second, exceeding syslog would be truncated",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0", "[0,]", DEF_INT_WITH_CHECKER(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0",
"specifies the maximum number of the log files " common::ObConfigMaxSyslogFileCountChecker,
"that can co-exist before the log file recycling kicks in. " "specifies the maximum number of the log files "
"Each log file can occupy at most 256MB disk space. " "that can co-exist before the log file recycling kicks in. "
"When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer", "Each log file can occupy at most 256MB disk space. "
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); "When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_async_syslog, OB_CLUSTER_PARAMETER, "True", DEF_BOOL(enable_async_syslog, OB_CLUSTER_PARAMETER, "True",
"specifies whether use async log for observer.log, elec.log and rs.log", "specifies whether use async log for observer.log, elec.log and rs.log",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
@ -212,6 +213,20 @@ DEF_BOOL(enable_syslog_recycle, OB_CLUSTER_PARAMETER, "False",
"specifies whether log file recycling is turned on. " "specifies whether log file recycling is turned on. "
"Value: True:turned on; False: turned off", "Value: True:turned on; False: turned off",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_CAP(syslog_disk_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)",
"the size of disk space used by the syslog files. Range: [0, +∞)",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR_WITH_CHECKER(syslog_compress_func, OB_CLUSTER_PARAMETER, "none",
common::ObConfigSyslogCompressFuncChecker,
"compress function name for syslog files, "
"values: none, zstd_1.0, zstd_1.3.8",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT_WITH_CHECKER(syslog_file_uncompressed_count, OB_CLUSTER_PARAMETER, "0",
common::ObConfigSyslogFileUncompressedCountChecker,
"specifies the minimum number of the syslog files that will not be compressed. "
"Each syslog file can occupy at most 256MB disk space. "
"When this value is set to 0, all syslog file may be compressed. Range: [0, +∞) in integer",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(memory_limit_percentage, OB_CLUSTER_PARAMETER, "80", "[10, 95]", DEF_INT(memory_limit_percentage, OB_CLUSTER_PARAMETER, "80", "[10, 95]",
"the size of the memory reserved for internal use(for testing purpose). Range: [10, 95]", "the size of the memory reserved for internal use(for testing purpose). Range: [10, 95]",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -222,6 +222,9 @@ standby_db_preferred_upstream_log_region
standby_fetch_log_bandwidth_limit standby_fetch_log_bandwidth_limit
storage_meta_cache_priority storage_meta_cache_priority
strict_check_os_params strict_check_os_params
syslog_compress_func
syslog_disk_size
syslog_file_uncompressed_count
syslog_io_bandwidth_limit syslog_io_bandwidth_limit
syslog_level syslog_level
system_memory system_memory