diff --git a/deps/oblib/src/lib/CMakeLists.txt b/deps/oblib/src/lib/CMakeLists.txt index b08baa5ef5..94211bd832 100644 --- a/deps/oblib/src/lib/CMakeLists.txt +++ b/deps/oblib/src/lib/CMakeLists.txt @@ -261,6 +261,7 @@ ob_set_subtarget(oblib_lib oblog oblog/ob_base_log_writer.cpp oblog/ob_easy_log.cpp oblog/ob_log.cpp + oblog/ob_log_compressor.cpp oblog/ob_log_module.ipp oblog/ob_log_time_fmt.cpp oblog/ob_trace_log.cpp diff --git a/deps/oblib/src/lib/compress/ob_compress_util.h b/deps/oblib/src/lib/compress/ob_compress_util.h index 0976c9f2ee..257c10a03f 100644 --- a/deps/oblib/src/lib/compress/ob_compress_util.h +++ b/deps/oblib/src/lib/compress/ob_compress_util.h @@ -77,6 +77,12 @@ const char *const perf_compress_funcs[] = "zstd_1.3.8", }; +const char *const syslog_compress_funcs[] = +{ + "none", + "zstd_1.0", + "zstd_1.3.8", +}; } /* namespace common */ } /* namespace oceanbase */ diff --git a/deps/oblib/src/lib/oblog/ob_log.cpp b/deps/oblib/src/lib/oblog/ob_log.cpp index a4c43b22a6..af0472a4d4 100644 --- a/deps/oblib/src/lib/oblog/ob_log.cpp +++ b/deps/oblib/src/lib/oblog/ob_log.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "lib/oblog/ob_warning_buffer.h" #include "lib/ob_errno.h" #include "lib/profile/ob_trace_id.h" @@ -31,6 +32,7 @@ #include "lib/allocator/ob_vslice_alloc.h" #include "lib/allocator/ob_fifo_allocator.h" #include "common/ob_smart_var.h" +#include "lib/oblog/ob_log_compressor.h" using namespace oceanbase::lib; @@ -478,8 +480,8 @@ ObLogger::ObLogger() enable_wf_flag_(false), rec_old_file_flag_(false), can_print_(true), enable_async_log_(true), use_multi_flush_(false), stop_append_log_(false), enable_perf_mode_(false), last_async_flush_count_per_sec_(0), log_mem_limiter_(nullptr), - allocator_(nullptr), error_allocator_(nullptr), enable_log_limit_(true), is_arb_replica_(false), - new_file_info_(nullptr), info_as_wdiag_(true) + allocator_(nullptr), error_allocator_(nullptr), log_compressor_(nullptr), enable_log_limit_(true), + is_arb_replica_(false), new_file_info_(nullptr), info_as_wdiag_(true) { id_level_map_.set_level(OB_LOG_LEVEL_DBA_ERROR); @@ -795,7 +797,7 @@ void ObLogger::rotate_log(const char *filename, if (file_list.size() >= max_file_index_) { std::string oldFile = file_list.front(); file_list.pop_front(); - unlink(oldFile.c_str()); + unlink_if_need(oldFile.c_str()); } file_list.push_back(old_log_file); (void)pthread_mutex_unlock(&file_index_mutex_); @@ -848,6 +850,10 @@ void ObLogger::rotate_log(const char *filename, } } } + // awake log compressor when creating new log files + if (OB_NOT_NULL(log_compressor_)) { + log_compressor_->awake(); + } } } UNUSED(ret); @@ -935,6 +941,17 @@ int ObLogger::set_record_old_log_file(bool rec_old_file_flag) return ret; } +int ObLogger::set_log_compressor(ObLogCompressor *log_compressor) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(log_compressor)) { + ret = OB_INVALID_ARGUMENT; + } else { + log_compressor_ = log_compressor; + } + return ret; +} + //@brief string copy with dst's length and src's length checking and src trim. int64_t str_copy_trim(char *dst, const int64_t dst_length, @@ -1259,6 +1276,7 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f int ret = OB_SUCCESS; char *dirc = NULL; char *basec = NULL; + regex_t uncompressed_regex; if (OB_ISNULL(files) || OB_ISNULL(wf_files)) { ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "Input should not be NULL", K(files), K(wf_files), K(ret)); @@ -1271,6 +1289,8 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f } else if (NULL == (basec = strdup(filename))) { ret = OB_ALLOCATE_MEMORY_FAILED; OB_LOG(ERROR, "strdup filename error", K(ret)); + } else if (OB_FAIL(regcomp(&uncompressed_regex, OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) { + OB_LOG(ERROR, "failed to compile regex pattern", K(ret)); } else { ObIArray *files_arr = static_cast *>(files); ObIArray *wf_files_arr = static_cast *>(wf_files); @@ -1296,6 +1316,10 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f FileName tmp_file; struct dirent *dir_entry = NULL;//dir_entry is from dir_pointer stream, need not to be freed. int64_t print_len = 0; + bool enable_delete_compressed_file = true; + if (OB_NOT_NULL(log_compressor_) && log_compressor_->is_enable_compress()) { + enable_delete_compressed_file = false; + } while (OB_SUCC(ret) && (dir_entry = readdir(dir_pointer)) != NULL) { if (DT_DIR != dir_entry->d_type) { if (prefix_match(wf_file_prefix, dir_entry->d_name)) { @@ -1307,7 +1331,8 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f } else { }//do nothing } else if (prefix_match(wf_file, dir_entry->d_name)) { //.wf file, do nothing. - } else if (prefix_match(file_prefix, dir_entry->d_name)) { + } else if (prefix_match(file_prefix, dir_entry->d_name) + && (enable_delete_compressed_file || regexec(&uncompressed_regex, dir_entry->d_name, 0, NULL, 0) == 0)) { print_len = snprintf(tmp_file.file_name_, ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE, "%s/%s", dir_name, dir_entry->d_name); if (OB_UNLIKELY(print_len <0) || OB_UNLIKELY(print_len >= ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE)) { //do nothing @@ -1322,6 +1347,7 @@ int ObLogger::get_log_files_in_dir(const char *filename, void *files, void *wf_f OB_LOG(WARN, "Close dir error", K(ret)); } } + regfree(&uncompressed_regex); } if (NULL != dirc) { free(dirc); @@ -1382,7 +1408,7 @@ int ObLogger::add_files_to_list(void *files, if (file_list.size() >= max_file_index_) { oldFile = file_list.front(); file_list.pop_front(); - unlink(oldFile.c_str()); + unlink_if_need(oldFile.c_str()); } file_list.push_back(files_arr->at(i).file_name_); } @@ -1882,6 +1908,13 @@ int ObLogger::log_new_file_info(const ObPLogFileStruct &log_file) return ret; } +void ObLogger::unlink_if_need(const char *file) +{ + if (OB_ISNULL(log_compressor_) || !log_compressor_->is_enable_compress()) { + unlink(file); + } +} + void ObLogger::issue_dba_error(const int errcode, const char *file, const int line, const char *info_str) { const char *base_file_name = strrchr(file, '/'); diff --git a/deps/oblib/src/lib/oblog/ob_log.h b/deps/oblib/src/lib/oblog/ob_log.h index b8d4f27473..e6e956e91d 100644 --- a/deps/oblib/src/lib/oblog/ob_log.h +++ b/deps/oblib/src/lib/oblog/ob_log.h @@ -62,6 +62,7 @@ class ObVSliceAlloc; class ObBlockAllocMgr; class ObFIFOAllocator; class ObPLogItem; +class ObLogCompressor; extern void allow_next_syslog(int64_t count = 1); extern int logdata_vprintf(char *buf, const int64_t buf_len, int64_t &pos, const char *fmt, va_list args); @@ -556,6 +557,8 @@ public: //@brief Set whether record old log file. If this flag and max_file_index set, //will record log files in the directory for log file int set_record_old_log_file(bool rec_old_file_flag = false); + int set_log_compressor(ObLogCompressor *log_compressor); + int64_t get_max_file_index() const { return max_file_index_; } //@brief Get the process-only ObLogger. static ObLogger &get_logger(); @@ -747,6 +750,7 @@ private: int log_new_file_info(const ObPLogFileStruct &log_file); void drop_log_items(ObIBaseLogItem **items, const int64_t item_cnt) override; + void unlink_if_need(const char *file); private: static const char *const errstr_[]; // default log rate limiter if there's no tl_log_limiger @@ -806,6 +810,7 @@ private: ObBlockAllocMgr* log_mem_limiter_; ObVSliceAlloc* allocator_; ObFIFOAllocator* error_allocator_; + ObLogCompressor* log_compressor_; // juse use it for test promise log print bool enable_log_limit_; RLOCAL_STATIC(ByteBuf, local_buf_); diff --git a/deps/oblib/src/lib/oblog/ob_log_compressor.cpp b/deps/oblib/src/lib/oblog/ob_log_compressor.cpp new file mode 100644 index 0000000000..f4ad84f760 --- /dev/null +++ b/deps/oblib/src/lib/oblog/ob_log_compressor.cpp @@ -0,0 +1,543 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX LIB +#include +#include +#include +#include + +#include "lib/oblog/ob_log_compressor.h" +#include "lib/thread/thread_mgr.h" +#include "lib/compress/ob_compressor_pool.h" + +using namespace oceanbase::lib; + +namespace oceanbase { +namespace common { + +ObLogCompressor::ObLogCompressor() : + is_inited_(false), stopped_(true), loop_interval_(OB_SYSLOG_COMPRESS_LOOP_INTERVAL), + max_disk_size_(0), min_uncompressed_count_(0), compress_func_(NONE_COMPRESSOR), + compressor_(NULL), next_compressor_(NULL), oldest_files_(cmp_, NULL) +{} + +ObLogCompressor::~ObLogCompressor() +{ + if (is_inited_) { + destroy(); + } +} + +int ObLogCompressor::init() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_ERROR("the ObLogCompressor has been inited", K(ret)); + } else if (OB_FAIL(log_compress_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to init ObThreadCond", K(ret)); + } else { + strncpy(syslog_dir_, OB_SYSLOG_DIR, strlen(OB_SYSLOG_DIR)); + stopped_ = false; + if (OB_FAIL(TG_SET_RUNNABLE_AND_START(TGDefIDs::SYSLOG_COMPRESS, *this))) { + LOG_ERROR("failed to start log compression thread", K(ret)); + } else { + is_inited_ = true; + } + if (OB_FAIL(ret)) { + LOG_ERROR("syslog compressor init failed ", K(ret)); + stopped_ = true; + log_compress_cond_.destroy(); + } + } + LOG_INFO("syslog compressor init finish ", K(ret)); + + return ret; +} + +void ObLogCompressor::stop() +{ + if (is_inited_) { + stopped_ = true; + TG_STOP(TGDefIDs::SYSLOG_COMPRESS); + } +} + +void ObLogCompressor::wait() +{ + if (is_inited_) { + TG_WAIT(TGDefIDs::SYSLOG_COMPRESS); + } +} + +void ObLogCompressor::awake() +{ + ObThreadCondGuard guard(log_compress_cond_); + log_compress_cond_.signal(); +} + +void ObLogCompressor::destroy() +{ + if (is_inited_) { + { + ObThreadCondGuard guard(log_compress_cond_); + stopped_ = true; + log_compress_cond_.signal(); + } + stop(); + wait(); + log_compress_cond_.destroy(); + max_disk_size_ = 0; + compress_func_ = NONE_COMPRESSOR; + min_uncompressed_count_ = 0; + compressor_ = NULL; + next_compressor_ = NULL; + is_inited_ = false; + LOG_INFO("syslog compressor destroyed"); + } +} + +int ObLogCompressor::set_max_disk_size(int64_t max_disk_size) +{ + int ret = OB_SUCCESS; + if (max_disk_size < 0) { + ret = OB_INVALID_ARGUMENT; + } else { + if (max_disk_size > 0 && max_disk_size < OB_MIN_SYSLOG_DISK_SIZE) { + max_disk_size = OB_MIN_SYSLOG_DISK_SIZE; + } + if ((max_disk_size_ == 0 && max_disk_size > 0) + || (max_disk_size_ > 0 && max_disk_size_ > max_disk_size)) { + max_disk_size_ = max_disk_size; + awake(); + } else { + max_disk_size_ = max_disk_size; + } + } + return ret; +} + +int ObLogCompressor::set_compress_func(const char *compress_func_ptr) +{ + int ret = OB_SUCCESS; + ObCompressorType new_compress_func; + if (OB_ISNULL(compress_func_ptr)) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(ObCompressorPool::get_instance().get_compressor_type(compress_func_ptr, new_compress_func))) { + // do nothing + } else if (new_compress_func == NONE_COMPRESSOR + || new_compress_func == ZSTD_COMPRESSOR + || new_compress_func == ZSTD_1_3_8_COMPRESSOR) { + // to do: support ZLIB_COMPRESSOR + if (new_compress_func != compress_func_) { + LOG_INFO("modify log compress func", K(compress_func_), K(new_compress_func)); + if (OB_FAIL(set_next_compressor_(new_compress_func))) { + LOG_ERROR("fail to modify log compress func", K(ret), K(compress_func_), K(new_compress_func)); + } else if (compress_func_ == NONE_COMPRESSOR && new_compress_func != NONE_COMPRESSOR) { + compress_func_ = new_compress_func; + // from disable to enanble, awake compress thread + awake(); + } else { + compress_func_ = new_compress_func; + } + } + } else { + ret = OB_INVALID_ARGUMENT; + } + return ret; +} + +int ObLogCompressor::set_min_uncompressed_count(int64_t min_uncompressed_count) +{ + int ret = OB_SUCCESS; + if (min_uncompressed_count < 0) { + ret = OB_INVALID_ARGUMENT; + } else { + min_uncompressed_count_ = min_uncompressed_count; + } + return ret; +} + +bool ObLogCompressor::is_compressed_file(const char *file) +{ + int ret = OB_SUCCESS; + bool is_compressed = false; + regex_t regex; + if (OB_ISNULL(file)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argumet for file is null", K(ret)); + } else if (OB_FAIL(regcomp(®ex, OB_COMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) { + LOG_ERROR("failed to compile regex pattern", K(ret)); + } else { + if (regexec(®ex, file, 0, NULL, 0) == 0) { + is_compressed = true; + } + regfree(®ex); + } + return is_compressed; +} + +void ObLogCompressor::run1() +{ + lib::set_thread_name("SyslogCompress"); + LOG_INFO("syslog compress thread start"); + log_compress_loop_(); + LOG_INFO("syslog compress thread finish"); +} + +void ObLogCompressor::log_compress_loop_() +{ + int ret = OB_SUCCESS; + int log_type = OB_SYSLOG_COMPRESS_TYPE_COUNT; + const int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE; + const int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE; + char *src_buf = (char *)ob_malloc(src_size + dest_size, "SyslogCompress"); + char *dest_buf = src_buf + src_size; + regex_t regex_archive; + regex_t regex_uncompressed; + + if (OB_ISNULL(src_buf)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("failed to ob_malloc", K(ret)); + } else if (OB_FAIL(regcomp(®ex_archive, OB_ARCHIVED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to compile archive regex pattern", K(ret)); + } else if (OB_FAIL(regcomp(®ex_uncompressed, OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN, REG_EXTENDED))) { + regfree(®ex_archive); + ret = OB_ERR_SYS; + LOG_ERROR("failed to compile uncompressed regex pattern", K(ret)); + } else { + ObSyslogFile syslog_file; + char compress_files[OB_SYSLOG_COMPRESS_TYPE_COUNT][OB_MAX_SYSLOG_FILE_NAME_SIZE] = {{0}}; + int64_t log_file_count[OB_SYSLOG_COMPRESS_TYPE_COUNT] = {0}; + int64_t log_min_time[OB_SYSLOG_COMPRESS_TYPE_COUNT] = {0}; + int64_t compressed_file_count = 0; + int64_t deleted_file_count = 0; + + while (!stopped_) { + // wait until stoped or needing to work + { + common::ObThreadCondGuard guard(log_compress_cond_); + while (!stopped_ && !is_enable_compress() && max_disk_size_ <= 0 && OB_LOGGER.get_max_file_index() <= 0) { + log_compress_cond_.wait_us(loop_interval_); + } + } + + if (!stopped_) { + // record start time + int64_t start_time = ObClockGenerator::getClock(); + bool enable_delete_file = max_disk_size_ > 0 || OB_LOGGER.get_max_file_index() > 0; + + // check whether need to compress or delete file + int64_t total_size = 0; + struct dirent* entry; + struct stat stat_info; + DIR* dir = NULL; + compressed_file_count = 0; + deleted_file_count = 0; + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + log_file_count[i] = 0; + log_min_time[i] = INT64_MAX; + } + oldest_files_.reset(); + + if (OB_ISNULL(dir = opendir(syslog_dir_))) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to open syslog directory", K(ret), K(errno), K(syslog_dir_)); + } else { + while (OB_NOT_NULL(entry = readdir(dir))) { + if (strncmp(entry->d_name, ".", 1) == 0 || strncmp(entry->d_name, "..", 2) == 0) { + continue; + } + snprintf(syslog_file.file_name_, OB_MAX_SYSLOG_FILE_NAME_SIZE, "%s/%s", syslog_dir_, entry->d_name); + if (stat(syslog_file.file_name_, &stat_info) == -1) { + ret = OB_ERR_SYS; + LOG_WARN("failed to get file info", K(ret), K(errno), K(syslog_file.file_name_)); + continue; + } + if (S_ISREG(stat_info.st_mode)) { + total_size += stat_info.st_size; + int64_t tmp_time = stat_info.st_mtim.tv_sec * 1000000000L + stat_info.st_mtim.tv_nsec; + syslog_file.mtime_ = tmp_time; + if (enable_delete_file + && regexec(®ex_archive, entry->d_name, 0, NULL, 0) == 0 + && OB_FAIL(oldest_files_.push(syslog_file))) { + LOG_ERROR("failed to put file into array", K(ret), K(syslog_file.file_name_), K(tmp_time)); + } + + if (regexec(®ex_uncompressed, entry->d_name, 0, NULL, 0) == 0) { + int log_type = get_log_type_(syslog_file.file_name_); + if (log_type >= 0 && log_type < OB_SYSLOG_COMPRESS_TYPE_COUNT) { + log_file_count[log_type]++; + if (tmp_time < log_min_time[log_type]) { + strncpy(compress_files[log_type], syslog_file.file_name_, OB_MAX_SYSLOG_FILE_NAME_SIZE); + log_min_time[log_type] = tmp_time; + } + } + } + } + } + } + if (OB_NOT_NULL(dir)) { + closedir(dir); + } + + // get disk remaining size + int64_t disk_remaining_size = get_disk_remaining_size_(); + disk_remaining_size = disk_remaining_size >=0 ? disk_remaining_size : INT64_MAX; + if (max_disk_size_ > 0 && max_disk_size_ - total_size < disk_remaining_size) { + disk_remaining_size = max_disk_size_ - total_size; + } + + // compress syslog file if necessary + if (!stopped_ && is_enable_compress() && disk_remaining_size < OB_SYSLOG_COMPRESS_RESERVE_SIZE) { + if (compressor_ != next_compressor_) { + compressor_ = next_compressor_; + } + if (OB_ISNULL(compressor_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected error, compressor is null", K(ret)); + } else { + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT && is_enable_compress(); i++) { + if (log_file_count[i] > min_uncompressed_count_) { + int64_t file_size = get_file_size_(compress_files[i]); + if (OB_FAIL(compress_single_file_(compress_files[i], src_buf, dest_buf))) { + LOG_ERROR("failed to compress file", K(ret), K(compress_files[i])); + } else { + // estimated value + total_size -= file_size; + compressed_file_count++; + } + log_file_count[i]--; + } + } + } + } + + // delete oldest syslog file if necessary + enable_delete_file = enable_delete_file && (max_disk_size_ > 0 || OB_LOGGER.get_max_file_index() > 0); + if (!stopped_ && enable_delete_file && disk_remaining_size < OB_SYSLOG_DELETE_RESERVE_SIZE) { + int array_size = oldest_files_.count(); + const char *delete_file = NULL; + const ObSyslogFile *syslog_ptr = NULL; + int64_t need_delete_size = OB_SYSLOG_DELETE_RESERVE_SIZE - disk_remaining_size; + for (int i = 0; i < array_size && need_delete_size > 0 && OB_SUCC(ret); i++) { + if (OB_FAIL(oldest_files_.top(syslog_ptr))) { + break; + } else if (OB_NOT_NULL(syslog_ptr)) { + delete_file = syslog_ptr->file_name_; + int64_t delete_file_size = get_file_size_(delete_file); + if (delete_file_size >= 0) { + LOG_DEBUG("log compressor unlink file", K(delete_file), K(need_delete_size), K(delete_file_size)); + unlink(delete_file); + need_delete_size = need_delete_size - delete_file_size; + disk_remaining_size += delete_file_size; + deleted_file_count++; + } + delete_file = NULL; + ret = oldest_files_.pop(); + } + } + } + + // record cost time, sleep + int64_t cost_time = ObClockGenerator::getClock() - start_time; + LOG_INFO("log compressor cycles once. ", K(ret), K(cost_time), + K(compressed_file_count), K(deleted_file_count), K(disk_remaining_size)); + cost_time = cost_time >= 0 ? cost_time:0; + if (!stopped_ && cost_time < loop_interval_) { + usleep(loop_interval_ - cost_time); + } + } // if (!stopped_) + } // while (!stopped_) + regfree(®ex_archive); + regfree(®ex_uncompressed); + } + if (OB_NOT_NULL(src_buf)) { + ob_free(src_buf); + } +} + +int ObLogCompressor::get_compressed_file_name_(const char *file_name, char compressed_file_name[]) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(file_name)) { + ret = OB_INVALID_ARGUMENT; + } else { + int size = strnlen(file_name, OB_MAX_SYSLOG_FILE_NAME_SIZE); + int suffix_size = strlen(OB_SYSLOG_COMPRESS_ZSTD_SUFFIX); + if (size <= 0 || size + suffix_size + 1 > OB_MAX_SYSLOG_FILE_NAME_SIZE) { + ret = OB_INVALID_ARGUMENT; + } else { + strncpy(compressed_file_name, file_name, size); + strncpy(compressed_file_name + size, OB_SYSLOG_COMPRESS_ZSTD_SUFFIX, suffix_size); + *(compressed_file_name + size + suffix_size) = '\0'; + } + } + return ret; +} + + +int ObLogCompressor::compress_single_block_( + char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size) +{ + int ret = OB_SUCCESS; + int64_t size = -1; + if (OB_FAIL(((ObCompressor *)compressor_)->compress(src, src_size, dest, dest_size, size))) { + LOG_ERROR("failed to compress single block", K(ret)); + } else { + return_size = size; + } + return ret; +} + +int ObLogCompressor::compress_single_file_(const char *file_name, char *src_buf, char *dest_buf) +{ + int ret = OB_SUCCESS; + + if (OB_ISNULL(file_name) || OB_ISNULL(src_buf) || OB_ISNULL(dest_buf)) { + ret = OB_INVALID_ARGUMENT; + } else { + static const int sleep_us = 50 * 1000; // 50ms + int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE; + int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE; + char compressed_file_name[OB_MAX_SYSLOG_FILE_NAME_SIZE]; + FILE *input_file = NULL; + FILE *output_file = NULL; + // record file modify time + struct stat st; + stat(file_name, &st); + time_t last_modified_time = st.st_mtime; + + if (OB_FAIL(get_compressed_file_name_(file_name, compressed_file_name))) { + LOG_ERROR("failed to get compressed file name", K(ret), K(file_name)); + } else if (strlen(compressed_file_name) < strlen(file_name)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("failed to get compressed file name", K(ret), K(file_name)); + } else if (NULL == (input_file = fopen(file_name, "r"))) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to open file", K(ret), K(errno), K(file_name)); + } else if (NULL == (output_file = fopen(compressed_file_name, "w"))) { + ret = OB_ERR_SYS; + fclose(input_file); + LOG_ERROR("failed to open file", K(ret), K(errno), K(compressed_file_name)); + } else { + LOG_DEBUG("log compressor compress file ", K(file_name), K(compressed_file_name)); + size_t read_size = 0; + size_t write_size = 0; + while (OB_SUCC(ret) && !feof(input_file)) { + if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) { + if (OB_FAIL(compress_single_block_(dest_buf, dest_size, src_buf, read_size, write_size))) { + LOG_ERROR("failed to compress syslog block", K(ret)); + } else if (write_size != fwrite(dest_buf, 1, write_size, output_file)) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to write file", K(ret), K(errno), K(compressed_file_name)); + } + } + usleep(sleep_us); + } + fclose(input_file); + fclose(output_file); + if (OB_FAIL(ret)) { + unlink(compressed_file_name); + } else { + unlink(file_name); + (void)set_last_modify_time_(compressed_file_name, last_modified_time); + } + } + } + + return ret; +} + +int ObLogCompressor::set_last_modify_time_(const char *file_name, const time_t &newTime) { + int ret = OB_SUCCESS; + struct utimbuf newTimes; + newTimes.actime = newTime; + newTimes.modtime = newTime; + + if (OB_ISNULL(file_name)) { + ret = OB_INVALID_ARGUMENT; + } else if (utime(file_name, &newTimes) != 0) { + ret = OB_ERR_SYS; + LOG_ERROR("failed to set the file's last modified time", K(ret), K(file_name), K(newTime)); + } + + return ret; +} + +int ObLogCompressor::set_next_compressor_(ObCompressorType compress_func) { + int ret = OB_SUCCESS; + ObCompressor *compressor = NULL; + if (compress_func == NONE_COMPRESSOR) { + next_compressor_ = NULL; + } else if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compress_func, compressor))) { + LOG_ERROR("Fail to get_compressor", K(ret), K(compress_func)); + } else { + next_compressor_ = compressor; + if (OB_ISNULL(compressor_)) { + compressor_ = compressor; + } + } + return ret; +} + +int ObLogCompressor::get_log_type_(const char *file_name) { + int type = OB_SYSLOG_COMPRESS_TYPE_COUNT; + int name_len = strnlen(file_name, OB_MAX_SYSLOG_FILE_NAME_SIZE); + if (name_len >= strlen("trace.log")) { + int dir_len = strlen(syslog_dir_); + if (name_len > dir_len + 1 + && 0 == strncmp(file_name, syslog_dir_, dir_len) + && file_name[dir_len] == '/') { + file_name = file_name + dir_len + 1; + } + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + if (0 == strncmp(file_name, OB_SYSLOG_FILE_PREFIX[i], strlen(OB_SYSLOG_FILE_PREFIX[i]))) { + type = i; + break; + } + } + } + + return type; +} + +int64_t ObLogCompressor::get_file_size_(const char *file_name) +{ + int64_t size = -1; + struct stat st; + if (stat(file_name, &st) == 0) { + size = st.st_size; + } + return size; +} + +int64_t ObLogCompressor::get_disk_remaining_size_() +{ + int ret = OB_SUCCESS; + int64_t remaining_size = 0; + struct statfs file_system; + if (statfs(syslog_dir_, &file_system) == -1) { + remaining_size = -1; + ret = OB_ERR_SYS; + LOG_ERROR("fail to get disk remaining size", K(ret), K(strerror(errno)), K(syslog_dir_)); + } else { + remaining_size = file_system.f_bsize * file_system.f_bavail; + } + return remaining_size; +} + +} // namespace common +} // namespace oceanbase diff --git a/deps/oblib/src/lib/oblog/ob_log_compressor.h b/deps/oblib/src/lib/oblog/ob_log_compressor.h new file mode 100644 index 0000000000..29b8c790f3 --- /dev/null +++ b/deps/oblib/src/lib/oblog/ob_log_compressor.h @@ -0,0 +1,196 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_LOG_COMPRESSOR_H_ +#define OB_LOG_COMPRESSOR_H_ + +#include "lib/container/ob_heap.h" +#include "lib/lock/ob_thread_cond.h" +#include "lib/thread/thread_mgr_interface.h" + +namespace oceanbase { +namespace common { + +class ObCompressor; + +/* Log files are divided into blocks and then compressed. */ +static const int32_t OB_SYSLOG_COMPRESS_BLOCK_SIZE = OB_MALLOC_BIG_BLOCK_SIZE; +/* To prevent extreme cases where the files become larger after compression, + * the size of the decompression buffer needs to be larger than the original data. + * This size can refer to the ZSTD code implementation. + */ +static const int32_t OB_SYSLOG_COMPRESS_BUFFER_SIZE = + OB_SYSLOG_COMPRESS_BLOCK_SIZE + OB_SYSLOG_COMPRESS_BLOCK_SIZE / 128 + 512 + 19; +static const int32_t OB_MAX_SYSLOG_FILE_NAME_SIZE = ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE; +static const int32_t OB_SYSLOG_COMPRESS_TYPE_COUNT = FD_TRACE_FILE + 1; +/* by default, syslog file size = 256MB, and most of the logs are observer.log + * compressed file size ≈ 13MB, 13MB * 20 = 260MB, so OB_SYSLOG_DELETE_ARRAY_SIZE = 20 + 1 + * an extra position is reserved for newly inserted file name. + */ +static const int32_t OB_SYSLOG_DELETE_ARRAY_SIZE = 21; +static const int64_t OB_MIN_SYSLOG_DISK_SIZE = 2 * (1LL << 30); // 2GB +static const int64_t OB_SYSLOG_COMPRESS_RESERVE_SIZE = 4 * (1LL << 30); // 4GB +static const int64_t OB_SYSLOG_DELETE_RESERVE_SIZE = 2 * (1LL << 30); // 2GB +static const int64_t OB_SYSLOG_COMPRESS_LOOP_INTERVAL = 5000000; // 5s +static const char OB_SYSLOG_DIR[] = "log"; // same as LOG_DIR in src/observer/main.cpp +static const char OB_SYSLOG_COMPRESS_ZSTD_SUFFIX[] = ".zst"; // name suffix of file compressed by zstd +static const char OB_UNCOMPRESSED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+$"; // only uncompressed files +static const char OB_COMPRESSED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+\\.[a-z0-9]+$"; // only compressed files +static const char OB_ARCHIVED_SYSLOG_FILE_PATTERN[] = "^[a-z]+\\.log\\.[0-9]+[.a-z0-9]*$"; // all syslog files (excluding wf logs) +static const char *OB_SYSLOG_FILE_PREFIX[OB_SYSLOG_COMPRESS_TYPE_COUNT] = +{ + "observer.log", // FD_SVR_FILE + "rootservice.log", // FD_RS_FILE + "election.log", // FD_ELEC_FILE + "trace.log", // FD_TRACE_FILE +}; +STATIC_ASSERT(MAX_FD_FILE == 5, "if you add a new log type, add it's prefix here !!!"); + +#define OB_LOG_COMPRESSOR ::oceanbase::common::ObLogCompressor::get_log_compressor() + +struct ObSyslogFile +{ + ObSyslogFile() { reset(); } + void reset() + { + mtime_ = INT64_MAX; + memset(file_name_, 0, OB_MAX_SYSLOG_FILE_NAME_SIZE); + } + TO_STRING_KV(K_(mtime), K_(file_name)); + int64_t mtime_; + char file_name_[OB_MAX_SYSLOG_FILE_NAME_SIZE]; +}; + +struct ObSyslogCompareFunctor { + bool operator()(const ObSyslogFile &l, const ObSyslogFile &r) { + return l.mtime_ > r.mtime_; + } + int get_error_code() { return OB_SUCCESS; } +}; + +/* A priority array, order by modify time of files. */ +class ObSyslogPriorityArray : public ObBinaryHeap +{ +public: + ObSyslogPriorityArray(ObSyslogCompareFunctor &cmp, common::ObIAllocator *allocator = NULL) + : ObBinaryHeap(cmp, allocator) + {} + virtual ~ObSyslogPriorityArray() {} + + int push(const ObSyslogFile &element) + { + int ret = OB_SUCCESS; + if (OB_FAIL(array_.push_back(element))) { + } else if (OB_FAIL(upheap(array_.count() - 1))) { + } else if (array_.count() > OB_SYSLOG_DELETE_ARRAY_SIZE - 1) { + ret = pop_back(); + } + return ret; + } + +private: + int pop_back() + { + int ret = OB_SUCCESS; + if (OB_UNLIKELY(array_.empty())) { + ret = OB_EMPTY_RESULT; + } else { + int index = 0; + const ObSyslogFile *max_element = &array_.at(0); + for (int i = 1; i < array_.count(); i++) { + if (cmp_(array_.at(i), *max_element)) { + index = i; + max_element = &array_.at(i); + } + } + reset_root_cmp_cache(); + array_.at(index) = array_.at(array_.count() - 1); + array_.pop_back(); + if (index < array_.count()) { + if (index == get_root()) { + ret = downheap(index); + } else { + int64_t parent = get_parent(index); + if (cmp_(array_.at(parent), array_.at(index))) { + ret = upheap(index); + } else { + ret = downheap(index); + } + } + } + } + return ret; + } + +private: + using ObBinaryHeapBase::array_; + using ObBinaryHeapBase::cmp_; + using ObBinaryHeapBase::get_root; + using ObBinaryHeapBase::get_parent; + using ObBinaryHeapBase::reset_root_cmp_cache; + using ObBinaryHeap::upheap; + using ObBinaryHeap::downheap; +}; + +class ObLogCompressor final : public lib::TGRunnable { +public: + ObLogCompressor(); + virtual ~ObLogCompressor(); + int init(); + void stop(); + void wait(); + void awake(); + void destroy(); + int set_max_disk_size(int64_t max_disk_size); + int set_compress_func(const char *compress_func_ptr); + int set_min_uncompressed_count(int64_t min_uncompressed_count); + inline bool is_enable_compress() { return NONE_COMPRESSOR != compress_func_; } + static bool is_compressed_file(const char *file); + static inline ObLogCompressor& get_log_compressor() + { + static ObLogCompressor log_compressor; + return log_compressor; + } + TO_STRING_KV(K_(is_inited), K_(stopped), K_(loop_interval), K_(max_disk_size), + K_(min_uncompressed_count), K_(compress_func)); + +private: + void run1() override; + void log_compress_loop_(); + int get_compressed_file_name_(const char *file_name, char compressed_file_name[]); + int compress_single_block_(char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size); + int compress_single_file_(const char *file_name, char *src_buf, char *dest_buf); + int set_last_modify_time_(const char *file_name, const time_t& newTime); + int set_next_compressor_(ObCompressorType compress_func); + int get_log_type_(const char *file_name); + int64_t get_file_size_(const char *file_name); + int64_t get_disk_remaining_size_(); + +private: + bool is_inited_; + bool stopped_; + int64_t loop_interval_; // don't modify + int64_t max_disk_size_; + int64_t min_uncompressed_count_; + char syslog_dir_[64]; + ObCompressorType compress_func_; + ObCompressor *compressor_; + ObCompressor *next_compressor_; + ObThreadCond log_compress_cond_; + ObSyslogCompareFunctor cmp_; + ObSyslogPriorityArray oldest_files_; +}; + +} // namespace common +} // namespace oceanbase + +#endif /* OB_LOG_COMPRESSOR_H_ */ diff --git a/deps/oblib/src/lib/thread/thread_define.h b/deps/oblib/src/lib/thread/thread_define.h index a528ff5574..aa3cb4533d 100644 --- a/deps/oblib/src/lib/thread/thread_define.h +++ b/deps/oblib/src/lib/thread/thread_define.h @@ -26,4 +26,5 @@ TG_DEF(MEMORY_DUMP, memDump, THREAD_POOL, 1) TG_DEF(SchemaRefTask, SchemaRefTask, DEDUP_QUEUE, 1, 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu") TG_DEF(ReqMemEvict, ReqMemEvict, TIMER) TG_DEF(replica_control, replica_control, THREAD_POOL, 1) +TG_DEF(SYSLOG_COMPRESS, SyslogCompress, THREAD_POOL, 1) #endif diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index e3192923af..77a5f6f0eb 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -74,6 +74,7 @@ oblib_addtest(net/test_ob_net_util.cpp) #oblib_addtest(number/test_number_v2.cpp) #oblib_addtest(oblog/test_base_log_buffer.cpp) oblib_addtest(oblog/test_base_log_writer.cpp) +oblib_addtest(oblog/test_ob_log_compressor.cpp) oblib_addtest(oblog/test_ob_log_obj.cpp) oblib_addtest(oblog/test_ob_log_performance.cpp) oblib_addtest(profile/test_ob_trace_id.cpp) diff --git a/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp b/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp new file mode 100644 index 0000000000..d4e5db8a7d --- /dev/null +++ b/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp @@ -0,0 +1,447 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX LIB +#include +#include +#include +#include +#include +#include + +#define protected public +#define private public + +#include "lib/oblog/ob_log_compressor.h" +#include "lib/oblog/ob_log.h" +#include "lib/compress/ob_compressor_pool.h" +#include "common/ob_clock_generator.h" + +using namespace oceanbase::lib; +namespace oceanbase { +namespace common { + +const char *TEST_DIR = "testoblogcompressdir"; +const char *RM_COMMAND = "rm -rf testoblogcompressdir"; + +#define CREATE_FILES(file_count, modify_time, timestamp) \ + for (int i = 0; i < file_count; i++) { \ + snprintf(file_name, sizeof(file_name), "%s/%s.%ld", TEST_DIR, \ + OB_SYSLOG_FILE_PREFIX[i%OB_SYSLOG_COMPRESS_TYPE_COUNT], timestamp + i); \ + FILE *file =fopen(file_name, "w"); \ + ASSERT_EQ(true, OB_NOT_NULL(file)); \ + ASSERT_EQ(data_size, fwrite(data, 1, data_size, file)); \ + fclose(file); \ + OB_LOG_COMPRESSOR.set_last_modify_time_(file_name, modify_time + i*60); \ + } + +int get_file_count_by_regex(const char *dirname, const char *pattern) { + int count = 0; + regex_t regex; + struct dirent *entry; + DIR *dir = opendir(dirname); + + if (OB_ISNULL(dir)) { + count = -1; + } else if (OB_NOT_NULL(pattern) && regcomp(®ex, pattern, REG_EXTENDED | REG_NOSUB) != 0) { + closedir(dir); + count = -1; + } else { + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, ".", 1) == 0 || strncmp(entry->d_name, "..", 2) == 0) { + continue; + } + if (OB_ISNULL(pattern) || regexec(®ex, entry->d_name, 0, NULL, 0) == 0) { + count++; + } + } + + closedir(dir); + if (OB_NOT_NULL(pattern)) { + regfree(®ex); + } + } + + return count; +} + +int compress_file_by_zstd(ObCompressor *compressor, const char *file_name_in, const char *file_name_out) +{ + int ret = OB_SUCCESS; + + int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE; + int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE; + char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_COMPRESSOR); + char *dest_buf = src_buf + src_size; + size_t read_size = 0; + int64_t w_size = 0; + int sleep_10us = 10 * 1000; + + FILE *input_file = fopen(file_name_in, "r"); + FILE *output_file = fopen(file_name_out, "w"); + fwrite(dest_buf, 1, w_size, output_file); + + while (OB_SUCC(ret) && !feof(input_file)) { + if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) { + if (OB_FAIL(compressor->compress(src_buf, read_size, dest_buf, dest_size, w_size))) { + printf("Failed to log_compress_block, err_code=%d.\n", ret); + } else if (w_size != fwrite(dest_buf, 1, w_size, output_file)) { + ret = OB_ERR_SYS; + printf("Failed to fwrite, err_code=%d.\n", errno); + } + } + } + fclose(input_file); + fclose(output_file); + + // clear environment + ob_free(src_buf); + + return ret; +} + +TEST(ObLogCompressor, log_priority_array_test) +{ + int ret = OB_SUCCESS; + const int MAX_FILE_COUNT = OB_SYSLOG_DELETE_ARRAY_SIZE - 1; + ObSyslogFile syslog; + const ObSyslogFile *file_name_out; + ObSyslogCompareFunctor cmp_; + ObSyslogPriorityArray priority_array(cmp_); + + for (int i = 0; i < MAX_FILE_COUNT; i++) { + syslog.mtime_ = 1500 + i; + snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1500 + i); + ASSERT_EQ(OB_SUCCESS, priority_array.push(syslog)); + } + ASSERT_EQ(MAX_FILE_COUNT, priority_array.count()); + for (int i = 0; i < MAX_FILE_COUNT/2; i++) { + syslog.mtime_ = 1200 + i; + snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1200 + i); + ASSERT_EQ(OB_SUCCESS, priority_array.push(syslog)); + } + ASSERT_EQ(MAX_FILE_COUNT, priority_array.count()); + for (int i = 0; i < MAX_FILE_COUNT/2; i++) { + file_name_out = NULL; + snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1200 + i); + ASSERT_EQ(OB_SUCCESS, priority_array.top(file_name_out)); + ASSERT_EQ(true, OB_NOT_NULL(file_name_out)); + ASSERT_EQ(0, strncmp(syslog.file_name_, file_name_out->file_name_, strlen(syslog.file_name_))); + ASSERT_EQ(OB_SUCCESS, priority_array.pop()); + } + for (int i = 0; i < MAX_FILE_COUNT/2; i++) { + file_name_out = NULL; + snprintf(syslog.file_name_, sizeof(syslog.file_name_), "priority_array_test_%d", 1500 + i); + ASSERT_EQ(OB_SUCCESS, priority_array.top(file_name_out)); + ASSERT_EQ(true, OB_NOT_NULL(file_name_out)); + ASSERT_EQ(0, strncmp(syslog.file_name_, file_name_out->file_name_, strlen(syslog.file_name_))); + ASSERT_EQ(OB_SUCCESS, priority_array.pop()); + } +} + +TEST(ObLogCompressor, base_compressor_test) +{ + int ret = OB_SUCCESS; + int test_count = 1000; + int test_size = test_count * sizeof(int); + ObCompressor *compressor_zstd; + + // get compressor + ASSERT_EQ(OB_SUCCESS, ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, compressor_zstd)); + + // prepare file + const char *file_name = "test_ob_log_compressor_file"; + const char *file_name_zstd = "test_ob_log_compressor_file.zst"; + std::string file_name_str = std::string(file_name); + std::string decompress_file_name_zstd = file_name_str + "_zstd"; + unlink(file_name_str.c_str()); + unlink(file_name_zstd); + unlink(decompress_file_name_zstd.c_str()); + + FILE *input_file = fopen(file_name, "w"); + ASSERT_EQ(true, NULL != input_file); + int data[test_count]; + for (int i = 0; i < test_count; i++) { + data[i] = i; + } + ASSERT_EQ(test_size, fwrite(data, 1, test_size, input_file)); + fclose(input_file); + // sleep 1s + sleep(1); + struct stat st, st_zstd; + ASSERT_EQ(0, stat(file_name, &st)); + + // compress + ASSERT_EQ(OB_SUCCESS, compress_file_by_zstd(compressor_zstd, file_name, file_name_zstd)); + ASSERT_EQ(0, stat(file_name_zstd, &st_zstd)); + ASSERT_EQ(true, st.st_mtime != st_zstd.st_mtime); // not equal + + // modify time + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_last_modify_time_(file_name_zstd, st.st_mtime)); + ASSERT_EQ(0, stat(file_name_zstd, &st_zstd)); + ASSERT_EQ(true, st.st_mtime == st_zstd.st_mtime); // equal + + // decompress, make sure your machine can exec zstd command + std::string decompress_zstd_command = "zstd -d " + std::string(file_name_zstd) + " -o " + decompress_file_name_zstd; + ASSERT_EQ(0, std::system(decompress_zstd_command.c_str())); + + // check + std::string compare_command = "diff -q " + file_name_str + " " + decompress_file_name_zstd + " > /dev/null"; + ASSERT_EQ(0, system(compare_command.c_str())); + + unlink(file_name_str.c_str()); + unlink(file_name_zstd); + unlink(decompress_file_name_zstd.c_str()); +} + +TEST(ObLogCompressor, syslog_compressor_base_test) +{ + int ret = OB_SUCCESS; + + // prepare + int data_count = 1000; + int loop_count = 2000; + int data_size = data_count * sizeof(int); + int data[data_count]; + const char *file_name = "testcompressfile.log.1234"; + const char *file_name_2 = "testcompressfile2.log.1234"; + const char *file_name_zstd = "testcompressfile.log.1234.zst"; + ASSERT_EQ(false, ObLogCompressor::is_compressed_file(file_name)); + ASSERT_EQ(true, ObLogCompressor::is_compressed_file(file_name_zstd)); + std::string file_name_str = std::string(file_name); + std::string file_name_str_2 = std::string(file_name_2); + std::string decompress_file_name_zstd = file_name_str + "_zstd"; + unlink(file_name_str.c_str()); + unlink(file_name_str_2.c_str()); + unlink(file_name_zstd); + unlink(decompress_file_name_zstd.c_str()); + + // function test + ASSERT_EQ(0, system(RM_COMMAND)); + ASSERT_EQ(0, mkdir(TEST_DIR, 0777)); + strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR)); + int64_t free_size = OB_LOG_COMPRESSOR.get_disk_remaining_size_(); + printf("free_size:%ld\n", free_size); + ASSERT_GT(free_size, 0); + ASSERT_EQ(0, system(RM_COMMAND)); + + // prepare file data + FILE *input_file = fopen(file_name, "w"); + ASSERT_EQ(true, NULL != input_file); + for (int i = 0; i < data_count; i++) { + data[i] = i; + } + for (int j = 0; j < loop_count; j++) { + ASSERT_EQ(data_size, fwrite(data, 1, data_size, input_file)); + } + fclose(input_file); + std::string cp_file_command = "cp " + std::string(file_name) + " " + std::string(file_name_2); + ASSERT_EQ(0, std::system(cp_file_command.c_str())); + + // record last modify time + struct stat st, st_zstd; + ASSERT_EQ(0, stat(file_name, &st)); + + // compress + ObCompressor *compressor_zstd; + ASSERT_EQ(OB_SUCCESS, ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, compressor_zstd)); + LOG_INFO("start to init syslog compressor "); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.init()); + strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR)); + OB_LOG_COMPRESSOR.compressor_ = compressor_zstd; + ASSERT_EQ(true, OB_LOG_COMPRESSOR.is_inited_); + ASSERT_EQ(false, OB_LOG_COMPRESSOR.stopped_); + usleep(100000); // wait register pm in farm + + int src_size = OB_SYSLOG_COMPRESS_BLOCK_SIZE; + int dest_size = OB_SYSLOG_COMPRESS_BUFFER_SIZE; + char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_LOG); + ASSERT_EQ(true, OB_NOT_NULL(src_buf)); + char *dest_buf = src_buf + src_size; + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.compress_single_file_(file_name, src_buf, dest_buf)); + ASSERT_EQ(0, stat(file_name_zstd, &st_zstd)); + ASSERT_EQ(true, st.st_mtime == st_zstd.st_mtime); // equal + + // decompress + std::string decompress_zstd_command = "zstd -d " + std::string(file_name_zstd) + " -o " + decompress_file_name_zstd; + ASSERT_EQ(0, std::system(decompress_zstd_command.c_str())); + std::string compare_command = "diff -q " + file_name_str_2 + " " + decompress_file_name_zstd + " > /dev/null"; + ASSERT_EQ(0, std::system(compare_command.c_str())); + + // error test + ASSERT_EQ(OB_INVALID_ARGUMENT, OB_LOG_COMPRESSOR.set_compress_func("zlib_1.0")); + ASSERT_EQ(OB_INVALID_ARGUMENT, OB_LOG_COMPRESSOR.set_compress_func("lz4_1.9.1")); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("none")); + + // clear + LOG_INFO("start to destroy syslog compressor "); + OB_LOG_COMPRESSOR.destroy(); + ob_free(src_buf); + unlink(file_name_str.c_str()); + unlink(file_name_str_2.c_str()); + unlink(file_name_zstd); + unlink(decompress_file_name_zstd.c_str()); +} + +TEST(ObLogCompressor, syslog_compressor_thread_test) +{ + int ret = OB_SUCCESS; + + // init log compressor + ASSERT_EQ(OB_SUCCESS, ObClockGenerator::get_instance().init()); + // loop faster + OB_LOG_COMPRESSOR.loop_interval_ = 100000; + LOG_INFO("start to init syslog compressor "); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.init()); + ASSERT_EQ(true, OB_LOG_COMPRESSOR.is_inited_); + strncpy(OB_LOG_COMPRESSOR.syslog_dir_, TEST_DIR, strlen(TEST_DIR)); + + // prepare syslog file + const int MAX_SYSLOG_COUNT = 8; + const int min_uncompressed_count = 2; + char file_name[OB_MAX_SYSLOG_FILE_NAME_SIZE]; + char * file_name_out; + const char *data = "base_data.base_data.base_data.base_data.base_dat50base_data.base_data.base_data.base_data.base_da100" + "base_data.base_data.base_data.base_data.base_da150base_data.base_data.base_data.base_data.base_da200"; + int data_size = strlen(data); + int file_size = 200; + const char *pattern_uncompressed[OB_SYSLOG_COMPRESS_TYPE_COUNT] = + { + "^observer\\.log\\.[0-9]+$", + "^rootservice\\.log\\.[0-9]+$", + "^election\\.log\\.[0-9]+$", + "^trace\\.log\\.[0-9]+$", + }; + const char *pattern_compressed[OB_SYSLOG_COMPRESS_TYPE_COUNT] = + { + "^observer\\.log\\.[0-9]+\\.[a-z]+$", + "^rootservice\\.log\\.[0-9]+\\.[a-z]+$", + "^election\\.log\\.[0-9]+\\.[a-z]+$", + "^trace\\.log\\.[0-9]+\\.[a-z]+$", + }; + + // get modify time + time_t last_modified_time = time(NULL); + + last_modified_time -= 1000; + ASSERT_EQ(0, system(RM_COMMAND)); + ASSERT_EQ(0, mkdir(TEST_DIR, 0777)); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + snprintf(file_name, sizeof(file_name), "%s/%s", TEST_DIR, OB_SYSLOG_FILE_PREFIX[i%OB_SYSLOG_COMPRESS_TYPE_COUNT]); + FILE *file =fopen(file_name, "w"); + ASSERT_EQ(true, OB_NOT_NULL(file)); + ASSERT_EQ(data_size, fwrite(data, 1, data_size, file)); + fclose(file); + OB_LOG_COMPRESSOR.set_last_modify_time_(file_name, last_modified_time + i*60); + } + last_modified_time -= 50000; + CREATE_FILES(MAX_SYSLOG_COUNT * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311241000L) + + // set parameter + ASSERT_EQ(OB_SUCCESS, OB_LOGGER.set_max_file_index(MAX_SYSLOG_COUNT)); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_min_uncompressed_count(min_uncompressed_count)); + int total_count = get_file_count_by_regex(TEST_DIR, NULL); + int last_total_count = 0; + ASSERT_EQ((MAX_SYSLOG_COUNT + 1) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]); + ASSERT_EQ(MAX_SYSLOG_COUNT, uncompressed_count); + int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]); + ASSERT_EQ(0, compressed_count); + } + + // enable compress + int64_t max_disk_size = OB_SYSLOG_COMPRESS_RESERVE_SIZE + file_size * ((MAX_SYSLOG_COUNT + 1 - 2)* OB_SYSLOG_COMPRESS_TYPE_COUNT); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size)); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("zstd_1.3.8")); + sleep(2); // 2s + total_count = get_file_count_by_regex(TEST_DIR, NULL); + ASSERT_EQ((MAX_SYSLOG_COUNT + 1) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]); + ASSERT_LE(uncompressed_count, MAX_SYSLOG_COUNT - 2); + ASSERT_GE(uncompressed_count, min_uncompressed_count); + int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]); + ASSERT_GE(compressed_count, 2); + } + + // add more syslog file + last_modified_time += 10000; + CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311242000L) + sleep(1); + total_count = get_file_count_by_regex(TEST_DIR, NULL); + ASSERT_EQ((MAX_SYSLOG_COUNT + 3) * OB_SYSLOG_COMPRESS_TYPE_COUNT, total_count); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]); + ASSERT_LE(uncompressed_count, MAX_SYSLOG_COUNT - 2); + ASSERT_GE(uncompressed_count, min_uncompressed_count); + int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]); + ASSERT_GE(compressed_count, 4); + } + + // make syslog_disk_size smaller, begin to delete file + max_disk_size = OB_SYSLOG_DELETE_RESERVE_SIZE + file_size * (4 * OB_SYSLOG_COMPRESS_TYPE_COUNT); + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size)); + sleep(1); + total_count = get_file_count_by_regex(TEST_DIR, NULL); + ASSERT_LT(total_count, (MAX_SYSLOG_COUNT + 3) * OB_SYSLOG_COMPRESS_TYPE_COUNT); + ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]); + ASSERT_GE(uncompressed_count, min_uncompressed_count); + } + last_total_count = total_count; + + // add more syslog file + last_modified_time += 5000; + CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311243000L) + sleep(1); + total_count = get_file_count_by_regex(TEST_DIR, NULL); + ASSERT_LE(total_count, last_total_count); + ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int uncompressed_count = get_file_count_by_regex(TEST_DIR, pattern_uncompressed[i]); + ASSERT_GE(uncompressed_count, min_uncompressed_count); + } + + // disable compress + ASSERT_EQ(OB_SUCCESS, OB_LOG_COMPRESSOR.set_compress_func("none")); + last_modified_time += 5000; + CREATE_FILES(2 * OB_SYSLOG_COMPRESS_TYPE_COUNT, last_modified_time, 202311244000L) + sleep(1); + total_count = get_file_count_by_regex(TEST_DIR, NULL); + ASSERT_LT(total_count, 5 * OB_SYSLOG_COMPRESS_TYPE_COUNT); + ASSERT_GE(total_count, 4 * OB_SYSLOG_COMPRESS_TYPE_COUNT); + for (int i = 0; i < OB_SYSLOG_COMPRESS_TYPE_COUNT; i++) { + int compressed_count = get_file_count_by_regex(TEST_DIR, pattern_compressed[i]); + ASSERT_EQ(compressed_count, 0); + } + + // clear + ASSERT_EQ(0, system(RM_COMMAND)); + LOG_INFO("start to destroy syslog compressor "); + OB_LOG_COMPRESSOR.destroy(); +} + +} // namespace common +} // namespace oceanbase + +int main(int argc, char **argv) +{ + oceanbase::ObLogger &logger = oceanbase::ObLogger::get_logger(); + logger.set_file_name("test_ob_log_compressor.log", true); + logger.set_log_level(OB_LOG_LEVEL_DEBUG); + testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + OB_LOG_COMPRESSOR.destroy(); + return ret; +} diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 98b0f9c42c..578f33eb91 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -23,6 +23,7 @@ #include "lib/lock/ob_latch.h" #include "lib/net/ob_net_util.h" #include "lib/oblog/ob_base_log_buffer.h" +#include "lib/oblog/ob_log_compressor.h" #include "lib/ob_running_mode.h" #include "lib/profile/ob_active_resource_list.h" #include "lib/profile/ob_profile_log.h" @@ -278,6 +279,10 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) if (FAILEDx(OB_LOGGER.init(log_cfg, true))) { LOG_ERROR("async log init error.", KR(ret)); ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; + } else if (OB_FAIL(OB_LOG_COMPRESSOR.init())) { + LOG_ERROR("log compressor init error.", KR(ret)); + } else if (OB_FAIL(OB_LOGGER.set_log_compressor(&OB_LOG_COMPRESSOR))) { + LOG_ERROR("set log compressor error.", KR(ret)); } if (OB_FAIL(ret)) { } else if (OB_FAIL(init_pre_setting())) { @@ -296,6 +301,10 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) if (FAILEDx(OB_LOGGER.init(log_cfg, false))) { LOG_ERROR("async log init error.", KR(ret)); ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; + } else if (OB_FAIL(OB_LOG_COMPRESSOR.init())) { + LOG_ERROR("log compressor init error.", KR(ret)); + } else if (OB_FAIL(OB_LOGGER.set_log_compressor(&OB_LOG_COMPRESSOR))) { + LOG_ERROR("set log compressor error.", KR(ret)); } else if (OB_FAIL(init_tz_info_mgr())) { LOG_ERROR("init tz_info_mgr failed", KR(ret)); } else if (OB_FAIL(ObSqlTaskFactory::get_instance().init())) { @@ -552,6 +561,10 @@ void ObServer::destroy() OB_LOGGER.destroy(); FLOG_INFO("OB_LOGGER destroyed"); + FLOG_INFO("begin to destroy OB_LOG_COMPRESSOR"); + OB_LOG_COMPRESSOR.destroy(); + FLOG_INFO("OB_LOG_COMPRESSOR destroyed"); + FLOG_INFO("begin to destroy task controller"); ObTaskController::get().destroy(); FLOG_INFO("task controller destroyed"); @@ -1201,6 +1214,10 @@ int ObServer::stop() OB_LOGGER.stop(); FLOG_INFO("stop OB_LOGGER success"); + FLOG_INFO("begin to stop OB_LOG_COMPRESSOR"); + OB_LOG_COMPRESSOR.stop(); + FLOG_INFO("stop OB_LOG_COMPRESSOR success"); + FLOG_INFO("begin to stop task controller"); ObTaskController::get().stop(); FLOG_INFO("stop task controller success"); @@ -1547,6 +1564,10 @@ int ObServer::wait() OB_LOGGER.wait(); FLOG_INFO("wait OB_LOGGER success"); + FLOG_INFO("begin to wait OB_LOG_COMPRESSOR"); + OB_LOG_COMPRESSOR.wait(); + FLOG_INFO("wait OB_LOG_COMPRESSOR success"); + FLOG_INFO("begin to wait task controller"); ObTaskController::get().wait(); FLOG_INFO("wait task controller success"); @@ -2093,13 +2114,21 @@ int ObServer::init_pre_setting() const bool record_old_log_file = config_.enable_syslog_recycle; const bool log_warn = config_.enable_syslog_wf; const bool enable_async_syslog = config_.enable_async_syslog; + const int64_t max_disk_size = config_.syslog_disk_size; + const int64_t min_uncompressed_count = config_.syslog_file_uncompressed_count; + const char *compress_func_ptr = config_.syslog_compress_func.str(); OB_LOGGER.set_max_file_index(max_log_cnt); OB_LOGGER.set_record_old_log_file(record_old_log_file); LOG_INFO("Whether record old log file", K(record_old_log_file)); OB_LOGGER.set_log_warn(log_warn); LOG_INFO("Whether log warn", K(log_warn)); OB_LOGGER.set_enable_async_log(enable_async_syslog); - LOG_INFO("init log config", K(record_old_log_file), K(log_warn), K(enable_async_syslog)); + OB_LOG_COMPRESSOR.set_max_disk_size(max_disk_size); + LOG_INFO("Whether compress syslog file", K(compress_func_ptr)); + OB_LOG_COMPRESSOR.set_compress_func(compress_func_ptr); + OB_LOG_COMPRESSOR.set_min_uncompressed_count(min_uncompressed_count); + LOG_INFO("init log config", K(record_old_log_file), K(log_warn), K(enable_async_syslog), + K(max_disk_size), K(compress_func_ptr), K(min_uncompressed_count)); if (0 == max_log_cnt) { LOG_INFO("won't recycle log file"); } else { @@ -3794,6 +3823,10 @@ int ObServer::stop_server_in_arb_mode() OB_LOGGER.stop(); FLOG_INFO("stop OB_LOGGER success"); + FLOG_INFO("begin to stop OB_LOG_COMPRESSOR"); + OB_LOG_COMPRESSOR.stop(); + FLOG_INFO("stop OB_LOG_COMPRESSOR success"); + FLOG_INFO("begin to stop task controller"); ObTaskController::get().stop(); FLOG_INFO("stop task controller success"); @@ -3830,6 +3863,10 @@ int ObServer::wait_server_in_arb_mode() OB_LOGGER.wait(); FLOG_INFO("wait OB_LOGGER success"); + FLOG_INFO("begin to wait OB_LOG_COMPRESSOR"); + OB_LOG_COMPRESSOR.wait(); + FLOG_INFO("wait OB_LOG_COMPRESSOR success"); + FLOG_INFO("begin to wait task controller"); ObTaskController::get().wait(); FLOG_INFO("wait task controller success"); @@ -3867,6 +3904,7 @@ int ObServer::destroy_server_in_arb_mode() { int ret = OB_SUCCESS; OB_LOGGER.destroy(); + OB_LOG_COMPRESSOR.destroy(); ObTaskController::get().destroy(); sig_worker_->destroy(); signal_handle_->destroy(); diff --git a/src/share/config/ob_config_helper.cpp b/src/share/config/ob_config_helper.cpp index 4f2a9bb2fc..4f33032356 100644 --- a/src/share/config/ob_config_helper.cpp +++ b/src/share/config/ob_config_helper.cpp @@ -405,6 +405,47 @@ bool ObConfigCompressOptionChecker::check(const ObConfigItem &t) const return is_valid; } +bool ObConfigMaxSyslogFileCountChecker::check(const ObConfigItem &t) const +{ + bool is_valid = false; + int64_t max_count = ObConfigIntParser::get(t.str(), is_valid); + if (is_valid) { + int64_t uncompressed_count = GCONF.syslog_file_uncompressed_count; + if (max_count == 0 || max_count >= uncompressed_count) { + is_valid = true; + } else { + is_valid = false; + } + } + return is_valid; +} + +bool ObConfigSyslogCompressFuncChecker::check(const ObConfigItem &t) const +{ + bool is_valid = false; + for (int i = 0; i < ARRAYSIZEOF(common::syslog_compress_funcs) && !is_valid; ++i) { + if (0 == ObString::make_string(syslog_compress_funcs[i]).case_compare(t.str())) { + is_valid = true; + } + } + return is_valid; +} + +bool ObConfigSyslogFileUncompressedCountChecker::check(const ObConfigItem &t) const +{ + bool is_valid = false; + int64_t uncompressed_count = ObConfigIntParser::get(t.str(), is_valid); + if (is_valid) { + int64_t max_count = GCONF.max_syslog_file_count; + if (uncompressed_count >= 0 && (max_count == 0 || uncompressed_count <= max_count)) { + is_valid = true; + } else { + is_valid = false; + } + } + return is_valid; +} + bool ObConfigLogLevelChecker::check(const ObConfigItem &t) const { const ObString tmp_str(t.str()); diff --git a/src/share/config/ob_config_helper.h b/src/share/config/ob_config_helper.h index 46b8845c82..63e283c52e 100644 --- a/src/share/config/ob_config_helper.h +++ b/src/share/config/ob_config_helper.h @@ -268,6 +268,39 @@ private: DISALLOW_COPY_AND_ASSIGN(ObConfigCompressOptionChecker); }; +class ObConfigMaxSyslogFileCountChecker + : public ObConfigChecker +{ +public: + ObConfigMaxSyslogFileCountChecker() {} + virtual ~ObConfigMaxSyslogFileCountChecker() {} + bool check(const ObConfigItem &t) const; +private: + DISALLOW_COPY_AND_ASSIGN(ObConfigMaxSyslogFileCountChecker); +}; + +class ObConfigSyslogCompressFuncChecker + : public ObConfigChecker +{ +public: + ObConfigSyslogCompressFuncChecker() {} + virtual ~ObConfigSyslogCompressFuncChecker() {} + bool check(const ObConfigItem &t) const; +private: + DISALLOW_COPY_AND_ASSIGN(ObConfigSyslogCompressFuncChecker); +}; + +class ObConfigSyslogFileUncompressedCountChecker + : public ObConfigChecker +{ +public: + ObConfigSyslogFileUncompressedCountChecker() {} + virtual ~ObConfigSyslogFileUncompressedCountChecker() {} + bool check(const ObConfigItem &t) const; +private: + DISALLOW_COPY_AND_ASSIGN(ObConfigSyslogFileUncompressedCountChecker); +}; + class ObConfigUseLargePagesChecker : public ObConfigChecker { diff --git a/src/share/config/ob_reload_config.cpp b/src/share/config/ob_reload_config.cpp index 0de4d7e006..f72d8794d5 100644 --- a/src/share/config/ob_reload_config.cpp +++ b/src/share/config/ob_reload_config.cpp @@ -11,7 +11,7 @@ */ #include "share/config/ob_reload_config.h" - +#include "lib/oblog/ob_log_compressor.h" namespace oceanbase { @@ -35,6 +35,15 @@ int ObReloadConfig::reload_ob_logger_set() } else if (OB_FAIL(OB_LOGGER.set_record_old_log_file(conf_->enable_syslog_recycle))) { OB_LOG(ERROR, "fail to set_record_old_log_file", K(conf_->enable_syslog_recycle.str()), K(ret)); + } else if (OB_FAIL(OB_LOG_COMPRESSOR.set_max_disk_size(conf_->syslog_disk_size))) { + OB_LOG(ERROR, "fail to set_max_disk_size", + K(conf_->syslog_disk_size.str()), KR(ret)); + } else if (OB_FAIL(OB_LOG_COMPRESSOR.set_compress_func(conf_->syslog_compress_func.str()))) { + OB_LOG(ERROR, "fail to set_compress_func", + K(conf_->syslog_compress_func.str()), KR(ret)); + } else if (OB_FAIL(OB_LOG_COMPRESSOR.set_min_uncompressed_count(conf_->syslog_file_uncompressed_count))) { + OB_LOG(ERROR, "fail to set_min_uncompressed_count", + K(conf_->syslog_file_uncompressed_count.str()), KR(ret)); } else { OB_LOGGER.set_log_warn(conf_->enable_syslog_wf); OB_LOGGER.set_enable_async_log(conf_->enable_async_syslog); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index ab20446503..bfa3e19fd1 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -195,12 +195,13 @@ DEF_CAP(syslog_io_bandwidth_limit, OB_CLUSTER_PARAMETER, "30MB", DEF_INT(diag_syslog_per_error_limit, OB_CLUSTER_PARAMETER, "200", "[0,]", "DIAG syslog limitation for each error per second, exceeding syslog would be truncated", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_INT(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0", "[0,]", - "specifies the maximum number of the log files " - "that can co-exist before the log file recycling kicks in. " - "Each log file can occupy at most 256MB disk space. " - "When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer", - ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT_WITH_CHECKER(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0", + common::ObConfigMaxSyslogFileCountChecker, + "specifies the maximum number of the log files " + "that can co-exist before the log file recycling kicks in. " + "Each log file can occupy at most 256MB disk space. " + "When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_BOOL(enable_async_syslog, OB_CLUSTER_PARAMETER, "True", "specifies whether use async log for observer.log, elec.log and rs.log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); @@ -212,6 +213,20 @@ DEF_BOOL(enable_syslog_recycle, OB_CLUSTER_PARAMETER, "False", "specifies whether log file recycling is turned on. " "Value: True:turned on; False: turned off", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_CAP(syslog_disk_size, OB_CLUSTER_PARAMETER, "0M", "[0M,)", + "the size of disk space used by the syslog files. Range: [0, +∞)", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_STR_WITH_CHECKER(syslog_compress_func, OB_CLUSTER_PARAMETER, "none", + common::ObConfigSyslogCompressFuncChecker, + "compress function name for syslog files, " + "values: none, zstd_1.0, zstd_1.3.8", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT_WITH_CHECKER(syslog_file_uncompressed_count, OB_CLUSTER_PARAMETER, "0", + common::ObConfigSyslogFileUncompressedCountChecker, + "specifies the minimum number of the syslog files that will not be compressed. " + "Each syslog file can occupy at most 256MB disk space. " + "When this value is set to 0, all syslog file may be compressed. Range: [0, +∞) in integer", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_INT(memory_limit_percentage, OB_CLUSTER_PARAMETER, "80", "[10, 95]", "the size of the memory reserved for internal use(for testing purpose). Range: [10, 95]", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index ff6a06ddf4..53b17a2ce6 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -222,6 +222,9 @@ standby_db_preferred_upstream_log_region standby_fetch_log_bandwidth_limit storage_meta_cache_priority strict_check_os_params +syslog_compress_func +syslog_disk_size +syslog_file_uncompressed_count syslog_io_bandwidth_limit syslog_level system_memory