[CP][FEAT MERGE] implement of log storage compression

This commit is contained in:
yyy-hust
2024-04-07 06:45:18 +00:00
committed by ob-robot
parent e007f676a5
commit 784d2231f6
50 changed files with 1173 additions and 284 deletions

View File

@ -21,6 +21,9 @@
#include "logservice/palf/log_entry.h" //LogEntry
#include "logservice/palf/log_define.h"
#include "logservice/ob_log_service.h"//open_palf
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS
#include "logservice/ob_log_compression.h"
#endif
#include "share/scn.h"//SCN
#include "logservice/ob_garbage_collector.h"//ObGCLSLog
#include "logservice/palf_handle_guard.h"//ObPalfHandleGuard
@ -323,6 +326,13 @@ int ObRecoveryLSService::process_ls_log_(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant_info is invalid", KR(ret), K(tenant_info));
}
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS
char *decompress_buf = NULL;
int64_t decompress_buf_len = 0;
int64_t decompressed_len = 0;
#endif
const char *log_body = NULL;
int64_t log_body_size = 0;
while (OB_SUCC(ret) && OB_SUCC(iterator.next())) {
if (OB_FAIL(iterator.get_entry(log_entry, target_lsn))) {
LOG_WARN("failed to get log", KR(ret), K(log_entry));
@ -359,25 +369,43 @@ int ObRecoveryLSService::process_ls_log_(
} else if (OB_UNLIKELY(log_pos >= log_length)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log pos is not expected", KR(ret), K(log_pos), K(log_length));
} else if (logservice::GC_LS_LOG_BASE_TYPE == header.get_log_type()) {
ObGCLSLog gc_log;
if (OB_FAIL(gc_log.deserialize(log_buf, log_length, log_pos))) {
LOG_WARN("failed to deserialize gc log", KR(ret), K(log_length));
} else if (ObGCLSLOGType::OFFLINE_LS == gc_log.get_log_type()) {
//set sys ls offline
if (OB_FAIL(process_gc_log_(gc_log, sync_scn))) {
LOG_WARN("failed to process gc log", KR(ret), K(sync_scn));
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS
} else if (header.is_compressed()) {
if (OB_FAIL(decompress_log_payload_(log_buf + log_pos, log_length - log_pos,
decompress_buf, decompressed_len))) {
LOG_WARN("failed to decompress log payload", K(header), K(log_entry));
} else {
log_body = decompress_buf;
log_body_size = decompressed_len;
}
#endif
} else {
log_body = log_buf + log_pos;
log_body_size = log_length - log_pos;
}
if (OB_SUCC(ret)) {
int64_t pos = 0;
if (logservice::GC_LS_LOG_BASE_TYPE == header.get_log_type()) {
ObGCLSLog gc_log;
if (OB_FAIL(gc_log.deserialize(log_body, log_body_size, pos))) {
LOG_WARN("failed to deserialize gc log", KR(ret), K(log_body_size));
} else if (ObGCLSLOGType::OFFLINE_LS == gc_log.get_log_type()) {
//set sys ls offline
if (OB_FAIL(process_gc_log_(gc_log, sync_scn))) {
LOG_WARN("failed to process gc log", KR(ret), K(sync_scn));
}
}
}
// nothing
} else if (logservice::TRANS_SERVICE_LOG_BASE_TYPE == header.get_log_type()) {
ObTxLogBlock tx_log_block;
if (OB_FAIL(tx_log_block.init_for_replay(log_buf, log_length, log_pos))) {
LOG_WARN("failed to init tx log block", KR(ret), K(log_length));
} else if (OB_FAIL(process_ls_tx_log_(tx_log_block, sync_scn))) {
LOG_WARN("failed to process ls tx log", KR(ret), K(tx_log_block), K(sync_scn));
}
} else {}
// nothing
} else if (logservice::TRANS_SERVICE_LOG_BASE_TYPE == header.get_log_type()) {
ObTxLogBlock tx_log_block;
if (OB_FAIL(tx_log_block.init_for_replay(log_body, log_body_size, pos))) {
LOG_WARN("failed to init tx log block", KR(ret), K(log_length));
} else if (OB_FAIL(process_ls_tx_log_(tx_log_block, sync_scn))) {
LOG_WARN("failed to process ls tx log", KR(ret), K(tx_log_block), K(sync_scn));
}
} else {}
}
if (OB_SUCC(ret)) {
last_sync_scn = sync_scn;
@ -393,14 +421,20 @@ int ObRecoveryLSService::process_ls_log_(
int tmp_ret = OB_SUCCESS;
const char* comment = OB_SUCC(ret) ? "regular report when iterate log" :
"report sync_scn -1 when not valid to process";
if (OB_TMP_FAIL(report_sys_ls_recovery_stat_(last_sync_scn, false,
comment))) {
if (OB_TMP_FAIL(report_sys_ls_recovery_stat_(last_sync_scn, false, comment))) {
LOG_WARN("failed to report ls recovery stat", KR(ret), KR(tmp_ret),
K(last_sync_scn), K(comment));
}
}
}
}//end for each log
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS
if (NULL != decompress_buf) {
mtl_free(decompress_buf);
}
#endif
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (!sync_scn.is_valid() || start_scn >= sync_scn) {
@ -1548,6 +1582,24 @@ int ObRecoveryLSService::init_restore_status(const share::SCN &sync_scn, int err
return ret;
}
#ifdef OB_BUILD_LOG_STORAGE_COMPRESS
int ObRecoveryLSService::decompress_log_payload_(const char *in_buf, const int64_t in_buf_len,
char *&decompress_buf, int64_t &decompressed_len)
{
int ret = OB_SUCCESS;
ObMemAttr attr(MTL_ID(), "RecoveryLS");
const int64_t decompress_buf_len = palf::MAX_LOG_BODY_SIZE;
if (NULL == decompress_buf
&& NULL == (decompress_buf = static_cast<char *>(mtl_malloc(decompress_buf_len, attr)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory");
} else if (OB_FAIL(logservice::decompress(in_buf, in_buf_len, decompress_buf,
decompress_buf_len, decompressed_len))) {
LOG_WARN("failed to decompress");
} else {/*do nothing*/}
return ret;
}
#endif
}//end of namespace rootserver
}//end of namespace oceanbase