diff --git a/deps/oblib/src/lib/CMakeLists.txt b/deps/oblib/src/lib/CMakeLists.txt index dc567bb94..60d0277d8 100644 --- a/deps/oblib/src/lib/CMakeLists.txt +++ b/deps/oblib/src/lib/CMakeLists.txt @@ -136,6 +136,7 @@ ob_set_subtarget(oblib_lib common oblog/ob_log_module.ipp oblog/ob_trace_log.cpp oblog/ob_warning_buffer.cpp + oblog/ob_log_compressor.cpp ob_name_id_def.cpp ob_replica_define.cpp profile/ob_atomic_event.cpp @@ -234,6 +235,7 @@ ob_lib_add_pchs(lib oblog/ob_log.h oblog/ob_base_log_writer.h oblog/ob_async_log_struct.h + oblog/ob_log_compressor.h hash/fnv_hash.h coro/co_var.h time/Time.h diff --git a/deps/oblib/src/lib/allocator/ob_mod_define.h b/deps/oblib/src/lib/allocator/ob_mod_define.h index 50818709f..a31918e65 100644 --- a/deps/oblib/src/lib/allocator/ob_mod_define.h +++ b/deps/oblib/src/lib/allocator/ob_mod_define.h @@ -144,6 +144,7 @@ LABEL_ITEM_DEF(OB_BUFFER, Buffer) LABEL_ITEM_DEF(OB_THREAD_STORE, ThreadStore) LABEL_ITEM_DEF(OB_LOG_WRITER, LogWriter) LABEL_ITEM_DEF(OB_LOG_READER, LogReader) +LABEL_ITEM_DEF(OB_LOG_COMPRESSOR, LogCompressor) LABEL_ITEM_DEF(OB_REGEX, Regex) LABEL_ITEM_DEF(OB_SLAB, Slab) LABEL_ITEM_DEF(OB_SLAVE_MGR, SlaveMgr) diff --git a/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp b/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp index 8351a7050..b3d06f2e9 100644 --- a/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp +++ b/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp @@ -63,10 +63,13 @@ int ObBaseLogWriter::init(const ObBaseLogWriterCfg& log_cfg) LOG_STDERR("Fail to allocate memory, max_buffer_item_cnt=%lu.\n", log_cfg.max_buffer_item_cnt_); } else if (0 != pthread_mutex_init(&log_mutex_, NULL)) { ret = OB_ERR_SYS; + LOG_STDERR("Failed to pthread_mutex_init.\n"); } else if (0 != pthread_cond_init(&log_write_cond_, NULL)) { ret = OB_ERR_SYS; + LOG_STDERR("Failed to pthread_cond_init.\n"); } else if (0 != pthread_cond_init(&log_flush_cond_, NULL)) { ret = OB_ERR_SYS; + LOG_STDERR("Failed to pthread_cond_init.\n"); } else { flush_tid_ = 0; log_item_push_idx_ = 0; diff --git a/deps/oblib/src/lib/oblog/ob_log.cpp b/deps/oblib/src/lib/oblog/ob_log.cpp index 62d46c9c8..c99ee1dfc 100644 --- a/deps/oblib/src/lib/oblog/ob_log.cpp +++ b/deps/oblib/src/lib/oblog/ob_log.cpp @@ -31,6 +31,8 @@ #include "lib/container/ob_vector.h" #include "lib/coro/co.h" #include "lib/allocator/ob_fifo_allocator.h" +#include "lib/oblog/ob_log_compressor.h" +#include "lib/string/ob_string.h" using namespace oceanbase::lib; @@ -62,6 +64,7 @@ static const int64_t POP_COMPENSATED_TIME[5] = {0, 1, 2, 3, 4}; // for pop time static int64_t last_check_file_ts = 0; // last file sample timestamps static int64_t last_check_disk_ts = 0; // last disk sample timestamps static const int64_t NORMAL_LOG_SIZE = 1 << 10; +static const int64_t FILE_TIME_STR_LEN = 14; // xxxx-xx-xx xx:xx:xx #if defined TC_REACH_TIME_INTERVAL #undef TC_REACH_TIME_INTERVAL @@ -127,6 +130,41 @@ int process_thread_log_id_level_map(const char* str, const int32_t str_length) return ret; } +inline void ob_log_unlink(const char *file_cstr) +{ + unlink(file_cstr); + ObString file_name(file_cstr); + unlink(ObLogCompressor::get_compression_file_name(file_name).ptr()); +} + +time_t get_file_time(const ObString &file_name) +{ + time_t ret_time = 0; + ObString time_str; + ObString remaining_string = file_name; + // Walk through the string from back to front until the timestamp found. + while (time_str.empty() && !remaining_string.empty()) { + const char *idx = remaining_string.reverse_find('.'); + if (NULL != idx) { + ObString suffix_string = remaining_string.after(idx); + remaining_string = remaining_string.split_on(idx); + if (isdigit(suffix_string[0]) && suffix_string.length() == FILE_TIME_STR_LEN) { + time_str = suffix_string; + } + } else { + remaining_string.reset(); + } + } + if (!time_str.empty()) { + struct tm tm; + strptime(time_str.ptr(), "%Y%m%d%H%M%S", &tm); + if (0 > (ret_time = mktime(&tm))) { + ret_time = 0; + } + } + return ret_time; +} + void ObLogIdLevelMap::set_level(const int8_t level) { non_mod_level_ = level; @@ -410,8 +448,11 @@ const char* const ObLogger::errstr_[] = {"ERROR", "USER_ERR", "WARN", "INFO", "T ObLogger::ObLogger() : ObBaseLogWriter(), log_file_(), + log_compressor_(nullptr), max_file_size_(DEFAULT_MAX_FILE_SIZE), max_file_index_(0), + max_file_time_(0), + enable_file_compress_(false), name_id_map_(), id_level_map_(), wf_level_(OB_LOG_LEVEL_WARN), @@ -866,6 +907,48 @@ void ObLogger::log_data(const char* mod_name, int32_t level, LogLocation locatio } } +void ObLogger::remove_outdated_file(std::deque &file_list) +{ + if (file_list.size() > 0 && max_file_time_ > 0) { + time_t min_time = time(NULL) - max_file_time_; + + if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) { + for (int i = 0; i < file_list.size(); i++) { + ObString file_name(file_list.front().c_str()); + time_t file_time = get_file_time(file_name); + if (file_time < min_time) { + file_list.pop_front(); + ob_log_unlink(file_name.ptr()); + } else { + i = file_list.size(); + } + } + (void)pthread_mutex_unlock(&file_index_mutex_); + } + } +} + +void ObLogger::update_compression_file(std::deque &file_list) +{ + if (enable_file_compress_ && NULL != log_compressor_) { + if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) { + for (auto iter = file_list.begin(); iter != file_list.end(); iter++) { + ObString file_name(iter->c_str()); + if (isdigit(file_name[file_name.length() - 1])) { + ObString compression_file_name = ObLogCompressor::get_compression_file_name(file_name).ptr(); + if (0 != access(file_name.ptr(), F_OK) && 0 == access(compression_file_name.ptr(), F_OK)) { + iter->clear(); + iter->assign(compression_file_name.ptr()); + } else { + log_compressor_->append_log(file_name); + } + } + } + (void)pthread_mutex_unlock(&file_index_mutex_); + } + } +} + void ObLogger::rotate_log( const int64_t size, const bool redirect_flag, ObPLogFileStruct& log_struct, const ObPLogFDType fd_type) { @@ -923,12 +1006,12 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons tm.tm_min, tm.tm_sec); - if (max_file_index_ > 0) { + if (max_file_index_ > 0 || max_file_time_ > 0) { if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) { - if (file_list.size() >= max_file_index_) { + if (max_file_index_ > 0 && file_list.size() >= max_file_index_) { std::string oldFile = file_list.front(); file_list.pop_front(); - unlink(oldFile.c_str()); + ob_log_unlink(oldFile.c_str()); } file_list.push_back(old_log_file); (void)pthread_mutex_unlock(&file_index_mutex_); @@ -958,12 +1041,12 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons } if (open_wf_flag_ && enable_wf_flag_) { - if (max_file_index_ > 0) { + if (max_file_index_ > 0 || max_file_time_ > 0) { if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) { - if (wf_file_list.size() >= max_file_index_) { + if (max_file_index_ > 0 && wf_file_list.size() >= max_file_index_) { std::string old_wf_file = wf_file_list.front(); wf_file_list.pop_front(); - unlink(old_wf_file.c_str()); + ob_log_unlink(old_wf_file.c_str()); } wf_file_list.push_back(old_wf_log_file); (void)pthread_mutex_unlock(&file_index_mutex_); @@ -980,6 +1063,20 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons } } } + + if (max_file_time_ > 0) { + remove_outdated_file(file_list); + if (open_wf_flag_ && enable_wf_flag_) { + remove_outdated_file(wf_file_list); + } + } + + if (enable_file_compress_ && NULL != log_compressor_) { + log_compressor_->append_log(ObString::make_string(old_log_file)); + if (open_wf_flag_ && enable_wf_flag_) { + log_compressor_->append_log(ObString::make_string(old_wf_log_file)); + } + } } } UNUSED(fd_type); @@ -1053,11 +1150,38 @@ int ObLogger::set_max_file_index(int64_t max_file_index) return ret; } +int ObLogger::set_max_file_time(int64_t max_file_time) +{ + int ret = OB_SUCCESS; + if (max_file_time < 0) { + max_file_time = 0; + } + max_file_time_ = max_file_time / 1000000L; // usecond to second + if (max_file_time_ > 0 && rec_old_file_flag_) { + if (OB_FAIL(record_old_log_file())) { + LOG_WARN("Record old log file error", K(ret)); + } + } + return ret; +} + +int ObLogger::set_enable_file_compress(bool enable_file_compress) +{ + int ret = OB_SUCCESS; + enable_file_compress_ = enable_file_compress; + if (rec_old_file_flag_ && enable_file_compress_) { + if (OB_FAIL(record_old_log_file())) { + LOG_WARN("Record old log file error", K(ret)); + } + } + return ret; +} + int ObLogger::set_record_old_log_file(bool rec_old_file_flag) { int ret = OB_SUCCESS; rec_old_file_flag_ = rec_old_file_flag; - if (rec_old_file_flag_ && max_file_index_ > 0) { + if (rec_old_file_flag_ && (max_file_index_ > 0 || max_file_time_ > 0)) { if (OB_FAIL(record_old_log_file())) { LOG_WARN("Record old log file error", K(ret)); } @@ -1065,6 +1189,17 @@ int ObLogger::set_record_old_log_file(bool rec_old_file_flag) return ret; } +int ObLogger::set_log_compressor(ObLogCompressor *log_compressor) +{ + int ret = OB_SUCCESS; + if (NULL == log_compressor) { + ret = OB_INVALID_ARGUMENT; + } else { + log_compressor_ = log_compressor; + } + return ret; +} + //@brief string copy with dst's length and src's length checking and src trim. int64_t str_copy_trim(char* dst, const int64_t dst_length, const char* src, const int64_t src_length) { @@ -1371,7 +1506,7 @@ void ObLogger::log_user_error_line_column(const UserMsgLevel user_msg_level, con int ObLogger::record_old_log_file() { int ret = OB_SUCCESS; - if (max_file_index_ <= 0 || !rec_old_file_flag_) { + if ((max_file_index_ <= 0 && max_file_time_ <= 0) || !rec_old_file_flag_) { } else { ObSEArray files; ObSEArray wf_files; @@ -1509,25 +1644,38 @@ int ObLogger::add_files_to_list( file_list.clear(); std::string oldFile; for (int64_t i = 0; OB_SUCC(ret) && i < files_arr->count(); ++i) { - if (file_list.size() >= max_file_index_) { + if (max_file_index_ > 0 && file_list.size() >= max_file_index_) { oldFile = file_list.front(); file_list.pop_front(); - unlink(oldFile.c_str()); + ob_log_unlink(oldFile.c_str()); } file_list.push_back(files_arr->at(i).file_name_); } wf_file_list.clear(); std::string old_wf_file; for (int64_t i = 0; OB_SUCC(ret) && i < wf_files_arr->count(); ++i) { - if (wf_file_list.size() >= max_file_index_) { + if (max_file_index_ > 0 && wf_file_list.size() >= max_file_index_) { old_wf_file = wf_file_list.front(); wf_file_list.pop_front(); - unlink(old_wf_file.c_str()); + ob_log_unlink(old_wf_file.c_str()); } wf_file_list.push_back(wf_files_arr->at(i).file_name_); } (void)pthread_mutex_unlock(&file_index_mutex_); } + + if (max_file_time_ > 0) { + remove_outdated_file(file_list); + if (open_wf_flag_ && enable_wf_flag_) { + remove_outdated_file(wf_file_list); + } + } + if (enable_file_compress_) { + update_compression_file(file_list); + if (open_wf_flag_ && enable_wf_flag_) { + update_compression_file(wf_file_list); + } + } } return ret; } diff --git a/deps/oblib/src/lib/oblog/ob_log.h b/deps/oblib/src/lib/oblog/ob_log.h index 91a912411..ab8a731c0 100644 --- a/deps/oblib/src/lib/oblog/ob_log.h +++ b/deps/oblib/src/lib/oblog/ob_log.h @@ -54,6 +54,8 @@ namespace oceanbase { namespace common { class ObFIFOAllocator; class ObPLogItem; +class ObString; +class ObLogCompressor; #define OB_LOGGER ::oceanbase::common::ObLogger::get_logger() #define OB_LOG_NEED_TO_PRINT(level) (OB_UNLIKELY(OB_LOGGER.need_to_print(OB_LOG_LEVEL_##level))) @@ -866,10 +868,16 @@ public: void set_max_file_size(int64_t max_file_size); //@brief Set the max number of log-files. If max_file_index = 0, no limit. int set_max_file_index(int64_t max_file_index = 0x0F); + //@brief Set the max retention time of log-files. If max_file_time = 0, no limit. + int set_max_file_time(int64_t max_file_time); + //@brief Set whether compress log-files. If this flag set, will compress all log files. + int set_enable_file_compress(bool enable_file_compress); //@brief Set whether record old log file. If this flag and max_file_index set, // will record log files in the directory for log file int set_record_old_log_file(bool rec_old_file_flag = false); + int set_log_compressor(ObLogCompressor *log_compressor); + //@brief Get current time. static struct timeval get_cur_tv(); @@ -1012,6 +1020,9 @@ private: int add_files_to_list(void* files /*ObIArray * */, void* wf_files /*ObIArray * */, std::deque& file_list, std::deque& wf_file_list); + void remove_outdated_file(std::deque &file_list); + void update_compression_file(std::deque &file_list); + void rotate_log( const int64_t size, const bool redirect_flag, ObPLogFileStruct& log_struct, const ObPLogFDType fd_type); //@brief Rename the log to a filename with fmt. And open a new file with the old, then add old file to file_list. @@ -1061,9 +1072,12 @@ private: static RLOCAL(bool, disable_logging_); ObPLogFileStruct log_file_[MAX_FD_FILE]; + ObLogCompressor *log_compressor_; int64_t max_file_size_; int64_t max_file_index_; + int64_t max_file_time_; // max retention time(second) of log-file + int32_t enable_file_compress_; // percentage of log-file to compress pthread_mutex_t file_size_mutex_; pthread_mutex_t file_index_mutex_; diff --git a/deps/oblib/src/lib/oblog/ob_log_compressor.cpp b/deps/oblib/src/lib/oblog/ob_log_compressor.cpp new file mode 100644 index 000000000..08e89016b --- /dev/null +++ b/deps/oblib/src/lib/oblog/ob_log_compressor.cpp @@ -0,0 +1,265 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#include +#include +#include +#include + +#include "lib/oblog/ob_log_compressor.h" +#include "lib/ob_errno.h" +#include "lib/utility/ob_macro_utils.h" +#include "lib/oblog/ob_log_module.h" +#include "lib/compress/ob_compressor_pool.h" +#include "lib/string/ob_string.h" +#include "lib/ob_define.h" +#include "lib/thread/thread_pool.h" +#include "lib/lock/ob_thread_cond.h" +#include "lib/list/ob_list.h" +#include "lib/lock/ob_mutex.h" +#include "lib/allocator/ob_malloc.h" +#include "lib/oblog/ob_async_log_struct.h" + +using namespace oceanbase::lib; + +namespace oceanbase { +namespace common { +/* Log files are divided into blocks and then compressed. The default block size is (2M - 1K).*/ +static const int32_t DEFAULT_COMPRESSION_BLOCK_SIZE = OB_MALLOC_BIG_BLOCK_SIZE; +/* To prevent extreme cases where the files become larger after compression, + * the size of the decompression buffer needs to be larger than the original data. + * Specific size can refer to the ZSTD code implementation. */ +static const int32_t DEFAULT_COMPRESSION_BUFFER_SIZE = + DEFAULT_COMPRESSION_BLOCK_SIZE + DEFAULT_COMPRESSION_BLOCK_SIZE / 128 + 512 + 19; +static const int32_t DEFAULT_FILE_NAME_SIZE = ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE; +static const int32_t DEFAULT_LOG_QUEUE_DEPTH = 100000; + +ObLogCompressor::ObLogCompressor() : is_inited_(false), has_stoped_(true), compressor_(NULL) +{} + +ObLogCompressor::~ObLogCompressor() +{} + +int ObLogCompressor::init() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_STDERR("The ObLogCompressor has been inited.\n"); + } else if (OB_FAIL(file_list_.init(DEFAULT_LOG_QUEUE_DEPTH))) { + ret = OB_ERR_SYS; + LOG_STDERR("Failed to init file_list_.\n"); + } else if (OB_FAIL(log_compress_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) { + ret = OB_ERR_SYS; + LOG_STDERR("Failed to init ObThreadCond.\n"); + } else { + ObCompressor *ptr = NULL; + if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, ptr))) { + LOG_STDERR("Fail to get_compressor, err_code=%d.\n", ret); + } else { + compressor_ = ptr; + has_stoped_ = false; + if (OB_FAIL(start())) { + ret = OB_ERR_SYS; + LOG_STDERR("Fail to create log compression thread.\n"); + } else { + is_inited_ = true; + LOG_STDOUT("Success to create thread.\n"); + } + } + if (ret) { + log_compress_cond_.destroy(); + } + } + if (!is_inited_) { + destroy(); + } + + return ret; +} + +void ObLogCompressor::destroy() +{ + if (is_inited_) { + is_inited_ = false; + log_compress_cond_.lock(); + file_list_.destroy(); + has_stoped_ = true; + log_compress_cond_.signal(); + log_compress_cond_.unlock(); + wait(); + log_compress_cond_.destroy(); + } +} + +ObString ObLogCompressor::get_compression_file_name(const ObString &file_name) +{ + ObString compression_file_name; + ObString suffix_str = ".zst"; + int size = file_name.length(); + if (size && 0 == file_name[size - 1]) { + size -= 1; + } + if (size > 0 && size + 1 + suffix_str.length() <= DEFAULT_FILE_NAME_SIZE) { + const char *idx = NULL; + if (size > 4 && NULL != (idx = file_name.reverse_find('.')) && idx != file_name.ptr() && + 0 == file_name.after(--idx).compare(suffix_str)) { + } else { + char *buf = (char *)ob_malloc(size + 1 + suffix_str.length(), ObModIds::OB_LOG_COMPRESSOR); + if (buf) { + compression_file_name.assign_buffer(buf, DEFAULT_FILE_NAME_SIZE); + if (size != compression_file_name.write(file_name.ptr(), size)) { + ob_free(buf); + } else { + compression_file_name.write(".zst\0", 5); + } + } + } + } + return compression_file_name; +} + +ObCompressor *ObLogCompressor::get_compressor() +{ + return compressor_; +} + +int ObLogCompressor::append_log(const ObString &file_name) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_STDERR("The ObLogCompressor has not been inited.\n"); + } else if (file_name.empty()) { + ret = OB_INVALID_ARGUMENT; + } else { + char *buf = (char *)ob_malloc(file_name.length() + 1 + sizeof(ObString), ObModIds::OB_LOG_COMPRESSOR); + if (buf) { + ObString *file_name_ptr = (ObString *)buf; + file_name_ptr->assign_buffer(buf + sizeof(ObString), file_name.length() + 1); + if (file_name.length() != file_name_ptr->write(file_name.ptr(), file_name.length()) || + (0 != file_name[file_name.length() - 1] && 1 != file_name_ptr->write("\0", 1))) { + ob_free(buf); + } else { + log_compress_cond_.lock(); + file_list_.push(file_name_ptr); + log_compress_cond_.signal(); + log_compress_cond_.unlock(); + } + } + } + return ret; +} + +int ObLogCompressor::log_compress_block( + char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size) +{ + int ret = OB_SUCCESS; + int64_t size = -1; + if (OB_FAIL(((ObCompressor *)compressor_)->compress(src, src_size, dest, dest_size, size))) { + LOG_STDERR("Failed to compress, err_code=%d.\n", ret); + } else { + return_size = size; + } + return ret; +} + +void ObLogCompressor::log_compress() +{ + int ret = OB_SUCCESS; + static int sleep_us = 100 * 1000; // 100ms + int src_size = DEFAULT_COMPRESSION_BLOCK_SIZE; + int dest_size = DEFAULT_COMPRESSION_BUFFER_SIZE; + char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_LOG_COMPRESSOR); + char *dest_buf = src_buf + src_size; + if (!src_buf) { + LOG_STDERR("Failed to ob_malloc.\n"); + } else { + while (!has_stoped_) { + ObString *file_name = NULL; + log_compress_cond_.lock(); + while (0 >= file_list_.get_total() && !has_stoped_) { + log_compress_cond_.wait(0); + } + if (!has_stoped_) { + ret = file_list_.pop(file_name); + } + log_compress_cond_.unlock(); + + if (has_stoped_ || NULL == file_name || file_name->empty() || 0 != access(file_name->ptr(), F_OK)) { + } else { + ObString compression_file_name = get_compression_file_name(*file_name); + FILE *input_file = NULL; + FILE *output_file = NULL; + if (compression_file_name.empty()) { + LOG_STDERR("Failed to get_compression_file_name.\n"); + } else if (NULL == (input_file = fopen(file_name->ptr(), "r"))) { + LOG_STDERR("Failed to fopen, err_code=%d.\n", errno); + } else if (NULL == (output_file = fopen(compression_file_name.ptr(), "w"))) { + fclose(input_file); + LOG_STDERR("Failed to fopen, err_code=%d.\n", errno); + } else { + size_t read_size = 0; + size_t write_size = 0; + while (OB_SUCC(ret) && !feof(input_file)) { + if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) { + if (OB_FAIL(log_compress_block(dest_buf, dest_size, src_buf, read_size, write_size))) { + LOG_STDERR("Failed to log_compress_block, err_code=%d.\n", ret); + } else if (write_size != fwrite(dest_buf, 1, write_size, output_file)) { + ret = OB_ERR_SYS; + LOG_STDERR("Failed to fwrite, err_code=%d.\n", errno); + } + } + usleep(sleep_us); + } + fclose(input_file); + fclose(output_file); + if (0 != access(file_name->ptr(), F_OK) || OB_SUCCESS != ret) { + unlink(compression_file_name.ptr()); + } else { + unlink(file_name->ptr()); + } + } + } + } + } + if (src_buf) { + ob_free(src_buf); + } +} + +void ObLogCompressor::run1() +{ + lib::set_thread_name("syslog_compress"); + log_compress(); +} + +int ObLogCompressor::start() +{ + ThreadPool::start(); + return OB_SUCCESS; +} + +void ObLogCompressor::stop() +{ + ThreadPool::stop(); +} + +void ObLogCompressor::wait() +{ + ThreadPool::wait(); +} + +} // namespace common +} // namespace oceanbase diff --git a/deps/oblib/src/lib/oblog/ob_log_compressor.h b/deps/oblib/src/lib/oblog/ob_log_compressor.h new file mode 100644 index 000000000..5abc75e8e --- /dev/null +++ b/deps/oblib/src/lib/oblog/ob_log_compressor.h @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_LOG_COMPRESSOR_H_ +#define OB_LOG_COMPRESSOR_H_ + +#include "lib/thread/thread_pool.h" +#include "lib/lock/ob_thread_cond.h" +#include "lib/list/ob_list.h" +#include "lib/queue/ob_fixed_queue.h" + +namespace oceanbase { +namespace common { + +class ObCompressor; +class ObString; +class ObMalloc; + +class ObLogCompressor final : public lib::ThreadPool { +public: + ObLogCompressor(); + virtual ~ObLogCompressor(); + static ObString get_compression_file_name(const ObString &file_name); + int init(); + void destroy(); + int append_log(const ObString &file_name); + ObCompressor *get_compressor(); + +private: + int log_compress_block(char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size); + void log_compress(); + void run1() override; + int start() override; + void stop() override; + void wait() override; + +private: + bool is_inited_; + bool has_stoped_; + ObFixedQueue file_list_; + ObThreadCond log_compress_cond_; + ObCompressor *compressor_; +}; + +} // namespace common +} // namespace oceanbase + +#endif /* OB_LOG_COMPRESSOR_H_ */ diff --git a/deps/oblib/unittest/CMakeLists.txt b/deps/oblib/unittest/CMakeLists.txt index ca2dc0925..9fa8c2714 100644 --- a/deps/oblib/unittest/CMakeLists.txt +++ b/deps/oblib/unittest/CMakeLists.txt @@ -5,7 +5,7 @@ target_link_libraries(oblib_testbase INTERFACE -lgmock -lgtest) function(oblib_addtest mainfile) get_filename_component(testname ${mainfile} NAME_WE) add_executable(${testname} ${ARGV}) - target_link_libraries(${testname} PRIVATE oblib oblib_testbase -static-libgcc -static-libstdc++) + target_link_libraries(${testname} PRIVATE easy aio -L${DEP_DIR}/lib/mariadb mariadb oblib oblib_testbase -static-libgcc -static-libstdc++) endfunction() add_subdirectory(lib) diff --git a/deps/oblib/unittest/lib/CMakeLists.txt b/deps/oblib/unittest/lib/CMakeLists.txt index e565af148..457d011d4 100644 --- a/deps/oblib/unittest/lib/CMakeLists.txt +++ b/deps/oblib/unittest/lib/CMakeLists.txt @@ -92,6 +92,7 @@ oblib_addtest(net/test_ob_addr.cpp) oblib_addtest(number/test_number_v2.cpp) oblib_addtest(oblog/test_base_log_buffer.cpp) oblib_addtest(oblog/test_base_log_writer.cpp) +oblib_addtest(oblog/test_ob_log_compressor.cpp) oblib_addtest(oblog/test_ob_log_obj.cpp) oblib_addtest(oblog/test_ob_log_performance.cpp) oblib_addtest(profile/test_ob_trace_id.cpp) diff --git a/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp b/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp new file mode 100644 index 000000000..a262ec578 --- /dev/null +++ b/deps/oblib/unittest/lib/oblog/test_ob_log_compressor.cpp @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include "lib/oblog/ob_log_compressor.h" +#include "lib/compress/ob_compressor_pool.h" +#include "lib/ob_errno.h" +#include "lib/string/ob_string.h" + +using namespace oceanbase::lib; + +namespace oceanbase { +namespace common { + +TEST(ObLogCompressor, normal) +{ + int ret = OB_SUCCESS; + int test_count = 1000; + int test_size = test_count * sizeof(int); + ObLogCompressor log_compressor; + ObCompressor *compressor; + + // normal init + ret = log_compressor.init(); + ASSERT_EQ(OB_SUCCESS, ret); + + // repeat init + ret = log_compressor.init(); + ASSERT_EQ(OB_INIT_TWICE, ret); + + // prepare data + ObString file_name = "test_ob_log_compressor_file"; + FILE *input_file = fopen(file_name.ptr(), "w"); + ASSERT_EQ(true, NULL != input_file); + int data[test_count]; + for (int i = 0; i < test_count; i++) { + data[i] = i; + } + ret = fwrite(data, 1, test_size, input_file); + ASSERT_EQ(test_size, ret); + fclose(input_file); + + // normal append + ret = log_compressor.append_log(file_name); + ASSERT_EQ(OB_SUCCESS, ret); + + // get compression result + sleep(2); + ObString compression_file_name = log_compressor.get_compression_file_name(file_name); + ASSERT_EQ(0, access(compression_file_name.ptr(), F_OK)); + FILE *output_file = fopen(compression_file_name.ptr(), "r"); + ASSERT_EQ(true, NULL != output_file); + int buf_size = test_size + 512; + int read_size = 0; + void *buf = malloc(buf_size); + ASSERT_EQ(true, NULL != buf); + read_size = fread(buf, 1, buf_size, output_file); + ASSERT_GT(read_size, 0); + fclose(output_file); + + // check decompression result + int64_t decomp_size = 0; + int decomp_buf_size = buf_size; + int *decomp_buf = (int *)malloc(decomp_buf_size); + ASSERT_EQ(true, NULL != decomp_buf); + compressor = (ObCompressor *)log_compressor.get_compressor(); + ASSERT_EQ(true, NULL != compressor); + ret = compressor->decompress((char *)buf, read_size, (char *)decomp_buf, decomp_buf_size, decomp_size); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(test_size, decomp_size); + for (int i = 0; i < test_count; i++) { + ASSERT_EQ(*(decomp_buf + i), i); + } + + // clear environment + free(buf); + free(decomp_buf); + ASSERT_NE(0, access(file_name.ptr(), F_OK)); + ASSERT_EQ(0, access(compression_file_name.ptr(), F_OK)); + unlink(compression_file_name.ptr()); + + // destroy and init + log_compressor.destroy(); + ret = log_compressor.init(); + ASSERT_EQ(OB_SUCCESS, ret); + + // repeat destroy + log_compressor.destroy(); + log_compressor.destroy(); +} + +} // namespace common +} // namespace oceanbase + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 6c74f911e..7a736fa7d 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -74,6 +74,7 @@ #include "observer/ob_server_memory_cutter.h" #include "share/ob_bg_thread_monitor.h" #include "observer/omt/ob_tenant_timezone_mgr.h" +#include "lib/oblog/ob_log_compressor.h" //#include "share/ob_ofs.h" using namespace oceanbase::lib; @@ -172,10 +173,19 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg) // set large page param ObLargePageHelper::set_param(config_.use_large_pages); + if (OB_SUCC(ret)) { + if (OB_FAIL(log_compressor_.init())) { + LOG_ERROR("log compressor init error.", K(ret)); + } + } + if (OB_SUCC(ret)) { if (OB_FAIL(OB_LOGGER.init(log_cfg))) { LOG_ERROR("async log init error.", K(ret)); ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; + } else if (OB_FAIL(OB_LOGGER.set_log_compressor(&log_compressor_))) { + LOG_ERROR("set log compressor error.", K(ret)); + ret = OB_ELECTION_ASYNC_LOG_WARN_INIT; } } @@ -420,6 +430,8 @@ void ObServer::destroy() LOG_WARN("memory dump destroyed"); tenant_timezone_mgr_.destroy(); LOG_WARN("tenant timezone manager destroyed"); + log_compressor_.destroy(); + LOG_WARN("log compressor destroyed"); LOG_WARN("destroy observer end"); has_destroy_ = true; } @@ -951,10 +963,14 @@ int ObServer::init_pre_setting() // oblog configuration if (OB_SUCC(ret)) { const int max_log_cnt = static_cast(config_.max_syslog_file_count); + const int64_t max_log_time = config_.max_syslog_file_time; + const bool enable_log_compress = config_.enable_syslog_file_compress; const bool record_old_log_file = config_.enable_syslog_recycle; const bool log_warn = config_.enable_syslog_wf; const bool enable_async_syslog = config_.enable_async_syslog; OB_LOGGER.set_max_file_index(max_log_cnt); + OB_LOGGER.set_max_file_time(max_log_time); + OB_LOGGER.set_enable_file_compress(enable_log_compress); OB_LOGGER.set_record_old_log_file(record_old_log_file); LOG_INFO("Whether record old log file", K(record_old_log_file)); OB_LOGGER.set_log_warn(log_warn); diff --git a/src/observer/ob_server.h b/src/observer/ob_server.h index 8de312019..43c10118d 100644 --- a/src/observer/ob_server.h +++ b/src/observer/ob_server.h @@ -46,6 +46,7 @@ #include "observer/ob_service.h" #include "observer/ob_server_reload_config.h" #include "observer/ob_root_service_monitor.h" +#include "lib/oblog/ob_log_compressor.h" namespace oceanbase { namespace omt { @@ -426,6 +427,7 @@ private: blocksstable::ObStorageEnv storage_env_; share::ObSchemaStatusProxy schema_status_proxy_; ObSignalWorker sig_worker_; + common::ObLogCompressor log_compressor_; bool is_log_dir_empty_; sql::ObConnectResourceMgr conn_res_mgr_; diff --git a/src/share/config/ob_reload_config.cpp b/src/share/config/ob_reload_config.cpp index 3de6ac4d0..a015c843b 100644 --- a/src/share/config/ob_reload_config.cpp +++ b/src/share/config/ob_reload_config.cpp @@ -32,6 +32,10 @@ int ObReloadConfig::reload_ob_logger_set() K(ret)); } else if (OB_FAIL(OB_LOGGER.set_max_file_index(static_cast(conf_->max_syslog_file_count)))) { OB_LOG(ERROR, "fail to set_max_file_index", K(conf_->max_syslog_file_count.get()), K(ret)); + } else if (OB_FAIL(OB_LOGGER.set_max_file_time(conf_->max_syslog_file_time))) { + OB_LOG(ERROR, "fail to set_max_file_time", K(conf_->max_syslog_file_time.get()), K(ret)); + } else if (OB_FAIL(OB_LOGGER.set_enable_file_compress(conf_->enable_syslog_file_compress))) { + OB_LOG(ERROR, "fail to set_enable_file_compress", K(conf_->enable_syslog_file_compress.str()), K(ret)); } else if (OB_FAIL(OB_LOGGER.set_record_old_log_file(conf_->enable_syslog_recycle))) { OB_LOG(ERROR, "fail to set_record_old_log_file", K(conf_->enable_syslog_recycle.str()), K(ret)); } else { diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 43d2fb2d3..6873aba9d 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -203,6 +203,15 @@ DEF_INT(cluster_id, OB_CLUSTER_PARAMETER, "0", "[1,4294901759]", "ID of the clus ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_STR(obconfig_url, OB_CLUSTER_PARAMETER, "", "URL for OBConfig service", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_BOOL(enable_syslog_file_compress, OB_CLUSTER_PARAMETER, "False", + "specifies whether to compress archive log files" + "Value: True:turned on; False: turned off", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_TIME(max_syslog_file_time, OB_CLUSTER_PARAMETER, "0s", "[0s, 3650d]", + "specifies the maximum retention time of the log files. " + "When this value is set to 0s, no log file will be removed due to time. " + "with default 0s. Range: [0s, 3650d]", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_LOG_LEVEL(syslog_level, OB_CLUSTER_PARAMETER, "INFO", "specifies the current level of logging. There are DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR, six different log " "levels.", @@ -214,7 +223,7 @@ DEF_INT(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0", "[0,]", "specifies the maximum number of the log files " "that can co-exist before the log file recycling kicks in. " "Each log file can occupy at most 256MB disk space. " - "When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer", + "When this value is set to 0, no log file will be removed due to the file count. Range: [0, +∞) in integer", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_BOOL(enable_async_syslog, OB_CLUSTER_PARAMETER, "True", "specifies whether use async log for observer.log, elec.log and rs.log", diff --git a/test/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/test/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 280db7a6d..cd0759fc5 100644 --- a/test/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/test/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -98,6 +98,7 @@ enable_separate_sys_clog enable_smooth_leader_switch enable_sql_audit enable_sql_operator_dump +enable_syslog_file_compress enable_syslog_recycle enable_syslog_wf enable_sys_table_ddl @@ -145,6 +146,7 @@ max_px_worker_count max_stale_time_for_weak_consistency max_string_print_length max_syslog_file_count +max_syslog_file_time memory_chunk_cache_size memory_limit memory_limit_percentage